From 124e366268d1474417e6828c52ebbd70242256aa Mon Sep 17 00:00:00 2001 From: Jan Koppe Date: Thu, 29 Feb 2024 18:33:52 +0100 Subject: [PATCH] implement active srs sync and celery reworks the entire logic how active streams are being tracked. instead of keeping a counter by listening only to srs callback hooks, portier will now actively scrape the http api of known srs nodes to retrieve information about all currently existing streams on a srs node. this prevents portier from being wrong about active stream counts due to drift, and allows us to show more information about stream data to users in the future, as the srs api will also expose information about used codecs, stream resolution and data rates as seen by srs itself. to implement this, the previous remains of celery have been made active again, and it is now required to run exactly one celery beat instance and one or more celery workers beside portier itself. these will make sure that every 5 seconds all srs nodes are actively being scraped, on top of the scrape that is triggered by every srs callback hook. this keeps the data always superfresh. the celery beat function allows us to implement cron-based automation for many other functions (restream, pull, etc) in the future as well, so it's okay to pull in something more heavy here rather than just using a system cron and executing a custom management command all the time. --- Dockerfile | 3 +- docker-compose.yaml | 22 ++- docker/supervisord-celerybeat.conf | 21 +++ ...{supervisord.conf => supervisord-web.conf} | 14 ++ source/config/admin.py | 17 +- .../0002_srsnode_srsstreaminstance.py | 42 +++++ source/config/models.py | 68 ++++--- source/config/signals.py | 19 +- source/config/tasks.py | 51 +++++ .../templates/config/stream_detail.html | 7 +- source/config/tests.py | 3 - source/config/urls.py | 25 +-- source/config/views.py | 177 ------------------ source/config/views/__init__.py | 0 source/config/views/restream.py | 76 ++++++++ source/config/views/srs.py | 47 +++++ source/config/views/stream.py | 81 ++++++++ source/portier/settings.py | 38 +++- source/requirements.txt | 11 +- source/restapi/urls.py | 4 +- source/restapi/views.py | 6 +- source/start.sh | 8 +- source/static/js/restream-list.js | 2 +- source/static/js/stream-list.js | 4 +- srs.conf | 4 +- 25 files changed, 498 insertions(+), 252 deletions(-) create mode 100644 docker/supervisord-celerybeat.conf rename docker/{supervisord.conf => supervisord-web.conf} (76%) create mode 100644 source/config/migrations/0002_srsnode_srsstreaminstance.py create mode 100644 source/config/tasks.py delete mode 100644 source/config/tests.py delete mode 100644 source/config/views.py create mode 100644 source/config/views/__init__.py create mode 100644 source/config/views/restream.py create mode 100644 source/config/views/srs.py create mode 100644 source/config/views/stream.py diff --git a/Dockerfile b/Dockerfile index 056cdf2..d032a0f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN pip install -r requirements.txt # add supervisor and nginx configs ADD ./docker/nginx.conf /etc/nginx/nginx.conf -ADD ./docker/supervisord.conf /etc/supervisord.conf +ADD ./docker/supervisor*.conf /etc/ # add user RUN addgroup -S portier && adduser -S portier -G portier @@ -30,4 +30,5 @@ RUN ./fetch_frontend_libs.sh \ RUN ./manage.py collectstatic --noinput --link RUN ./manage.py compilemessages +ENV COMPONENT=web CMD ["/app/start.sh"] diff --git a/docker-compose.yaml b/docker-compose.yaml index a99fee5..a43dbbd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -23,7 +23,27 @@ services: - "EMAIL_HOST=${EMAIL_HOST}" - "EMAIL_HOST_USER=${EMAIL_HOST_USER}" - "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}" - - "ADVERTISED_RTMP_HOSTS=localhost:1935 laserpope:1234" + celerybeat: + build: . + depends_on: + - postgres + - redis + environment: + - COMPONENT=celerybeat + - DEBUG=1 + - "SECRET_KEY=D4mn1t_Ch4nG3_M3!1!!" + - SQL_ENGINE=django.db.backends.postgresql + - SQL_USER=portier + - SQL_PASSWORD=portier + - SQL_DATABASE=portier + - SQL_HOST=postgres + - SQL_PORT=5432 + - REDIS_HOST=redis + - REDIS_PORT=6379 + - "EMAIL_FROM=${EMAIL_FROM}" + - "EMAIL_HOST=${EMAIL_HOST}" + - "EMAIL_HOST_USER=${EMAIL_HOST_USER}" + - "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}" redis: image: redis:7-alpine postgres: diff --git a/docker/supervisord-celerybeat.conf b/docker/supervisord-celerybeat.conf new file mode 100644 index 0000000..9a5d085 --- /dev/null +++ b/docker/supervisord-celerybeat.conf @@ -0,0 +1,21 @@ +[supervisord] +logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log) +logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB) +logfile_backups=10 ; (num of main logfile rotation backups;default 10) +loglevel=info ; (log level;default info; others: debug,warn,trace) +nodaemon=true ; (start in foreground if true;default false) +user=root + +[program:celery_beat] +directory=/app +command=/usr/local/bin/celery -A portier beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler +autostart=true +autorestart=true +priority=5 +stdout_events_enabled=true +stderr_events_enabled=true +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +user=portier diff --git a/docker/supervisord.conf b/docker/supervisord-web.conf similarity index 76% rename from docker/supervisord.conf rename to docker/supervisord-web.conf index 84981d8..65df5ff 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord-web.conf @@ -6,6 +6,20 @@ loglevel=info ; (log level;default info; others: debug,warn,trace nodaemon=true ; (start in foreground if true;default false) user=root +[program:celery_worker] +directory=/app +command=/usr/local/bin/celery -A portier worker -l INFO +autostart=true +autorestart=true +priority=5 +stdout_events_enabled=true +stderr_events_enabled=true +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +user=portier + [program:gunicorn] directory=/app command=/usr/local/bin/gunicorn -w 4 --bind 0.0.0.0:8000 portier.wsgi diff --git a/source/config/admin.py b/source/config/admin.py index beba580..b8c2993 100644 --- a/source/config/admin.py +++ b/source/config/admin.py @@ -1,6 +1,6 @@ from django.contrib import admin from guardian.admin import GuardedModelAdmin -from .models import Stream, Restream +from config.models import Stream, Restream, SRSNode, SRSStreamInstance @admin.register(Stream) @@ -10,3 +10,18 @@ class StreamAdmin(GuardedModelAdmin): @admin.register(Restream) class RestreamAdmin(GuardedModelAdmin): fields = ['name', 'active', 'stream', 'format', 'target'] + +@admin.register(SRSNode) +class SRSNodeAdmin(GuardedModelAdmin): + fields = ['name', 'api_base', 'rtmp_base', 'active'] + +@admin.register(SRSStreamInstance) +class SRSStreamInstanceAdmin(GuardedModelAdmin): + fields = ['stream', 'node'] + + # Stream Instances are just representations of the streams on the SRS server, + # and should not be addable/editable. Deleting them can be useful though. + def has_change_permission(self, request, obj=None): + return False + def has_add_permission(self, request): + return False \ No newline at end of file diff --git a/source/config/migrations/0002_srsnode_srsstreaminstance.py b/source/config/migrations/0002_srsnode_srsstreaminstance.py new file mode 100644 index 0000000..47b301f --- /dev/null +++ b/source/config/migrations/0002_srsnode_srsstreaminstance.py @@ -0,0 +1,42 @@ +# Generated by Django 5.0.2 on 2024-02-29 17:26 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('config', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='SRSNode', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(help_text='srsnode_name_help', max_length=100)), + ('api_base', models.CharField(help_text='srsnode_api_base_help', max_length=256)), + ('rtmp_base', models.CharField(help_text='srsnode_rtmp_base_help', max_length=256)), + ('active', models.BooleanField(help_text='srsnode_active_help')), + ], + options={ + 'verbose_name': 'srsnode_verbose_name', + 'verbose_name_plural': 'srsnode_verbose_name_plural', + }, + ), + migrations.CreateModel( + name='SRSStreamInstance', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('last_update', models.DateTimeField(auto_now=True, help_text='srsstreaminstance_last_update_help')), + ('statusdata', models.TextField(default='{}', help_text='srsstreaminstance_statusdata_help')), + ('node', models.ForeignKey(help_text='srsstreaminstance_node_help', on_delete=django.db.models.deletion.CASCADE, to='config.srsnode')), + ('stream', models.ForeignKey(help_text='srsstreaminstance_stream_help', on_delete=django.db.models.deletion.CASCADE, to='config.stream')), + ], + options={ + 'verbose_name': 'srsstreaminstance_verbose_name', + 'verbose_name_plural': 'srsstreaminstance_verbose_name_plural', + }, + ), + ] diff --git a/source/config/models.py b/source/config/models.py index 0cf22d1..517541c 100644 --- a/source/config/models.py +++ b/source/config/models.py @@ -7,7 +7,7 @@ from django.urls import reverse from django.utils.translation import gettext as _ from django.db.models.signals import pre_delete from portier.common import handlers -from . import signals_shared +from config import signals_shared class Stream(models.Model): @@ -20,32 +20,19 @@ class Stream(models.Model): # and only send signals when we are going to / coming from 0 published streams. publish_counter = models.PositiveIntegerField(default=0) - def on_publish(self, param): - # if so far there were less than one incoming streams, this stream - # is now being considered active - if self.publish_counter < 1: - signals_shared.stream_active.send(sender=self.__class__, - stream=str(self.stream), - param=param - ) - - # keep track of this incoming stream - self.publish_counter += 1 - self.save() - - def on_unpublish(self, param): - # note that we now have on less incoming stream + def save(self, *args, **kwargs): + super().save(*args, **kwargs) if self.publish_counter > 0: - self.publish_counter -= 1 - - # if we now have less than one incoming stream, this stream is being - # considered inactive - if self.publish_counter < 1: + signals_shared.stream_active.send(sender=self.__class__, + stream=str(self.stream), + param=None + ) + else: signals_shared.stream_inactive.send(sender=self.__class__, - stream=str(self.stream), - param=param - ) - self.save() + stream=str(self.stream), + param=None + ) + def get_absolute_url(self): return reverse('config:stream_detail', kwargs={'pk': self.pk}) @@ -59,6 +46,7 @@ class Stream(models.Model): pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream) + class Restream(models.Model): FORMATS = ( ('flv', 'flv (RTMP)'), @@ -94,4 +82,32 @@ class Restream(models.Model): return json.dumps(config) -pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) \ No newline at end of file +pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) + + +class SRSNode(models.Model): + name = models.CharField(max_length=100, help_text=_('srsnode_name_help')) + api_base = models.CharField(max_length=256, help_text=_('srsnode_api_base_help')) + rtmp_base = models.CharField(max_length=256, help_text=_('srsnode_rtmp_base_help')) + active = models.BooleanField(help_text=_('srsnode_active_help')) + + class Meta: + verbose_name = _('srsnode_verbose_name') + verbose_name_plural = _('srsnode_verbose_name_plural') + + def __str__(self): + return self.name + + +class SRSStreamInstance(models.Model): + node = models.ForeignKey(SRSNode, on_delete=models.CASCADE, help_text=_('srsstreaminstance_node_help')) + stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('srsstreaminstance_stream_help')) + last_update = models.DateTimeField(auto_now=True, help_text=_('srsstreaminstance_last_update_help')) + statusdata = models.TextField(default="{}", help_text=_('srsstreaminstance_statusdata_help')) + + class Meta: + verbose_name = _('srsstreaminstance_verbose_name') + verbose_name_plural = _('srsstreaminstance_verbose_name_plural') + + def __str__(self): + return f"{self.stream} on {self.node}" diff --git a/source/config/signals.py b/source/config/signals.py index 5d48dc8..f120b01 100644 --- a/source/config/signals.py +++ b/source/config/signals.py @@ -1,21 +1,20 @@ from django.dispatch import receiver from django.db.models.signals import post_save, post_delete -from .models import Restream, Stream +from config.models import Restream, Stream +from config.signals_shared import stream_active, stream_inactive from concierge.models import Task -from .signals_shared import stream_active, stream_inactive @receiver(stream_active) -def create_tasks(sender, **kwargs): +def create_restream_tasks(sender, **kwargs): stream = Stream.objects.get(stream=kwargs['stream']) - instances = Restream.objects.filter(active=True, stream=stream) - for instance in instances: - task = Task(stream=instance.stream, type='restream', config_id=instance.id, - configuration=instance.get_json_config()) - task.save() + restreams = Restream.objects.filter(active=True, stream=stream) + for restream in restreams: + Task.objects.get_or_create(stream=restream.stream, type='restream', config_id=restream.id, + configuration=restream.get_json_config()) @receiver(post_save, sender=Restream) -def update_tasks(sender, **kwargs): +def update_restream_tasks(sender, **kwargs): instance = kwargs['instance'] # TODO: check for breaking changes using update_fields. This needs custom save_model functions though. @@ -34,7 +33,7 @@ def update_tasks(sender, **kwargs): @receiver(post_delete, sender=Restream) -def delete_tasks(sender, **kwargs): +def delete_restream_tasks(sender, **kwargs): instance = kwargs['instance'] # Get the current task instance if it exists, and remove it try: diff --git a/source/config/tasks.py b/source/config/tasks.py new file mode 100644 index 0000000..65d8044 --- /dev/null +++ b/source/config/tasks.py @@ -0,0 +1,51 @@ +import json +import logging + +from celery import shared_task +import requests + +from config.models import Stream, SRSNode, SRSStreamInstance + +logger = logging.getLogger(__name__) + + +def scrape_srs_servers(): + for node in SRSNode.objects.filter(active=True): + scrape_srs_server(node) + update_stream_counters() + + +def scrape_srs_server(node: SRSNode): + try: + response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2) + response.raise_for_status() + streams = response.json().get('streams', []) + + streamobjs = [] + + for streamjson in streams: + # find the corresponding stream object by comparing the stream uuid + stream = Stream.objects.get(stream=streamjson.get('name')) + streaminstance, _ = SRSStreamInstance.objects.get_or_create(stream=stream, node=node) + streaminstance.statusdata = json.dumps(streamjson) + streamobjs.append(stream) + + # Delete the stream instances that are not in the response + SRSStreamInstance.objects.filter(node=node).exclude(stream__in=streamobjs).delete() + except requests.exceptions.RequestException as e: + logger.error('Error while trying to scrape SRS server') + logger.error(node) + logger.error(e) + + +def update_stream_counters(): + for stream in Stream.objects.all(): + stream.publish_counter = len(SRSStreamInstance.objects.filter(stream=stream).all()) + logger.error(stream.publish_counter) + logger.error(SRSStreamInstance.objects.filter(stream=stream).all()) + stream.save() + + +@shared_task +def async_scrape_srs_servers(): + scrape_srs_servers() diff --git a/source/config/templates/config/stream_detail.html b/source/config/templates/config/stream_detail.html index 72d6a42..80194a3 100644 --- a/source/config/templates/config/stream_detail.html +++ b/source/config/templates/config/stream_detail.html @@ -40,9 +40,10 @@

