diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..71b96fc --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "venvPath": "source", + "venv": ".venv" +} diff --git a/source/concierge/signals.py b/source/concierge/signals.py index 08a7dd0..16ea348 100644 --- a/source/concierge/signals.py +++ b/source/concierge/signals.py @@ -1,11 +1,12 @@ from django.dispatch import receiver from .models import Task -from config.signals import stream_inactive +from config.signals_shared import stream_inactive from config.models import Stream @receiver(stream_inactive) -def delete_tasks(sender, **kwargs): - # when a stream was unpublished, all related tasks need to be deleted. +def delete_tasks_when_stream_inactive(sender, **kwargs): + # when a stream has become inactive, all related tasks which are created for the stream need to be deleted. + # we need to exclude the pull tasks, as they are not created for the stream. stream = Stream.objects.get(stream=kwargs['stream']) - Task.objects.filter(stream=stream).delete() + Task.objects.filter(stream=stream).exclude(type="pull").delete() diff --git a/source/config/admin.py b/source/config/admin.py index b8c2993..3d029e3 100644 --- a/source/config/admin.py +++ b/source/config/admin.py @@ -1,6 +1,6 @@ from django.contrib import admin from guardian.admin import GuardedModelAdmin -from config.models import Stream, Restream, SRSNode, SRSStreamInstance +from config.models import Stream, Restream, Pull, SRSNode, SRSStreamInstance @admin.register(Stream) @@ -11,6 +11,10 @@ class StreamAdmin(GuardedModelAdmin): class RestreamAdmin(GuardedModelAdmin): fields = ['name', 'active', 'stream', 'format', 'target'] +@admin.register(Pull) +class PullAdmin(GuardedModelAdmin): + fields = ['name', 'active', 'stream', 'source'] + @admin.register(SRSNode) class SRSNodeAdmin(GuardedModelAdmin): fields = ['name', 'api_base', 'rtmp_base', 'active'] @@ -24,4 +28,4 @@ class SRSStreamInstanceAdmin(GuardedModelAdmin): def has_change_permission(self, request, obj=None): return False def has_add_permission(self, request): - return False \ No newline at end of file + return False diff --git a/source/config/api.py b/source/config/api.py index c904a72..e5f1d36 100644 --- a/source/config/api.py +++ b/source/config/api.py @@ -22,6 +22,20 @@ class RestreamPatch(ModelSchema): extra = "forbid" +class Pull(ModelSchema): + class Meta: + model = models.Pull + fields = "__all__" + + +class PullPatch(ModelSchema): + class Meta: + model = models.Pull + exclude = ["id"] + fields_optional = "__all__" + extra = "forbid" + + class Stream(ModelSchema): class Meta: model = models.Stream @@ -87,6 +101,7 @@ def delete_stream(request, id: int): stream.delete() +#################################################################################################### @router.get('/restreams', response=List[Restream]) def list_restreams(request): @@ -141,4 +156,61 @@ def delete_restream(request, id: int): if not request.user.has_perm('delete_restream', restream): raise HttpError(401, "unauthorized") - restream.delete() \ No newline at end of file + restream.delete() + +#################################################################################################### + +@router.get('/pulls', response=List[Pull]) +def list_pulls(request): + return get_objects_for_user(request.user, 'view_pull', models.Pull.objects.all()) + + +@router.post('/pulls', response=Pull) +def create_pull(request, payload: Pull): + if not request.user.has_perm('view_stream', payload.stream): + raise HttpError(401, "unauthorized") + + pull = models.Pull.objects.create(**payload.dict()) + assign_perm( 'view_pull', request.user, Pull) + assign_perm('change_pull', request.user, Pull) + assign_perm('delete_pull', request.user, Pull) + return pull + + +@router.get('/pulls/{id}', response=Pull) +def get_pull(request, id: int): + pull = get_object_or_404(models.Pull, id=id) + + if not request.user.has_perm('view_pull', pull): + raise HttpError(401, "unauthorized") + + return pull + + +@router.patch('/pulls/{id}', response=Pull) +def patch_pull(request, id: int, payload: PullPatch): + pull = get_object_or_404(models.Pull, id=id) + + if not request.user.has_perm('change_pull', pull): + raise HttpError(401, "unauthorized") + + if payload.stream: + payload.stream = get_object_or_404(models.Stream, id=payload.stream) + + if not request.user.has_perm('view_stream', payload.stream): + raise HttpError(401, "unauthorized") + + for key, value in payload.dict(exclude_unset=True).items(): + setattr(pull, key, value) + pull.save() + return pull + + +@router.delete('/pulls/{id}', response=None) +def delete_pull(request, id: int): + pull = get_object_or_404(models.Pull, id=id) + + if not request.user.has_perm('delete_pull', pull): + raise HttpError(401, "unauthorized") + + pull.delete() \ No newline at end of file diff --git a/source/config/apps.py b/source/config/apps.py index 450e90b..e421c5d 100644 --- a/source/config/apps.py +++ b/source/config/apps.py @@ -6,4 +6,4 @@ class ConfigConfig(AppConfig): name = 'config' def ready(self): - import config.signals, config.signals_shared # noqa \ No newline at end of file + import config.signals.pull, config.signals.restream, config.signals_shared # noqa diff --git a/source/config/forms.py b/source/config/forms.py index 14bb633..b3a4ce8 100644 --- a/source/config/forms.py +++ b/source/config/forms.py @@ -14,3 +14,15 @@ class RestreamFilteredStreamForm(ModelForm): # limit the stream selection to user-accessible streams self.fields['stream'].queryset = get_objects_for_user(user, 'config.view_stream') + +class PullFilteredStreamForm(ModelForm): + class Meta: + model = models.Pull + fields = ['name', 'stream', 'source', 'active'] + + def __init__(self, *args, **kwargs): + user = kwargs.pop('user', None) + super().__init__(*args, **kwargs) + + # limit the stream selection to user-accessible streams + self.fields['stream'].queryset = get_objects_for_user(user, 'config.view_stream') diff --git a/source/config/migrations/0003_pull.py b/source/config/migrations/0003_pull.py new file mode 100644 index 0000000..a514b76 --- /dev/null +++ b/source/config/migrations/0003_pull.py @@ -0,0 +1,28 @@ +# Generated by Django 5.0.2 on 2024-04-01 08:08 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('config', '0002_srsnode_srsstreaminstance'), + ] + + operations = [ + migrations.CreateModel( + name='Pull', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('source', models.CharField(help_text='pull_source_help', max_length=500)), + ('active', models.BooleanField(help_text='pull_activate_help')), + ('name', models.CharField(help_text='pull_name_help', max_length=100)), + ('stream', models.ForeignKey(help_text='pull_stream_help', on_delete=django.db.models.deletion.CASCADE, to='config.stream')), + ], + options={ + 'verbose_name': 'pull_verbose_name', + 'verbose_name_plural': 'pull_verbose_name_plural', + }, + ), + ] diff --git a/source/config/migrations/0004_alter_pull_source_alter_restream_target.py b/source/config/migrations/0004_alter_pull_source_alter_restream_target.py new file mode 100644 index 0000000..13ff920 --- /dev/null +++ b/source/config/migrations/0004_alter_pull_source_alter_restream_target.py @@ -0,0 +1,24 @@ +# Generated by Django 5.0.2 on 2024-04-01 10:42 + +import config.util +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('config', '0003_pull'), + ] + + operations = [ + migrations.AlterField( + model_name='pull', + name='source', + field=models.CharField(help_text='pull_source_help', max_length=500, validators=[config.util.validate_stream_url]), + ), + migrations.AlterField( + model_name='restream', + name='target', + field=models.CharField(help_text='restream_target_help', max_length=500, validators=[config.util.validate_stream_url]), + ), + ] diff --git a/source/config/models.py b/source/config/models.py index 3258dfa..3840ee2 100644 --- a/source/config/models.py +++ b/source/config/models.py @@ -8,6 +8,7 @@ from django.utils.translation import gettext as _ from django.db.models.signals import pre_delete from portier.common import handlers from config import signals_shared +from config.util import validate_stream_url class Stream(models.Model): @@ -41,7 +42,7 @@ class Stream(models.Model): return _('stream_class_name') def __str__(self): - return self.name + return str(self.name) pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream) @@ -75,13 +76,64 @@ class SRSStreamInstance(models.Model): return f"{self.stream} on {self.node}" +class Pull(models.Model): + stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('pull_stream_help')) + source = models.CharField(max_length=500, validators=[validate_stream_url], help_text=_('pull_source_help')) + active = models.BooleanField(help_text=_('pull_activate_help')) + name = models.CharField(max_length=100, help_text=_('pull_name_help')) + + class Meta: + verbose_name = _('pull_verbose_name') + verbose_name_plural = _('pull_verbose_name_plural') + + def class_name(self): + return _('pull_class_name') + + def get_absolute_url(self): + return reverse('config:pull_detail', kwargs={'pk': self.pk}) + + def __str__(self): + return str(self.name) + + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + if self.active: + signals_shared.pull_active.send( + sender=self.__class__, + pull_id=self.id, + ) + else: + signals_shared.pull_inactive.send( + sender=self.__class__, + pull_id=self.id, + ) + + def get_concierge_configuration(self): + if not self.active: + return None + + # Select a random node that is active to ingest the pulled stream. + node = SRSNode.objects.filter(active=True).order_by('?').first() + + return { + "type": "pull", + "active": self.active, + "name": self.name, + "source": self.source, + "target": f"{node.rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}", + } + + +pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Pull) + + class Restream(models.Model): FORMATS = ( ('flv', 'flv (RTMP)'), ('mpegts', 'mpegts (SRT)'), ) stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('restream_stream_help')) - target = models.CharField(max_length=500, help_text=_('restream_target_help')) + target = models.CharField(max_length=500, validators=[validate_stream_url], help_text=_('restream_target_help')) name = models.CharField(max_length=100, help_text=_('restream_name_help')) active = models.BooleanField(help_text=_('restream_activate_help')) format = models.CharField(max_length=6, choices=FORMATS, default='flv', help_text=_('restream_format_help')) @@ -128,4 +180,3 @@ class Restream(models.Model): pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) - diff --git a/source/config/signals/__init__.py b/source/config/signals/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/source/config/signals/pull.py b/source/config/signals/pull.py new file mode 100644 index 0000000..98ca80b --- /dev/null +++ b/source/config/signals/pull.py @@ -0,0 +1,46 @@ +import json + +from django.dispatch import receiver +from django.db.models.signals import post_save, post_delete + +from config.models import Stream, Pull +from config.signals_shared import stream_active, stream_inactive, pull_active, pull_inactive +from concierge.models import Task + +@receiver(pull_active) +def create_pull_tasks_on_activate(sender, **kwargs): + pull = Pull.objects.get(id=kwargs['pull_id']) + Task.objects.get_or_create(stream=pull.stream, type='pull', config_id=pull.id, + configuration=json.dumps(pull.get_concierge_configuration())) + + +@receiver(pull_inactive) +def delete_pull_tasks_on_inactivate(sender, **kwargs): + pull = Pull.objects.get(id=kwargs['pull_id']) + Task.objects.filter(type='pull', config_id=pull.id).delete() + + +@receiver(post_save, sender=Pull) +def update_pull_tasks(sender, **kwargs): + instance = kwargs['instance'] + + try: + task = Task.objects.filter(type='pull', config_id=instance.id).get() + task.delete() + except Task.DoesNotExist: + pass + + if instance.active: + task = Task(stream=instance.stream, type='pull', config_id=instance.id, + configuration=json.dumps(instance.get_concierge_configuration())) + task.save() + +@receiver(post_delete, sender=Pull) +def delete_pull_tasks(sender, **kwargs): + instance = kwargs['instance'] + # Get the current task instance if it exists, and remove it + try: + task = Task.objects.filter(type='pull', config_id=instance.id).get() + task.delete() + except Task.DoesNotExist: + pass diff --git a/source/config/signals/restream.py b/source/config/signals/restream.py new file mode 100644 index 0000000..5c0de0f --- /dev/null +++ b/source/config/signals/restream.py @@ -0,0 +1,46 @@ +import json + +from django.dispatch import receiver +from django.db.models.signals import post_save, post_delete + +from config.models import Restream, Stream +from config.signals_shared import stream_active, stream_inactive +from concierge.models import Task + +@receiver(stream_active) +def create_restream_tasks(sender, **kwargs): + stream = Stream.objects.get(stream=kwargs['stream']) + restreams = Restream.objects.filter(active=True, stream=stream) + for restream in restreams: + Task.objects.get_or_create(stream=restream.stream, type='restream', config_id=restream.id, + configuration=json.dumps(restream.get_concierge_configuration())) + + +@receiver(post_save, sender=Restream) +def update_restream_tasks(sender, **kwargs): + instance = kwargs['instance'] + # TODO: check for breaking changes using update_fields. This needs custom save_model functions though. + + # Get the current task instance if it exists, and remove it + try: + task = Task.objects.filter(type='restream', config_id=instance.id).get() + task.delete() + except Task.DoesNotExist: + pass + + # If the configuration is set to be active, and the stream is published, (re)create new task + if instance.active and instance.stream.publish_counter > 0: + task = Task(stream=instance.stream, type='restream', config_id=instance.id, + configuration=json.dumps(instance.get_concierge_configuration())) + task.save() + + +@receiver(post_delete, sender=Restream) +def delete_restream_tasks(sender, **kwargs): + instance = kwargs['instance'] + # Get the current task instance if it exists, and remove it + try: + task = Task.objects.filter(type='restream', config_id=instance.id).get() + task.delete() + except Task.DoesNotExist: + pass diff --git a/source/config/signals_shared.py b/source/config/signals_shared.py index 5dd2adc..ab98dd1 100644 --- a/source/config/signals_shared.py +++ b/source/config/signals_shared.py @@ -1,4 +1,6 @@ from django.dispatch import Signal stream_active = Signal() -stream_inactive = Signal() \ No newline at end of file +stream_inactive = Signal() +pull_active = Signal() +pull_inactive = Signal() diff --git a/source/config/templates/config/pull_confirm_delete.html b/source/config/templates/config/pull_confirm_delete.html new file mode 100644 index 0000000..ac0b2cc --- /dev/null +++ b/source/config/templates/config/pull_confirm_delete.html @@ -0,0 +1,31 @@ +{% extends 'base.html' %} +{% load i18n %} +{% load bootstrap4 %} +{% load fontawesome_5 %} + +{% block 'sidenav' %} + {% with 'pull' as section %} + {{ block.super }} + {% endwith %} +{% endblock %} + +{% block 'content' %} +
{% trans "name" %} | +{% trans "active" %} | +{% trans "actions" %} | +|
---|---|---|---|
{% verbatim %}{{cfg.name}}{% endverbatim %} | ++ + | ++ + | ++ {% trans "details" %} + {% fa5_icon 'trash' %} + | +