From ca65c4eaa720d1fb3444bf3c8c7178ad0d2a2cbe Mon Sep 17 00:00:00 2001 From: Jan Koppe Date: Sun, 26 Apr 2020 19:46:58 +0200 Subject: [PATCH] #4 add concierge app that implements the idea of tasks; rework rtmp to better handle multiple incoming streams; rework restream to create tasks --- concierge/__init__.py | 0 concierge/admin.py | 19 ++++++++ concierge/apps.py | 8 ++++ concierge/migrations/0001_initial.py | 44 +++++++++++++++++++ concierge/migrations/__init__.py | 0 concierge/models.py | 28 ++++++++++++ concierge/signals.py | 11 +++++ concierge/tests.py | 3 ++ concierge/views.py | 3 ++ portier/settings.py | 1 + restream/signals.py | 31 +++---------- rtmp/admin.py | 2 +- .../migrations/0002_stream_publish_counter.py | 18 ++++++++ rtmp/models.py | 41 ++++++++++++----- rtmp/signals.py | 4 +- 15 files changed, 174 insertions(+), 39 deletions(-) create mode 100644 concierge/__init__.py create mode 100644 concierge/admin.py create mode 100644 concierge/apps.py create mode 100644 concierge/migrations/0001_initial.py create mode 100644 concierge/migrations/__init__.py create mode 100644 concierge/models.py create mode 100644 concierge/signals.py create mode 100644 concierge/tests.py create mode 100644 concierge/views.py create mode 100644 rtmp/migrations/0002_stream_publish_counter.py diff --git a/concierge/__init__.py b/concierge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/concierge/admin.py b/concierge/admin.py new file mode 100644 index 0000000..8b9cc05 --- /dev/null +++ b/concierge/admin.py @@ -0,0 +1,19 @@ +from django.contrib import admin +from .models import Identity, Task, Claim + + +class IdentityAdmin(admin.ModelAdmin): + fields = ['identity', 'name', 'notes', 'heartbeat'] + + +class TaskAdmin(admin.ModelAdmin): + fields = ['stream', 'type', 'configuration'] + + +class ClaimAdmin(admin.ModelAdmin): + fields = ['owner', 'task'] + + +admin.site.register(Identity, IdentityAdmin) +admin.site.register(Task, TaskAdmin) +admin.site.register(Claim, ClaimAdmin) diff --git a/concierge/apps.py b/concierge/apps.py new file mode 100644 index 0000000..1520af6 --- /dev/null +++ b/concierge/apps.py @@ -0,0 +1,8 @@ +from django.apps import AppConfig + + +class ConciergeConfig(AppConfig): + name = 'concierge' + + def ready(self): + import concierge.signals # noqa diff --git a/concierge/migrations/0001_initial.py b/concierge/migrations/0001_initial.py new file mode 100644 index 0000000..d64b046 --- /dev/null +++ b/concierge/migrations/0001_initial.py @@ -0,0 +1,44 @@ +# Generated by Django 3.0.5 on 2020-04-26 17:25 + +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('rtmp', '0002_stream_publish_counter'), + ] + + operations = [ + migrations.CreateModel( + name='Identity', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('identity', models.CharField(default=uuid.uuid4, max_length=36, unique=True)), + ('name', models.CharField(max_length=100)), + ('notes', models.TextField()), + ('heartbeat', models.DateTimeField(blank=True)), + ], + ), + migrations.CreateModel( + name='Task', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('type', models.CharField(max_length=100)), + ('configuration', models.TextField()), + ('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='rtmp.Stream')), + ], + ), + migrations.CreateModel( + name='Claim', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('owner', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity')), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='concierge.Task')), + ], + ), + ] diff --git a/concierge/migrations/__init__.py b/concierge/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/concierge/models.py b/concierge/models.py new file mode 100644 index 0000000..6e210ee --- /dev/null +++ b/concierge/models.py @@ -0,0 +1,28 @@ +import uuid +from django.db import models +from rtmp.models import Stream + + +class Identity(models.Model): + # models a concierge identity. every running concierge needs to have a + # unique identity that is being used for task claims, etc. + identity = models.CharField(max_length=36, unique=True, default=uuid.uuid4) + name = models.CharField(max_length=100) + notes = models.TextField() + + # heartbeat indicates last point in time that this identity was seen. + # some cronjob should scan the heartbeats and release all claims by + # identities that have not been seen in a while. this interval should + # be quite short so that the tasks can be claimed by other identities asap. + heartbeat = models.DateTimeField(blank=True) + + +class Task(models.Model): + stream = models.ForeignKey(Stream, on_delete=models.CASCADE) + type = models.CharField(max_length=100) + configuration = models.TextField() + + +class Claim(models.Model): + owner = models.ForeignKey(Identity, on_delete=models.CASCADE) + task = models.ForeignKey(Task, on_delete=models.CASCADE) diff --git a/concierge/signals.py b/concierge/signals.py new file mode 100644 index 0000000..52ab7f6 --- /dev/null +++ b/concierge/signals.py @@ -0,0 +1,11 @@ +from django.dispatch import receiver +from rtmp.signals import stream_inactive +from .models import Task +from rtmp.models import Stream + + +@receiver(stream_inactive) +def delete_tasks(sender, **kwargs): + # when a stream was unpublished, all related tasks need to be deleted. + stream = Stream.objects.get(stream=kwargs['stream']) + Task.objects.filter(stream=stream).delete() diff --git a/concierge/tests.py b/concierge/tests.py new file mode 100644 index 0000000..9a30df3 --- /dev/null +++ b/concierge/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase # noqa + +# Create your tests here. diff --git a/concierge/views.py b/concierge/views.py new file mode 100644 index 0000000..6100593 --- /dev/null +++ b/concierge/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render # noqa + +# Create your views here. diff --git a/portier/settings.py b/portier/settings.py index b7845b0..a8cb975 100644 --- a/portier/settings.py +++ b/portier/settings.py @@ -40,6 +40,7 @@ INSTALLED_APPS = [ 'fa', 'portal.apps.PortalConfig', 'rtmp.apps.RtmpConfig', + 'concierge.apps.ConciergeConfig', 'restream.apps.RestreamConfig', ] diff --git a/restream/signals.py b/restream/signals.py index 4c66258..0331bfa 100644 --- a/restream/signals.py +++ b/restream/signals.py @@ -1,31 +1,14 @@ -import logging - from django.dispatch import receiver -from rtmp.signals import on_publish, on_unpublish - -from portier.celery import app as celery - +from rtmp.signals import stream_active from .models import RestreamConfig from rtmp.models import Stream - -logger = logging.getLogger(__name__) +from concierge.models import Task -@receiver(on_unpublish) -def callback_on_unpublish(sender, **kwargs): - logger.info("stop publish - {}".format(kwargs['name'])) - celery.send_task('main.stop_restream', kwargs={'name': kwargs['name']}) - - -@receiver(on_publish) -def callback_on_publish(sender, **kwargs): - logger.info("start publish - {}".format(kwargs['name'])) - stream = Stream.objects.get(key=kwargs['stream']) +@receiver(stream_active) +def create_tasks(sender, **kwargs): + stream = Stream.objects.get(stream=kwargs['stream']) configs = RestreamConfig.objects.filter(stream=stream) for config in configs: - celery.send_task('main.start_restream', kwargs={ - 'app': kwargs['app'], - 'stream': kwargs['stream'], - 'target': config.target, - 'id': config.id - }) + task = Task(stream=stream, type='restream', configuration='{}') + task.save() diff --git a/rtmp/admin.py b/rtmp/admin.py index 45af363..025978a 100644 --- a/rtmp/admin.py +++ b/rtmp/admin.py @@ -7,7 +7,7 @@ class ApplicationAdmin(admin.ModelAdmin): class StreamAdmin(admin.ModelAdmin): - fields = ['application', 'stream', 'name'] + fields = ['application', 'stream', 'name', 'publish_counter'] admin.site.register(Application, ApplicationAdmin) diff --git a/rtmp/migrations/0002_stream_publish_counter.py b/rtmp/migrations/0002_stream_publish_counter.py new file mode 100644 index 0000000..f8ab237 --- /dev/null +++ b/rtmp/migrations/0002_stream_publish_counter.py @@ -0,0 +1,18 @@ +# Generated by Django 3.0.5 on 2020-04-26 17:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('rtmp', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='stream', + name='publish_counter', + field=models.PositiveIntegerField(default=0), + ), + ] diff --git a/rtmp/models.py b/rtmp/models.py index 9dc331f..8ab82a6 100644 --- a/rtmp/models.py +++ b/rtmp/models.py @@ -17,21 +17,38 @@ class Stream(models.Model): stream = models.CharField(max_length=64, unique=True, default=uuid.uuid4) name = models.CharField(max_length=100) + # the same stream uuid can be published multiple times to different origin + # servers. this is a valid scheme to achieve a failover on the origin layer. + # thus we need to keep track if a stream is published at least once, + # and only send signals when we are going to / coming from 0 published streams. + publish_counter = models.PositiveIntegerField(default=0) + def on_publish(self, param): - signals.on_publish.send(sender=self.__class__, - name=self.name, - stream=self.stream, - app=str(self.application), - param=param - ) + # if so far there were less than one incoming streams, this stream + # is now being considered active + if self.publish_counter < 1: + signals.stream_active.send(sender=self.__class__, + stream=self.stream, + param=param + ) + + # keep track of this incoming stream + self.publish_counter += 1 + self.save() def on_unpublish(self, param): - signals.on_unpublish.send(sender=self.__class__, - name=self.name, - stream=self.stream, - app=str(self.application), - param=param - ) + # note that we now have on less incoming stream + if self.publish_counter > 0: + self.publish_counter -= 1 + + # if we now have less than one incoming stream, this stream is being + # considered inactive + if self.publish_counter < 1: + signals.stream_inactive.send(sender=self.__class__, + stream=self.stream, + param=param + ) + self.save() def __str__(self): return self.name diff --git a/rtmp/signals.py b/rtmp/signals.py index 42054c2..ab78089 100644 --- a/rtmp/signals.py +++ b/rtmp/signals.py @@ -1,4 +1,4 @@ from django.dispatch import Signal -on_publish = Signal(providing_args=['application', 'stream', 'params']) -on_unpublish = Signal(providing_args=['application', 'stream', 'params']) +stream_active = Signal(providing_args=['stream', 'params']) +stream_inactive = Signal(providing_args=['stream', 'params'])