diff --git a/Dockerfile b/Dockerfile index 056cdf2..d032a0f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN pip install -r requirements.txt # add supervisor and nginx configs ADD ./docker/nginx.conf /etc/nginx/nginx.conf -ADD ./docker/supervisord.conf /etc/supervisord.conf +ADD ./docker/supervisor*.conf /etc/ # add user RUN addgroup -S portier && adduser -S portier -G portier @@ -30,4 +30,5 @@ RUN ./fetch_frontend_libs.sh \ RUN ./manage.py collectstatic --noinput --link RUN ./manage.py compilemessages +ENV COMPONENT=web CMD ["/app/start.sh"] diff --git a/docker-compose.yaml b/docker-compose.yaml index a99fee5..a43dbbd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -23,7 +23,27 @@ services: - "EMAIL_HOST=${EMAIL_HOST}" - "EMAIL_HOST_USER=${EMAIL_HOST_USER}" - "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}" - - "ADVERTISED_RTMP_HOSTS=localhost:1935 laserpope:1234" + celerybeat: + build: . + depends_on: + - postgres + - redis + environment: + - COMPONENT=celerybeat + - DEBUG=1 + - "SECRET_KEY=D4mn1t_Ch4nG3_M3!1!!" + - SQL_ENGINE=django.db.backends.postgresql + - SQL_USER=portier + - SQL_PASSWORD=portier + - SQL_DATABASE=portier + - SQL_HOST=postgres + - SQL_PORT=5432 + - REDIS_HOST=redis + - REDIS_PORT=6379 + - "EMAIL_FROM=${EMAIL_FROM}" + - "EMAIL_HOST=${EMAIL_HOST}" + - "EMAIL_HOST_USER=${EMAIL_HOST_USER}" + - "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}" redis: image: redis:7-alpine postgres: diff --git a/docker/supervisord-celerybeat.conf b/docker/supervisord-celerybeat.conf new file mode 100644 index 0000000..9a5d085 --- /dev/null +++ b/docker/supervisord-celerybeat.conf @@ -0,0 +1,21 @@ +[supervisord] +logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log) +logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB) +logfile_backups=10 ; (num of main logfile rotation backups;default 10) +loglevel=info ; (log level;default info; others: debug,warn,trace) +nodaemon=true ; (start in foreground if true;default false) +user=root + +[program:celery_beat] +directory=/app +command=/usr/local/bin/celery -A portier beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler +autostart=true +autorestart=true +priority=5 +stdout_events_enabled=true +stderr_events_enabled=true +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +user=portier diff --git a/docker/supervisord.conf b/docker/supervisord-web.conf similarity index 76% rename from docker/supervisord.conf rename to docker/supervisord-web.conf index 84981d8..65df5ff 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord-web.conf @@ -6,6 +6,20 @@ loglevel=info ; (log level;default info; others: debug,warn,trace nodaemon=true ; (start in foreground if true;default false) user=root +[program:celery_worker] +directory=/app +command=/usr/local/bin/celery -A portier worker -l INFO +autostart=true +autorestart=true +priority=5 +stdout_events_enabled=true +stderr_events_enabled=true +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +user=portier + [program:gunicorn] directory=/app command=/usr/local/bin/gunicorn -w 4 --bind 0.0.0.0:8000 portier.wsgi diff --git a/source/config/admin.py b/source/config/admin.py index beba580..b8c2993 100644 --- a/source/config/admin.py +++ b/source/config/admin.py @@ -1,6 +1,6 @@ from django.contrib import admin from guardian.admin import GuardedModelAdmin -from .models import Stream, Restream +from config.models import Stream, Restream, SRSNode, SRSStreamInstance @admin.register(Stream) @@ -10,3 +10,18 @@ class StreamAdmin(GuardedModelAdmin): @admin.register(Restream) class RestreamAdmin(GuardedModelAdmin): fields = ['name', 'active', 'stream', 'format', 'target'] + +@admin.register(SRSNode) +class SRSNodeAdmin(GuardedModelAdmin): + fields = ['name', 'api_base', 'rtmp_base', 'active'] + +@admin.register(SRSStreamInstance) +class SRSStreamInstanceAdmin(GuardedModelAdmin): + fields = ['stream', 'node'] + + # Stream Instances are just representations of the streams on the SRS server, + # and should not be addable/editable. Deleting them can be useful though. + def has_change_permission(self, request, obj=None): + return False + def has_add_permission(self, request): + return False \ No newline at end of file diff --git a/source/config/migrations/0002_srsnode_srsstreaminstance.py b/source/config/migrations/0002_srsnode_srsstreaminstance.py new file mode 100644 index 0000000..47b301f --- /dev/null +++ b/source/config/migrations/0002_srsnode_srsstreaminstance.py @@ -0,0 +1,42 @@ +# Generated by Django 5.0.2 on 2024-02-29 17:26 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('config', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='SRSNode', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(help_text='srsnode_name_help', max_length=100)), + ('api_base', models.CharField(help_text='srsnode_api_base_help', max_length=256)), + ('rtmp_base', models.CharField(help_text='srsnode_rtmp_base_help', max_length=256)), + ('active', models.BooleanField(help_text='srsnode_active_help')), + ], + options={ + 'verbose_name': 'srsnode_verbose_name', + 'verbose_name_plural': 'srsnode_verbose_name_plural', + }, + ), + migrations.CreateModel( + name='SRSStreamInstance', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('last_update', models.DateTimeField(auto_now=True, help_text='srsstreaminstance_last_update_help')), + ('statusdata', models.TextField(default='{}', help_text='srsstreaminstance_statusdata_help')), + ('node', models.ForeignKey(help_text='srsstreaminstance_node_help', on_delete=django.db.models.deletion.CASCADE, to='config.srsnode')), + ('stream', models.ForeignKey(help_text='srsstreaminstance_stream_help', on_delete=django.db.models.deletion.CASCADE, to='config.stream')), + ], + options={ + 'verbose_name': 'srsstreaminstance_verbose_name', + 'verbose_name_plural': 'srsstreaminstance_verbose_name_plural', + }, + ), + ] diff --git a/source/config/models.py b/source/config/models.py index 0cf22d1..517541c 100644 --- a/source/config/models.py +++ b/source/config/models.py @@ -7,7 +7,7 @@ from django.urls import reverse from django.utils.translation import gettext as _ from django.db.models.signals import pre_delete from portier.common import handlers -from . import signals_shared +from config import signals_shared class Stream(models.Model): @@ -20,32 +20,19 @@ class Stream(models.Model): # and only send signals when we are going to / coming from 0 published streams. publish_counter = models.PositiveIntegerField(default=0) - def on_publish(self, param): - # if so far there were less than one incoming streams, this stream - # is now being considered active - if self.publish_counter < 1: - signals_shared.stream_active.send(sender=self.__class__, - stream=str(self.stream), - param=param - ) - - # keep track of this incoming stream - self.publish_counter += 1 - self.save() - - def on_unpublish(self, param): - # note that we now have on less incoming stream + def save(self, *args, **kwargs): + super().save(*args, **kwargs) if self.publish_counter > 0: - self.publish_counter -= 1 - - # if we now have less than one incoming stream, this stream is being - # considered inactive - if self.publish_counter < 1: + signals_shared.stream_active.send(sender=self.__class__, + stream=str(self.stream), + param=None + ) + else: signals_shared.stream_inactive.send(sender=self.__class__, - stream=str(self.stream), - param=param - ) - self.save() + stream=str(self.stream), + param=None + ) + def get_absolute_url(self): return reverse('config:stream_detail', kwargs={'pk': self.pk}) @@ -59,6 +46,7 @@ class Stream(models.Model): pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream) + class Restream(models.Model): FORMATS = ( ('flv', 'flv (RTMP)'), @@ -94,4 +82,32 @@ class Restream(models.Model): return json.dumps(config) -pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) \ No newline at end of file +pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) + + +class SRSNode(models.Model): + name = models.CharField(max_length=100, help_text=_('srsnode_name_help')) + api_base = models.CharField(max_length=256, help_text=_('srsnode_api_base_help')) + rtmp_base = models.CharField(max_length=256, help_text=_('srsnode_rtmp_base_help')) + active = models.BooleanField(help_text=_('srsnode_active_help')) + + class Meta: + verbose_name = _('srsnode_verbose_name') + verbose_name_plural = _('srsnode_verbose_name_plural') + + def __str__(self): + return self.name + + +class SRSStreamInstance(models.Model): + node = models.ForeignKey(SRSNode, on_delete=models.CASCADE, help_text=_('srsstreaminstance_node_help')) + stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('srsstreaminstance_stream_help')) + last_update = models.DateTimeField(auto_now=True, help_text=_('srsstreaminstance_last_update_help')) + statusdata = models.TextField(default="{}", help_text=_('srsstreaminstance_statusdata_help')) + + class Meta: + verbose_name = _('srsstreaminstance_verbose_name') + verbose_name_plural = _('srsstreaminstance_verbose_name_plural') + + def __str__(self): + return f"{self.stream} on {self.node}" diff --git a/source/config/signals.py b/source/config/signals.py index 5d48dc8..f120b01 100644 --- a/source/config/signals.py +++ b/source/config/signals.py @@ -1,21 +1,20 @@ from django.dispatch import receiver from django.db.models.signals import post_save, post_delete -from .models import Restream, Stream +from config.models import Restream, Stream +from config.signals_shared import stream_active, stream_inactive from concierge.models import Task -from .signals_shared import stream_active, stream_inactive @receiver(stream_active) -def create_tasks(sender, **kwargs): +def create_restream_tasks(sender, **kwargs): stream = Stream.objects.get(stream=kwargs['stream']) - instances = Restream.objects.filter(active=True, stream=stream) - for instance in instances: - task = Task(stream=instance.stream, type='restream', config_id=instance.id, - configuration=instance.get_json_config()) - task.save() + restreams = Restream.objects.filter(active=True, stream=stream) + for restream in restreams: + Task.objects.get_or_create(stream=restream.stream, type='restream', config_id=restream.id, + configuration=restream.get_json_config()) @receiver(post_save, sender=Restream) -def update_tasks(sender, **kwargs): +def update_restream_tasks(sender, **kwargs): instance = kwargs['instance'] # TODO: check for breaking changes using update_fields. This needs custom save_model functions though. @@ -34,7 +33,7 @@ def update_tasks(sender, **kwargs): @receiver(post_delete, sender=Restream) -def delete_tasks(sender, **kwargs): +def delete_restream_tasks(sender, **kwargs): instance = kwargs['instance'] # Get the current task instance if it exists, and remove it try: diff --git a/source/config/tasks.py b/source/config/tasks.py new file mode 100644 index 0000000..65d8044 --- /dev/null +++ b/source/config/tasks.py @@ -0,0 +1,51 @@ +import json +import logging + +from celery import shared_task +import requests + +from config.models import Stream, SRSNode, SRSStreamInstance + +logger = logging.getLogger(__name__) + + +def scrape_srs_servers(): + for node in SRSNode.objects.filter(active=True): + scrape_srs_server(node) + update_stream_counters() + + +def scrape_srs_server(node: SRSNode): + try: + response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2) + response.raise_for_status() + streams = response.json().get('streams', []) + + streamobjs = [] + + for streamjson in streams: + # find the corresponding stream object by comparing the stream uuid + stream = Stream.objects.get(stream=streamjson.get('name')) + streaminstance, _ = SRSStreamInstance.objects.get_or_create(stream=stream, node=node) + streaminstance.statusdata = json.dumps(streamjson) + streamobjs.append(stream) + + # Delete the stream instances that are not in the response + SRSStreamInstance.objects.filter(node=node).exclude(stream__in=streamobjs).delete() + except requests.exceptions.RequestException as e: + logger.error('Error while trying to scrape SRS server') + logger.error(node) + logger.error(e) + + +def update_stream_counters(): + for stream in Stream.objects.all(): + stream.publish_counter = len(SRSStreamInstance.objects.filter(stream=stream).all()) + logger.error(stream.publish_counter) + logger.error(SRSStreamInstance.objects.filter(stream=stream).all()) + stream.save() + + +@shared_task +def async_scrape_srs_servers(): + scrape_srs_servers() diff --git a/source/config/templates/config/stream_detail.html b/source/config/templates/config/stream_detail.html index 72d6a42..80194a3 100644 --- a/source/config/templates/config/stream_detail.html +++ b/source/config/templates/config/stream_detail.html @@ -40,9 +40,10 @@