{% trans "set_this_stream_server_in_encoder" %}

{% trans "set_this_stream_id_in_encoder" %}

diff --git a/source/config/tests.py b/source/config/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/source/config/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/source/config/urls.py b/source/config/urls.py index 703a28c..ae30de1 100644 --- a/source/config/urls.py +++ b/source/config/urls.py @@ -1,18 +1,19 @@ from django.urls import path -from . import views +from config.views import restream, stream +from config.views.srs import callback_srs app_name = 'config' urlpatterns = [ - path('callback/srs', views.callback_srs, name='callback_srs'), - path('streams/', views.StreamList.as_view(), name='stream_list'), - path('streams//', views.StreamDetail.as_view(), name='stream_detail'), - path('streams//change', views.StreamChange.as_view(), name='stream_change'), - path('streams//delete', views.StreamDelete.as_view(), name='stream_delete'), - path('streams/create', views.StreamCreate.as_view(), name='stream_create'), - path('restream/', views.RestreamList.as_view(), name='restream_list'), - path('restream//', views.RestreamDetail.as_view(), name='restream_detail'), - path('restream//change', views.RestreamUpdate.as_view(), name='restream_change'), - path('restream//delete', views.RestreamDelete.as_view(), name='restream_delete'), - path('restream/create', views.RestreamCreate.as_view(), name='restream_create'), + path('srs/callback', callback_srs, name='callback_srs'), + path('streams/', stream.StreamList.as_view(), name='stream_list'), + path('streams//', stream.StreamDetail.as_view(), name='stream_detail'), + path('streams//change', stream.StreamChange.as_view(), name='stream_change'), + path('streams//delete', stream.StreamDelete.as_view(), name='stream_delete'), + path('streams/create', stream.StreamCreate.as_view(), name='stream_create'), + path('restream/', restream.RestreamList.as_view(), name='restream_list'), + path('restream//', restream.RestreamDetail.as_view(), name='restream_detail'), + path('restream//change', restream.RestreamUpdate.as_view(), name='restream_change'), + path('restream//delete', restream.RestreamDelete.as_view(), name='restream_delete'), + path('restream/create', restream.RestreamCreate.as_view(), name='restream_create'), ] diff --git a/source/config/views.py b/source/config/views.py deleted file mode 100644 index b16b255..0000000 --- a/source/config/views.py +++ /dev/null @@ -1,177 +0,0 @@ -import json -import logging - -from django.core.exceptions import ObjectDoesNotExist -from django.http import HttpResponse -from django.urls import reverse_lazy -from django.contrib.auth.decorators import login_required -from django.contrib.admin.utils import NestedObjects -from django.utils.decorators import method_decorator -from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie -from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView -from guardian.decorators import permission_required_or_403 -from guardian.shortcuts import assign_perm - -from . import models, forms - -logger = logging.getLogger(__name__) - - -@csrf_exempt -def callback_srs(request): - if request.method != 'POST': - return HttpResponse('1', status=405) - - try: - json_data = json.loads(request.body) - except json.decoder.JSONDecodeError: - return HttpResponse('1', status=400) - - try: - app_name = json_data['app'] - # QUIRK this is a weird bug when pushing from OME to SRS. wtf. - # for some reason srs interprets the incoming app as app/stream, and passes this on to portier. - # only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm - app_name = app_name.split('/')[0] - # ENDQUIRK - stream_name = json_data['stream'] - param = json_data['param'] - except KeyError: - return HttpResponse('1', status=401) - try: - stream = models.Stream.objects.get(stream=stream_name) - - except ObjectDoesNotExist: - return HttpResponse('1', status=401) - - if json_data.get('action') == 'on_publish': - stream.on_publish(param=param) - - if json_data.get('action') == 'on_unpublish': - stream.on_unpublish(param=param) - - return HttpResponse('0') - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_stream'), - name='dispatch') -class StreamList(ListView): - model = models.Stream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.view_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamDetail(DetailView): - model = models.Stream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.change_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamChange(UpdateView): - model = models.Stream - template_name_suffix = '_update_form' - fields = ['name'] - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_stream'), - name='dispatch') -class StreamCreate(CreateView): - model = models.Stream - fields = ['name'] - - def form_valid(self, form): - valid = super().form_valid(form) - if valid: - user = self.request.user - assign_perm('view_stream', user, self.object) - assign_perm('change_stream', user, self.object) - assign_perm('delete_stream', user, self.object) - return valid - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.delete_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamDelete(DeleteView): - model = models.Stream - success_url = reverse_lazy('config:stream_list') - - def get_context_data(self, **kwargs): - context = super().get_context_data(**kwargs) - - collector = NestedObjects(using='default') - collector.collect([self.object]) - - context['to_delete'] = collector.nested() - - print(context['to_delete']) - return context - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_restream'), - name='dispatch') -@method_decorator(ensure_csrf_cookie, name='dispatch') -class RestreamList(ListView): - model = models.Restream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.view_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamDetail(DetailView): - model = models.Restream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.change_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamUpdate(UpdateView): - model = models.Restream - form_class = forms.RestreamFilteredStreamForm - template_name_suffix = '_update_form' - - def get_form_kwargs(self): - kwargs = super().get_form_kwargs() - kwargs['user'] = self.request.user - return kwargs - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_restream'), - name='dispatch') -class RestreamCreate(CreateView): - model = models.Restream - form_class = forms.RestreamFilteredStreamForm - - def get_form_kwargs(self): - kwargs = super().get_form_kwargs() - kwargs['user'] = self.request.user - return kwargs - - def form_valid(self, form): - valid = super().form_valid(form) - if valid: - user = self.request.user - assign_perm('view_restream', user, self.object) - assign_perm('change_restream', user, self.object) - assign_perm('delete_restream', user, self.object) - return valid - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.delete_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamDelete(DeleteView): - model = models.Restream - success_url = reverse_lazy('config:restream_list') diff --git a/source/config/views/__init__.py b/source/config/views/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/source/config/views/restream.py b/source/config/views/restream.py new file mode 100644 index 0000000..6fe898b --- /dev/null +++ b/source/config/views/restream.py @@ -0,0 +1,76 @@ +import logging + +from django.urls import reverse_lazy +from django.contrib.auth.decorators import login_required +from django.utils.decorators import method_decorator +from django.views.decorators.csrf import ensure_csrf_cookie +from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView +from guardian.decorators import permission_required_or_403 +from guardian.shortcuts import assign_perm + +from config import models, forms + +logger = logging.getLogger(__name__) + + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_restream'), + name='dispatch') +@method_decorator(ensure_csrf_cookie, name='dispatch') +class RestreamList(ListView): + model = models.Restream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.view_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamDetail(DetailView): + model = models.Restream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.change_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamUpdate(UpdateView): + model = models.Restream + form_class = forms.RestreamFilteredStreamForm + template_name_suffix = '_update_form' + + def get_form_kwargs(self): + kwargs = super().get_form_kwargs() + kwargs['user'] = self.request.user + return kwargs + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_restream'), + name='dispatch') +class RestreamCreate(CreateView): + model = models.Restream + form_class = forms.RestreamFilteredStreamForm + + def get_form_kwargs(self): + kwargs = super().get_form_kwargs() + kwargs['user'] = self.request.user + return kwargs + + def form_valid(self, form): + valid = super().form_valid(form) + if valid: + user = self.request.user + assign_perm('view_restream', user, self.object) + assign_perm('change_restream', user, self.object) + assign_perm('delete_restream', user, self.object) + return valid + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.delete_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamDelete(DeleteView): + model = models.Restream + success_url = reverse_lazy('config:restream_list') diff --git a/source/config/views/srs.py b/source/config/views/srs.py new file mode 100644 index 0000000..1d978a5 --- /dev/null +++ b/source/config/views/srs.py @@ -0,0 +1,47 @@ +import logging +import json + +from django.core.exceptions import ObjectDoesNotExist +from django.http import HttpResponse +from django.views.decorators.csrf import csrf_exempt + +from config.models import Stream + + +logger = logging.getLogger(__name__) + +from config.tasks import async_scrape_srs_servers + + +@csrf_exempt +def callback_srs(request): + if request.method != 'POST': + return HttpResponse('1', status=405) + + try: + json_data = json.loads(request.body) + except json.decoder.JSONDecodeError: + return HttpResponse('1', status=400) + + try: + app_name = json_data['app'] + # QUIRK this is a weird bug when pushing from OME to SRS. wtf. + # for some reason srs interprets the incoming app as app/stream, and passes this on to portier. + # only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm + app_name = app_name.split('/')[0] + # ENDQUIRK + stream_name = json_data['stream'] + param = json_data['param'] + except KeyError: + return HttpResponse('1', status=401) + try: + Stream.objects.get(stream=stream_name) + + except ObjectDoesNotExist: + return HttpResponse('1', status=401) + + # Scraping the server will make sure we are using the actual data from the server + # and updating the count of the stream instances. + async_scrape_srs_servers.delay() + + return HttpResponse('0') diff --git a/source/config/views/stream.py b/source/config/views/stream.py new file mode 100644 index 0000000..ccae7ce --- /dev/null +++ b/source/config/views/stream.py @@ -0,0 +1,81 @@ +import logging + +from django.urls import reverse_lazy +from django.contrib.auth.decorators import login_required +from django.contrib.admin.utils import NestedObjects +from django.utils.decorators import method_decorator +from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView +from guardian.decorators import permission_required_or_403 +from guardian.shortcuts import assign_perm + +from config import models, forms + +logger = logging.getLogger(__name__) + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_stream'), + name='dispatch') +class StreamList(ListView): + model = models.Stream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.view_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamDetail(DetailView): + model = models.Stream + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context["srs_nodes"] = models.SRSNode.objects.all() + return context + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.change_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamChange(UpdateView): + model = models.Stream + template_name_suffix = '_update_form' + fields = ['name'] + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_stream'), + name='dispatch') +class StreamCreate(CreateView): + model = models.Stream + fields = ['name'] + + def form_valid(self, form): + valid = super().form_valid(form) + if valid: + user = self.request.user + assign_perm('view_stream', user, self.object) + assign_perm('change_stream', user, self.object) + assign_perm('delete_stream', user, self.object) + return valid + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.delete_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamDelete(DeleteView): + model = models.Stream + success_url = reverse_lazy('config:stream_list') + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + + collector = NestedObjects(using='default') + collector.collect([self.object]) + + context['to_delete'] = collector.nested() + + print(context['to_delete']) + return context + diff --git a/source/portier/settings.py b/source/portier/settings.py index b303932..0f74cbf 100644 --- a/source/portier/settings.py +++ b/source/portier/settings.py @@ -31,7 +31,6 @@ CSRF_TRUSTED_ORIGINS = os.environ.get("DJANGO_CSRF_TRUSTED_ORIGINS", default="ht DEFAULT_GROUP = 'default' GLOBAL_STREAM_NAMESPACE = 'live' -ADVERTISED_RTMP_HOSTS = os.environ.get("ADVERTISED_RTMP_HOSTS", default="localhost").split(" ") # Application definition @@ -47,6 +46,7 @@ INSTALLED_APPS = [ 'django_registration', 'bootstrap4', 'fontawesome_5', + 'django_celery_beat', 'core.apps.CoreConfig', 'config.apps.ConfigConfig', 'concierge.apps.ConciergeConfig', @@ -182,5 +182,41 @@ CELERY_RESULT_BACKEND = "redis://{}:{}".format(os.environ.get('REDIS_HOST', defa CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json' +CELERY_BEAT_SCHEDULE = { + 'config:scrape_srs_servers': { + 'task': 'config.tasks.async_scrape_srs_servers', + 'schedule': 5.0 + }, +} +# Fixes incompatibility with tzlocal and pytz +DJANGO_CELERY_BEAT_TZ_AWARE = False + DEFAULT_AUTO_FIELD = 'django.db.models.AutoField' + +if DEBUG: + LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'console': { + 'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s', + }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'console', + }, + }, + 'loggers': { + 'django': { + 'level': 'DEBUG', + 'handlers': ['console'], + }, + '': { + 'level': 'DEBUG', + 'handlers': ['console'], + }, + }, + } diff --git a/source/requirements.txt b/source/requirements.txt index 9146fce..c801828 100644 --- a/source/requirements.txt +++ b/source/requirements.txt @@ -3,9 +3,12 @@ django-registration>=3.1 django-bootstrap4 django-guardian django-fontawesome-5 -celery>=4.4 +django-celery-beat +django-filter +djangorestframework +djangorestframework-guardian +celery>=5.3 gunicorn>=20 psycopg2-binary -djangorestframework -django-filter -djangorestframework-guardian +requests +redis \ No newline at end of file diff --git a/source/restapi/urls.py b/source/restapi/urls.py index 5387c91..d662a42 100644 --- a/source/restapi/urls.py +++ b/source/restapi/urls.py @@ -5,8 +5,8 @@ from .views import StreamViewSet, RestreamViewSet router = routers.DefaultRouter() -router.register(r'streams', StreamViewSet) -router.register(r'restreams', RestreamViewSet) +router.register(r'stream', StreamViewSet) +router.register(r'restream', RestreamViewSet) app_name = 'restapi' diff --git a/source/restapi/views.py b/source/restapi/views.py index e7679dd..9676828 100644 --- a/source/restapi/views.py +++ b/source/restapi/views.py @@ -33,9 +33,9 @@ class RestreamSerializer(ObjectPermissionsAssignmentMixin, serializers.ModelSeri def get_permissions_map(self, created): current_user = self.context['request'].user return { - 'view_restreamconfig': [current_user], - 'change_restreamconfig': [current_user], - 'delete_restreamconfig': [current_user] + 'view_restream': [current_user], + 'change_restream': [current_user], + 'delete_restream': [current_user] } def validate_stream(self, value): diff --git a/source/start.sh b/source/start.sh index 617bcce..f9a423e 100755 --- a/source/start.sh +++ b/source/start.sh @@ -29,6 +29,8 @@ initialize() { wait_for_redis wait_for_database -migrate -initialize -supervisord -n -c /etc/supervisord.conf +if [ "${COMPONENT}x" = "webx" ]; then + migrate + initialize +fi +supervisord -n -c "/etc/supervisord-${COMPONENT:-web}.conf" \ No newline at end of file diff --git a/source/static/js/restream-list.js b/source/static/js/restream-list.js index 73cbde5..0b4259d 100644 --- a/source/static/js/restream-list.js +++ b/source/static/js/restream-list.js @@ -22,7 +22,7 @@ var app = new Vue({ }, fetchData() { axios - .get('/api/v1/restreams/') + .get('/api/v1/restream/') .then(response => { this.cfgs = response.data this.isLoading = false diff --git a/source/static/js/stream-list.js b/source/static/js/stream-list.js index 238d41e..159a2c9 100644 --- a/source/static/js/stream-list.js +++ b/source/static/js/stream-list.js @@ -16,7 +16,7 @@ var app = new Vue({ }, fetchData() { axios - .get('/api/v1/streams/') + .get('/api/v1/stream/') .then(response => { this.streams = response.data this.isLoading = false @@ -29,6 +29,6 @@ var app = new Vue({ this.fetchData() setInterval(function () { this.fetchData(); - }.bind(this), 5000); + }.bind(this), 1000); } }) diff --git a/srs.conf b/srs.conf index 96334cb..3a3528f 100644 --- a/srs.conf +++ b/srs.conf @@ -24,7 +24,7 @@ vhost __defaultVhost__ { } http_hooks { enabled on; - on_publish http://app/config/callback/srs; - on_unpublish http://app/config/callback/srs; + on_publish http://app/config/srs/callback; + on_unpublish http://app/config/srs/callback; } } \ No newline at end of file