#4 add concierge app that implements the idea of tasks; rework rtmp to better handle multiple incoming streams; rework restream to create tasks
This commit is contained in:
		
							parent
							
								
									8e71bd515b
								
							
						
					
					
						commit
						ca65c4eaa7
					
				| 
						 | 
					@ -0,0 +1,19 @@
 | 
				
			||||||
 | 
					from django.contrib import admin
 | 
				
			||||||
 | 
					from .models import Identity, Task, Claim
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class IdentityAdmin(admin.ModelAdmin):
 | 
				
			||||||
 | 
					    fields = ['identity', 'name', 'notes', 'heartbeat']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TaskAdmin(admin.ModelAdmin):
 | 
				
			||||||
 | 
					    fields = ['stream', 'type', 'configuration']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class ClaimAdmin(admin.ModelAdmin):
 | 
				
			||||||
 | 
					    fields = ['owner', 'task']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					admin.site.register(Identity, IdentityAdmin)
 | 
				
			||||||
 | 
					admin.site.register(Task, TaskAdmin)
 | 
				
			||||||
 | 
					admin.site.register(Claim, ClaimAdmin)
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,8 @@
 | 
				
			||||||
 | 
					from django.apps import AppConfig
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class ConciergeConfig(AppConfig):
 | 
				
			||||||
 | 
					    name = 'concierge'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def ready(self):
 | 
				
			||||||
 | 
					        import concierge.signals  # noqa
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,44 @@
 | 
				
			||||||
 | 
					# Generated by Django 3.0.5 on 2020-04-26 17:25
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.db import migrations, models
 | 
				
			||||||
 | 
					import django.db.models.deletion
 | 
				
			||||||
 | 
					import uuid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Migration(migrations.Migration):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    initial = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    dependencies = [
 | 
				
			||||||
 | 
					        ('rtmp', '0002_stream_publish_counter'),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    operations = [
 | 
				
			||||||
 | 
					        migrations.CreateModel(
 | 
				
			||||||
 | 
					            name='Identity',
 | 
				
			||||||
 | 
					            fields=[
 | 
				
			||||||
 | 
					                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
 | 
				
			||||||
 | 
					                ('identity', models.CharField(default=uuid.uuid4, max_length=36, unique=True)),
 | 
				
			||||||
 | 
					                ('name', models.CharField(max_length=100)),
 | 
				
			||||||
 | 
					                ('notes', models.TextField()),
 | 
				
			||||||
 | 
					                ('heartbeat', models.DateTimeField(blank=True)),
 | 
				
			||||||
 | 
					            ],
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					        migrations.CreateModel(
 | 
				
			||||||
 | 
					            name='Task',
 | 
				
			||||||
 | 
					            fields=[
 | 
				
			||||||
 | 
					                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
 | 
				
			||||||
 | 
					                ('type', models.CharField(max_length=100)),
 | 
				
			||||||
 | 
					                ('configuration', models.TextField()),
 | 
				
			||||||
 | 
					                ('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='rtmp.Stream')),
 | 
				
			||||||
 | 
					            ],
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					        migrations.CreateModel(
 | 
				
			||||||
 | 
					            name='Claim',
 | 
				
			||||||
 | 
					            fields=[
 | 
				
			||||||
 | 
					                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
 | 
				
			||||||
 | 
					                ('owner', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity')),
 | 
				
			||||||
 | 
					                ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='concierge.Task')),
 | 
				
			||||||
 | 
					            ],
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,28 @@
 | 
				
			||||||
 | 
					import uuid
 | 
				
			||||||
 | 
					from django.db import models
 | 
				
			||||||
 | 
					from rtmp.models import Stream
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Identity(models.Model):
 | 
				
			||||||
 | 
					    # models a concierge identity. every running concierge needs to have a
 | 
				
			||||||
 | 
					    # unique identity that is being used for task claims, etc.
 | 
				
			||||||
 | 
					    identity = models.CharField(max_length=36, unique=True, default=uuid.uuid4)
 | 
				
			||||||
 | 
					    name = models.CharField(max_length=100)
 | 
				
			||||||
 | 
					    notes = models.TextField()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # heartbeat indicates last point in time that this identity was seen.
 | 
				
			||||||
 | 
					    # some cronjob should scan the heartbeats and release all claims by
 | 
				
			||||||
 | 
					    # identities that have not been seen in a while. this interval should
 | 
				
			||||||
 | 
					    # be quite short so that the tasks can be claimed by other identities asap.
 | 
				
			||||||
 | 
					    heartbeat = models.DateTimeField(blank=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Task(models.Model):
 | 
				
			||||||
 | 
					    stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
 | 
				
			||||||
 | 
					    type = models.CharField(max_length=100)
 | 
				
			||||||
 | 
					    configuration = models.TextField()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Claim(models.Model):
 | 
				
			||||||
 | 
					    owner = models.ForeignKey(Identity, on_delete=models.CASCADE)
 | 
				
			||||||
 | 
					    task = models.ForeignKey(Task, on_delete=models.CASCADE)
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,11 @@
 | 
				
			||||||
 | 
					from django.dispatch import receiver
 | 
				
			||||||
 | 
					from rtmp.signals import stream_inactive
 | 
				
			||||||
 | 
					from .models import Task
 | 
				
			||||||
 | 
					from rtmp.models import Stream
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@receiver(stream_inactive)
 | 
				
			||||||
 | 
					def delete_tasks(sender, **kwargs):
 | 
				
			||||||
 | 
					    # when a stream was unpublished, all related tasks need to be deleted.
 | 
				
			||||||
 | 
					    stream = Stream.objects.get(stream=kwargs['stream'])
 | 
				
			||||||
 | 
					    Task.objects.filter(stream=stream).delete()
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,3 @@
 | 
				
			||||||
 | 
					from django.test import TestCase  # noqa
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Create your tests here.
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,3 @@
 | 
				
			||||||
 | 
					from django.shortcuts import render  # noqa
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Create your views here.
 | 
				
			||||||
| 
						 | 
					@ -40,6 +40,7 @@ INSTALLED_APPS = [
 | 
				
			||||||
    'fa',
 | 
					    'fa',
 | 
				
			||||||
    'portal.apps.PortalConfig',
 | 
					    'portal.apps.PortalConfig',
 | 
				
			||||||
    'rtmp.apps.RtmpConfig',
 | 
					    'rtmp.apps.RtmpConfig',
 | 
				
			||||||
 | 
					    'concierge.apps.ConciergeConfig',
 | 
				
			||||||
    'restream.apps.RestreamConfig',
 | 
					    'restream.apps.RestreamConfig',
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,31 +1,14 @@
 | 
				
			||||||
import logging
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from django.dispatch import receiver
 | 
					from django.dispatch import receiver
 | 
				
			||||||
from rtmp.signals import on_publish, on_unpublish
 | 
					from rtmp.signals import stream_active
 | 
				
			||||||
 | 
					 | 
				
			||||||
from portier.celery import app as celery
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from .models import RestreamConfig
 | 
					from .models import RestreamConfig
 | 
				
			||||||
from rtmp.models import Stream
 | 
					from rtmp.models import Stream
 | 
				
			||||||
 | 
					from concierge.models import Task
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@receiver(on_unpublish)
 | 
					@receiver(stream_active)
 | 
				
			||||||
def callback_on_unpublish(sender, **kwargs):
 | 
					def create_tasks(sender, **kwargs):
 | 
				
			||||||
    logger.info("stop publish - {}".format(kwargs['name']))
 | 
					    stream = Stream.objects.get(stream=kwargs['stream'])
 | 
				
			||||||
    celery.send_task('main.stop_restream', kwargs={'name': kwargs['name']})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@receiver(on_publish)
 | 
					 | 
				
			||||||
def callback_on_publish(sender, **kwargs):
 | 
					 | 
				
			||||||
    logger.info("start publish - {}".format(kwargs['name']))
 | 
					 | 
				
			||||||
    stream = Stream.objects.get(key=kwargs['stream'])
 | 
					 | 
				
			||||||
    configs = RestreamConfig.objects.filter(stream=stream)
 | 
					    configs = RestreamConfig.objects.filter(stream=stream)
 | 
				
			||||||
    for config in configs:
 | 
					    for config in configs:
 | 
				
			||||||
        celery.send_task('main.start_restream', kwargs={
 | 
					        task = Task(stream=stream, type='restream', configuration='{}')
 | 
				
			||||||
            'app': kwargs['app'],
 | 
					        task.save()
 | 
				
			||||||
            'stream': kwargs['stream'],
 | 
					 | 
				
			||||||
            'target': config.target,
 | 
					 | 
				
			||||||
            'id': config.id
 | 
					 | 
				
			||||||
        })
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -7,7 +7,7 @@ class ApplicationAdmin(admin.ModelAdmin):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class StreamAdmin(admin.ModelAdmin):
 | 
					class StreamAdmin(admin.ModelAdmin):
 | 
				
			||||||
    fields = ['application', 'stream', 'name']
 | 
					    fields = ['application', 'stream', 'name', 'publish_counter']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
admin.site.register(Application, ApplicationAdmin)
 | 
					admin.site.register(Application, ApplicationAdmin)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,18 @@
 | 
				
			||||||
 | 
					# Generated by Django 3.0.5 on 2020-04-26 17:25
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.db import migrations, models
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Migration(migrations.Migration):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    dependencies = [
 | 
				
			||||||
 | 
					        ('rtmp', '0001_initial'),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    operations = [
 | 
				
			||||||
 | 
					        migrations.AddField(
 | 
				
			||||||
 | 
					            model_name='stream',
 | 
				
			||||||
 | 
					            name='publish_counter',
 | 
				
			||||||
 | 
					            field=models.PositiveIntegerField(default=0),
 | 
				
			||||||
 | 
					        ),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
| 
						 | 
					@ -17,21 +17,38 @@ class Stream(models.Model):
 | 
				
			||||||
    stream = models.CharField(max_length=64, unique=True, default=uuid.uuid4)
 | 
					    stream = models.CharField(max_length=64, unique=True, default=uuid.uuid4)
 | 
				
			||||||
    name = models.CharField(max_length=100)
 | 
					    name = models.CharField(max_length=100)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # the same stream uuid can be published multiple times to different origin
 | 
				
			||||||
 | 
					    # servers. this is a valid scheme to achieve a failover on the origin layer.
 | 
				
			||||||
 | 
					    # thus we need to keep track if a stream is published at least once,
 | 
				
			||||||
 | 
					    # and only send signals when we are going to / coming from 0 published streams.
 | 
				
			||||||
 | 
					    publish_counter = models.PositiveIntegerField(default=0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def on_publish(self, param):
 | 
					    def on_publish(self, param):
 | 
				
			||||||
        signals.on_publish.send(sender=self.__class__,
 | 
					        # if so far there were less than one incoming streams, this stream
 | 
				
			||||||
                                name=self.name,
 | 
					        # is now being considered active
 | 
				
			||||||
 | 
					        if self.publish_counter < 1:
 | 
				
			||||||
 | 
					            signals.stream_active.send(sender=self.__class__,
 | 
				
			||||||
                                       stream=self.stream,
 | 
					                                       stream=self.stream,
 | 
				
			||||||
                                app=str(self.application),
 | 
					 | 
				
			||||||
                                       param=param
 | 
					                                       param=param
 | 
				
			||||||
                                       )
 | 
					                                       )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # keep track of this incoming stream
 | 
				
			||||||
 | 
					        self.publish_counter += 1
 | 
				
			||||||
 | 
					        self.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def on_unpublish(self, param):
 | 
					    def on_unpublish(self, param):
 | 
				
			||||||
        signals.on_unpublish.send(sender=self.__class__,
 | 
					        # note that we now have on less incoming stream
 | 
				
			||||||
                                  name=self.name,
 | 
					        if self.publish_counter > 0:
 | 
				
			||||||
 | 
					            self.publish_counter -= 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # if we now have less than one incoming stream, this stream is being
 | 
				
			||||||
 | 
					        # considered inactive
 | 
				
			||||||
 | 
					        if self.publish_counter < 1:
 | 
				
			||||||
 | 
					            signals.stream_inactive.send(sender=self.__class__,
 | 
				
			||||||
                                         stream=self.stream,
 | 
					                                         stream=self.stream,
 | 
				
			||||||
                                  app=str(self.application),
 | 
					 | 
				
			||||||
                                         param=param
 | 
					                                         param=param
 | 
				
			||||||
                                         )
 | 
					                                         )
 | 
				
			||||||
 | 
					        self.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __str__(self):
 | 
					    def __str__(self):
 | 
				
			||||||
        return self.name
 | 
					        return self.name
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,4 +1,4 @@
 | 
				
			||||||
from django.dispatch import Signal
 | 
					from django.dispatch import Signal
 | 
				
			||||||
 | 
					
 | 
				
			||||||
on_publish = Signal(providing_args=['application', 'stream', 'params'])
 | 
					stream_active = Signal(providing_args=['stream', 'params'])
 | 
				
			||||||
on_unpublish = Signal(providing_args=['application', 'stream', 'params'])
 | 
					stream_inactive = Signal(providing_args=['stream', 'params'])
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue