diff --git a/source/config/models.py b/source/config/models.py index 517541c..3258dfa 100644 --- a/source/config/models.py +++ b/source/config/models.py @@ -47,44 +47,6 @@ class Stream(models.Model): pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream) -class Restream(models.Model): - FORMATS = ( - ('flv', 'flv (RTMP)'), - ('mpegts', 'mpegts (SRT)'), - ) - stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('restream_stream_help')) - target = models.CharField(max_length=500, help_text=_('restream_target_help')) - name = models.CharField(max_length=100, help_text=_('restream_name_help')) - active = models.BooleanField(help_text=_('restream_activate_help')) - format = models.CharField(max_length=6, choices=FORMATS, default='flv', help_text=_('restream_format_help')) - - class Meta: - verbose_name = _('restream_verbose_name') - verbose_name_plural = _('restream_verbose_name_plural') - - def class_name(self): - return _('restream_class_name') - - def get_absolute_url(self): - return reverse('config:restream_detail', kwargs={'pk': self.pk}) - - def __str__(self): - return '{} to {}'.format(self.stream, self.name) - - def get_json_config(self): - config = { - 'name': self.name, - 'app': settings.GLOBAL_STREAM_NAMESPACE, - 'stream': str(self.stream.stream), - 'target': self.target, - 'format': self.format - } - return json.dumps(config) - - -pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) - - class SRSNode(models.Model): name = models.CharField(max_length=100, help_text=_('srsnode_name_help')) api_base = models.CharField(max_length=256, help_text=_('srsnode_api_base_help')) @@ -111,3 +73,59 @@ class SRSStreamInstance(models.Model): def __str__(self): return f"{self.stream} on {self.node}" + + +class Restream(models.Model): + FORMATS = ( + ('flv', 'flv (RTMP)'), + ('mpegts', 'mpegts (SRT)'), + ) + stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('restream_stream_help')) + target = models.CharField(max_length=500, help_text=_('restream_target_help')) + name = models.CharField(max_length=100, help_text=_('restream_name_help')) + active = models.BooleanField(help_text=_('restream_activate_help')) + format = models.CharField(max_length=6, choices=FORMATS, default='flv', help_text=_('restream_format_help')) + + class Meta: + verbose_name = _('restream_verbose_name') + verbose_name_plural = _('restream_verbose_name_plural') + + def class_name(self): + return _('restream_class_name') + + def get_absolute_url(self): + return reverse('config:restream_detail', kwargs={'pk': self.pk}) + + def __str__(self): + return '{} to {}'.format(self.stream, self.name) + + def get_concierge_configuration(self): + stream_instances = SRSStreamInstance.objects.filter(stream=self.stream).all() + + if not stream_instances: + return None + + # We use the actual SRS node of the first stream instance to make sure + # that we use the same node for the restreaming. This avoids unnecessary + # traffic between the SRS nodes. In theory a clustered SRS setup could + # provide the same stream on all edges, but that would mean east-west + # traffic between the nodes. + + # This assumption breaks when a second stream instance appears on another + # SRS node, and only afterwards the first stream instance is removed. + # In that case, the restreaming would not be retriggered, and the edge + # would automatically start pulling the stream from the other SRS node. + + # It's a tradeoff between stability and efficiency. We choose stability. + rtmp_base = stream_instances[0].node.rtmp_base + + return { + 'config_version': 1, + 'stream_source_url': f"{rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}", + 'stream_target_url': self.target, + 'stream_target_transport': self.format, + } + + +pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) + diff --git a/source/config/signals.py b/source/config/signals.py index f120b01..5c0de0f 100644 --- a/source/config/signals.py +++ b/source/config/signals.py @@ -1,5 +1,8 @@ +import json + from django.dispatch import receiver from django.db.models.signals import post_save, post_delete + from config.models import Restream, Stream from config.signals_shared import stream_active, stream_inactive from concierge.models import Task @@ -10,7 +13,7 @@ def create_restream_tasks(sender, **kwargs): restreams = Restream.objects.filter(active=True, stream=stream) for restream in restreams: Task.objects.get_or_create(stream=restream.stream, type='restream', config_id=restream.id, - configuration=restream.get_json_config()) + configuration=json.dumps(restream.get_concierge_configuration())) @receiver(post_save, sender=Restream) @@ -28,7 +31,7 @@ def update_restream_tasks(sender, **kwargs): # If the configuration is set to be active, and the stream is published, (re)create new task if instance.active and instance.stream.publish_counter > 0: task = Task(stream=instance.stream, type='restream', config_id=instance.id, - configuration=instance.get_json_config()) + configuration=json.dumps(instance.get_concierge_configuration())) task.save()