{% trans "set_this_stream_server_in_encoder" %}

{% trans "set_this_stream_id_in_encoder" %}

diff --git a/source/config/tests.py b/source/config/tests.py deleted file mode 100644 index 7ce503c..0000000 --- a/source/config/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/source/config/urls.py b/source/config/urls.py index 703a28c..ae30de1 100644 --- a/source/config/urls.py +++ b/source/config/urls.py @@ -1,18 +1,19 @@ from django.urls import path -from . import views +from config.views import restream, stream +from config.views.srs import callback_srs app_name = 'config' urlpatterns = [ - path('callback/srs', views.callback_srs, name='callback_srs'), - path('streams/', views.StreamList.as_view(), name='stream_list'), - path('streams//', views.StreamDetail.as_view(), name='stream_detail'), - path('streams//change', views.StreamChange.as_view(), name='stream_change'), - path('streams//delete', views.StreamDelete.as_view(), name='stream_delete'), - path('streams/create', views.StreamCreate.as_view(), name='stream_create'), - path('restream/', views.RestreamList.as_view(), name='restream_list'), - path('restream//', views.RestreamDetail.as_view(), name='restream_detail'), - path('restream//change', views.RestreamUpdate.as_view(), name='restream_change'), - path('restream//delete', views.RestreamDelete.as_view(), name='restream_delete'), - path('restream/create', views.RestreamCreate.as_view(), name='restream_create'), + path('srs/callback', callback_srs, name='callback_srs'), + path('streams/', stream.StreamList.as_view(), name='stream_list'), + path('streams//', stream.StreamDetail.as_view(), name='stream_detail'), + path('streams//change', stream.StreamChange.as_view(), name='stream_change'), + path('streams//delete', stream.StreamDelete.as_view(), name='stream_delete'), + path('streams/create', stream.StreamCreate.as_view(), name='stream_create'), + path('restream/', restream.RestreamList.as_view(), name='restream_list'), + path('restream//', restream.RestreamDetail.as_view(), name='restream_detail'), + path('restream//change', restream.RestreamUpdate.as_view(), name='restream_change'), + path('restream//delete', restream.RestreamDelete.as_view(), name='restream_delete'), + path('restream/create', restream.RestreamCreate.as_view(), name='restream_create'), ] diff --git a/source/config/views.py b/source/config/views.py deleted file mode 100644 index b16b255..0000000 --- a/source/config/views.py +++ /dev/null @@ -1,177 +0,0 @@ -import json -import logging - -from django.core.exceptions import ObjectDoesNotExist -from django.http import HttpResponse -from django.urls import reverse_lazy -from django.contrib.auth.decorators import login_required -from django.contrib.admin.utils import NestedObjects -from django.utils.decorators import method_decorator -from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie -from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView -from guardian.decorators import permission_required_or_403 -from guardian.shortcuts import assign_perm - -from . import models, forms - -logger = logging.getLogger(__name__) - - -@csrf_exempt -def callback_srs(request): - if request.method != 'POST': - return HttpResponse('1', status=405) - - try: - json_data = json.loads(request.body) - except json.decoder.JSONDecodeError: - return HttpResponse('1', status=400) - - try: - app_name = json_data['app'] - # QUIRK this is a weird bug when pushing from OME to SRS. wtf. - # for some reason srs interprets the incoming app as app/stream, and passes this on to portier. - # only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm - app_name = app_name.split('/')[0] - # ENDQUIRK - stream_name = json_data['stream'] - param = json_data['param'] - except KeyError: - return HttpResponse('1', status=401) - try: - stream = models.Stream.objects.get(stream=stream_name) - - except ObjectDoesNotExist: - return HttpResponse('1', status=401) - - if json_data.get('action') == 'on_publish': - stream.on_publish(param=param) - - if json_data.get('action') == 'on_unpublish': - stream.on_unpublish(param=param) - - return HttpResponse('0') - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_stream'), - name='dispatch') -class StreamList(ListView): - model = models.Stream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.view_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamDetail(DetailView): - model = models.Stream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.change_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamChange(UpdateView): - model = models.Stream - template_name_suffix = '_update_form' - fields = ['name'] - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_stream'), - name='dispatch') -class StreamCreate(CreateView): - model = models.Stream - fields = ['name'] - - def form_valid(self, form): - valid = super().form_valid(form) - if valid: - user = self.request.user - assign_perm('view_stream', user, self.object) - assign_perm('change_stream', user, self.object) - assign_perm('delete_stream', user, self.object) - return valid - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.delete_stream', - (models.Stream, 'pk', 'pk')), - name='dispatch') -class StreamDelete(DeleteView): - model = models.Stream - success_url = reverse_lazy('config:stream_list') - - def get_context_data(self, **kwargs): - context = super().get_context_data(**kwargs) - - collector = NestedObjects(using='default') - collector.collect([self.object]) - - context['to_delete'] = collector.nested() - - print(context['to_delete']) - return context - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_restream'), - name='dispatch') -@method_decorator(ensure_csrf_cookie, name='dispatch') -class RestreamList(ListView): - model = models.Restream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.view_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamDetail(DetailView): - model = models.Restream - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.change_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamUpdate(UpdateView): - model = models.Restream - form_class = forms.RestreamFilteredStreamForm - template_name_suffix = '_update_form' - - def get_form_kwargs(self): - kwargs = super().get_form_kwargs() - kwargs['user'] = self.request.user - return kwargs - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.add_restream'), - name='dispatch') -class RestreamCreate(CreateView): - model = models.Restream - form_class = forms.RestreamFilteredStreamForm - - def get_form_kwargs(self): - kwargs = super().get_form_kwargs() - kwargs['user'] = self.request.user - return kwargs - - def form_valid(self, form): - valid = super().form_valid(form) - if valid: - user = self.request.user - assign_perm('view_restream', user, self.object) - assign_perm('change_restream', user, self.object) - assign_perm('delete_restream', user, self.object) - return valid - - -@method_decorator(login_required, name='dispatch') -@method_decorator(permission_required_or_403('config.delete_restream', - (models.Restream, 'pk', 'pk')), - name='dispatch') -class RestreamDelete(DeleteView): - model = models.Restream - success_url = reverse_lazy('config:restream_list') diff --git a/source/config/views/__init__.py b/source/config/views/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/source/config/views/restream.py b/source/config/views/restream.py new file mode 100644 index 0000000..6fe898b --- /dev/null +++ b/source/config/views/restream.py @@ -0,0 +1,76 @@ +import logging + +from django.urls import reverse_lazy +from django.contrib.auth.decorators import login_required +from django.utils.decorators import method_decorator +from django.views.decorators.csrf import ensure_csrf_cookie +from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView +from guardian.decorators import permission_required_or_403 +from guardian.shortcuts import assign_perm + +from config import models, forms + +logger = logging.getLogger(__name__) + + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_restream'), + name='dispatch') +@method_decorator(ensure_csrf_cookie, name='dispatch') +class RestreamList(ListView): + model = models.Restream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.view_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamDetail(DetailView): + model = models.Restream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.change_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamUpdate(UpdateView): + model = models.Restream + form_class = forms.RestreamFilteredStreamForm + template_name_suffix = '_update_form' + + def get_form_kwargs(self): + kwargs = super().get_form_kwargs() + kwargs['user'] = self.request.user + return kwargs + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_restream'), + name='dispatch') +class RestreamCreate(CreateView): + model = models.Restream + form_class = forms.RestreamFilteredStreamForm + + def get_form_kwargs(self): + kwargs = super().get_form_kwargs() + kwargs['user'] = self.request.user + return kwargs + + def form_valid(self, form): + valid = super().form_valid(form) + if valid: + user = self.request.user + assign_perm('view_restream', user, self.object) + assign_perm('change_restream', user, self.object) + assign_perm('delete_restream', user, self.object) + return valid + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.delete_restream', + (models.Restream, 'pk', 'pk')), + name='dispatch') +class RestreamDelete(DeleteView): + model = models.Restream + success_url = reverse_lazy('config:restream_list') diff --git a/source/config/views/srs.py b/source/config/views/srs.py new file mode 100644 index 0000000..1d978a5 --- /dev/null +++ b/source/config/views/srs.py @@ -0,0 +1,47 @@ +import logging +import json + +from django.core.exceptions import ObjectDoesNotExist +from django.http import HttpResponse +from django.views.decorators.csrf import csrf_exempt + +from config.models import Stream + + +logger = logging.getLogger(__name__) + +from config.tasks import async_scrape_srs_servers + + +@csrf_exempt +def callback_srs(request): + if request.method != 'POST': + return HttpResponse('1', status=405) + + try: + json_data = json.loads(request.body) + except json.decoder.JSONDecodeError: + return HttpResponse('1', status=400) + + try: + app_name = json_data['app'] + # QUIRK this is a weird bug when pushing from OME to SRS. wtf. + # for some reason srs interprets the incoming app as app/stream, and passes this on to portier. + # only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm + app_name = app_name.split('/')[0] + # ENDQUIRK + stream_name = json_data['stream'] + param = json_data['param'] + except KeyError: + return HttpResponse('1', status=401) + try: + Stream.objects.get(stream=stream_name) + + except ObjectDoesNotExist: + return HttpResponse('1', status=401) + + # Scraping the server will make sure we are using the actual data from the server + # and updating the count of the stream instances. + async_scrape_srs_servers.delay() + + return HttpResponse('0') diff --git a/source/config/views/stream.py b/source/config/views/stream.py new file mode 100644 index 0000000..ccae7ce --- /dev/null +++ b/source/config/views/stream.py @@ -0,0 +1,81 @@ +import logging + +from django.urls import reverse_lazy +from django.contrib.auth.decorators import login_required +from django.contrib.admin.utils import NestedObjects +from django.utils.decorators import method_decorator +from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView +from guardian.decorators import permission_required_or_403 +from guardian.shortcuts import assign_perm + +from config import models, forms + +logger = logging.getLogger(__name__) + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_stream'), + name='dispatch') +class StreamList(ListView): + model = models.Stream + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.view_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamDetail(DetailView): + model = models.Stream + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + context["srs_nodes"] = models.SRSNode.objects.all() + return context + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.change_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamChange(UpdateView): + model = models.Stream + template_name_suffix = '_update_form' + fields = ['name'] + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.add_stream'), + name='dispatch') +class StreamCreate(CreateView): + model = models.Stream + fields = ['name'] + + def form_valid(self, form): + valid = super().form_valid(form) + if valid: + user = self.request.user + assign_perm('view_stream', user, self.object) + assign_perm('change_stream', user, self.object) + assign_perm('delete_stream', user, self.object) + return valid + + +@method_decorator(login_required, name='dispatch') +@method_decorator(permission_required_or_403('config.delete_stream', + (models.Stream, 'pk', 'pk')), + name='dispatch') +class StreamDelete(DeleteView): + model = models.Stream + success_url = reverse_lazy('config:stream_list') + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + + collector = NestedObjects(using='default') + collector.collect([self.object]) + + context['to_delete'] = collector.nested() + + print(context['to_delete']) + return context + diff --git a/source/portier/settings.py b/source/portier/settings.py index b303932..0f74cbf 100644 --- a/source/portier/settings.py +++ b/source/portier/settings.py @@ -31,7 +31,6 @@ CSRF_TRUSTED_ORIGINS = os.environ.get("DJANGO_CSRF_TRUSTED_ORIGINS", default="ht DEFAULT_GROUP = 'default' GLOBAL_STREAM_NAMESPACE = 'live' -ADVERTISED_RTMP_HOSTS = os.environ.get("ADVERTISED_RTMP_HOSTS", default="localhost").split(" ") # Application definition @@ -47,6 +46,7 @@ INSTALLED_APPS = [ 'django_registration', 'bootstrap4', 'fontawesome_5', + 'django_celery_beat', 'core.apps.CoreConfig', 'config.apps.ConfigConfig', 'concierge.apps.ConciergeConfig', @@ -182,5 +182,41 @@ CELERY_RESULT_BACKEND = "redis://{}:{}".format(os.environ.get('REDIS_HOST', defa CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json' +CELERY_BEAT_SCHEDULE = { + 'config:scrape_srs_servers': { + 'task': 'config.tasks.async_scrape_srs_servers', + 'schedule': 5.0 + }, +} +# Fixes incompatibility with tzlocal and pytz +DJANGO_CELERY_BEAT_TZ_AWARE = False + DEFAULT_AUTO_FIELD = 'django.db.models.AutoField' + +if DEBUG: + LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'console': { + 'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s', + }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'console', + }, + }, + 'loggers': { + 'django': { + 'level': 'DEBUG', + 'handlers': ['console'], + }, + '': { + 'level': 'DEBUG', + 'handlers': ['console'], + }, + }, + } diff --git a/source/requirements.txt b/source/requirements.txt index 9146fce..c801828 100644 --- a/source/requirements.txt +++ b/source/requirements.txt @@ -3,9 +3,12 @@ django-registration>=3.1 django-bootstrap4 django-guardian django-fontawesome-5 -celery>=4.4 +django-celery-beat +django-filter +djangorestframework +djangorestframework-guardian +celery>=5.3 gunicorn>=20 psycopg2-binary -djangorestframework -django-filter -djangorestframework-guardian +requests +redis \ No newline at end of file diff --git a/source/restapi/urls.py b/source/restapi/urls.py index 5387c91..d662a42 100644 --- a/source/restapi/urls.py +++ b/source/restapi/urls.py @@ -5,8 +5,8 @@ from .views import StreamViewSet, RestreamViewSet router = routers.DefaultRouter() -router.register(r'streams', StreamViewSet) -router.register(r'restreams', RestreamViewSet) +router.register(r'stream', StreamViewSet) +router.register(r'restream', RestreamViewSet) app_name = 'restapi' diff --git a/source/restapi/views.py b/source/restapi/views.py index e7679dd..9676828 100644 --- a/source/restapi/views.py +++ b/source/restapi/views.py @@ -33,9 +33,9 @@ class RestreamSerializer(ObjectPermissionsAssignmentMixin, serializers.ModelSeri def get_permissions_map(self, created): current_user = self.context['request'].user return { - 'view_restreamconfig': [current_user], - 'change_restreamconfig': [current_user], - 'delete_restreamconfig': [current_user] + 'view_restream': [current_user], + 'change_restream': [current_user], + 'delete_restream': [current_user] } def validate_stream(self, value): diff --git a/source/start.sh b/source/start.sh index 617bcce..f9a423e 100755 --- a/source/start.sh +++ b/source/start.sh @@ -29,6 +29,8 @@ initialize() { wait_for_redis wait_for_database -migrate -initialize -supervisord -n -c /etc/supervisord.conf +if [ "${COMPONENT}x" = "webx" ]; then + migrate + initialize +fi +supervisord -n -c "/etc/supervisord-${COMPONENT:-web}.conf" \ No newline at end of file diff --git a/source/static/js/restream-list.js b/source/static/js/restream-list.js index 73cbde5..0b4259d 100644 --- a/source/static/js/restream-list.js +++ b/source/static/js/restream-list.js @@ -22,7 +22,7 @@ var app = new Vue({ }, fetchData() { axios - .get('/api/v1/restreams/') + .get('/api/v1/restream/') .then(response => { this.cfgs = response.data this.isLoading = false diff --git a/source/static/js/stream-list.js b/source/static/js/stream-list.js index 238d41e..159a2c9 100644 --- a/source/static/js/stream-list.js +++ b/source/static/js/stream-list.js @@ -16,7 +16,7 @@ var app = new Vue({ }, fetchData() { axios - .get('/api/v1/streams/') + .get('/api/v1/stream/') .then(response => { this.streams = response.data this.isLoading = false @@ -29,6 +29,6 @@ var app = new Vue({ this.fetchData() setInterval(function () { this.fetchData(); - }.bind(this), 5000); + }.bind(this), 1000); } }) diff --git a/srs.conf b/srs.conf index 96334cb..3a3528f 100644 --- a/srs.conf +++ b/srs.conf @@ -24,7 +24,7 @@ vhost __defaultVhost__ { } http_hooks { enabled on; - on_publish http://app/config/callback/srs; - on_unpublish http://app/config/callback/srs; + on_publish http://app/config/srs/callback; + on_unpublish http://app/config/srs/callback; } } \ No newline at end of file