implement active srs sync and celery
reworks the entire logic how active streams are being tracked. instead of keeping a counter by listening only to srs callback hooks, portier will now actively scrape the http api of known srs nodes to retrieve information about all currently existing streams on a srs node. this prevents portier from being wrong about active stream counts due to drift, and allows us to show more information about stream data to users in the future, as the srs api will also expose information about used codecs, stream resolution and data rates as seen by srs itself. to implement this, the previous remains of celery have been made active again, and it is now required to run exactly one celery beat instance and one or more celery workers beside portier itself. these will make sure that every 5 seconds all srs nodes are actively being scraped, on top of the scrape that is triggered by every srs callback hook. this keeps the data always superfresh. the celery beat function allows us to implement cron-based automation for many other functions (restream, pull, etc) in the future as well, so it's okay to pull in something more heavy here rather than just using a system cron and executing a custom management command all the time.
This commit is contained in:
parent
ccfe3e14ad
commit
124e366268
|
@ -14,7 +14,7 @@ RUN pip install -r requirements.txt
|
|||
|
||||
# add supervisor and nginx configs
|
||||
ADD ./docker/nginx.conf /etc/nginx/nginx.conf
|
||||
ADD ./docker/supervisord.conf /etc/supervisord.conf
|
||||
ADD ./docker/supervisor*.conf /etc/
|
||||
|
||||
# add user
|
||||
RUN addgroup -S portier && adduser -S portier -G portier
|
||||
|
@ -30,4 +30,5 @@ RUN ./fetch_frontend_libs.sh \
|
|||
RUN ./manage.py collectstatic --noinput --link
|
||||
RUN ./manage.py compilemessages
|
||||
|
||||
ENV COMPONENT=web
|
||||
CMD ["/app/start.sh"]
|
||||
|
|
|
@ -23,7 +23,27 @@ services:
|
|||
- "EMAIL_HOST=${EMAIL_HOST}"
|
||||
- "EMAIL_HOST_USER=${EMAIL_HOST_USER}"
|
||||
- "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}"
|
||||
- "ADVERTISED_RTMP_HOSTS=localhost:1935 laserpope:1234"
|
||||
celerybeat:
|
||||
build: .
|
||||
depends_on:
|
||||
- postgres
|
||||
- redis
|
||||
environment:
|
||||
- COMPONENT=celerybeat
|
||||
- DEBUG=1
|
||||
- "SECRET_KEY=D4mn1t_Ch4nG3_M3!1!!"
|
||||
- SQL_ENGINE=django.db.backends.postgresql
|
||||
- SQL_USER=portier
|
||||
- SQL_PASSWORD=portier
|
||||
- SQL_DATABASE=portier
|
||||
- SQL_HOST=postgres
|
||||
- SQL_PORT=5432
|
||||
- REDIS_HOST=redis
|
||||
- REDIS_PORT=6379
|
||||
- "EMAIL_FROM=${EMAIL_FROM}"
|
||||
- "EMAIL_HOST=${EMAIL_HOST}"
|
||||
- "EMAIL_HOST_USER=${EMAIL_HOST_USER}"
|
||||
- "EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}"
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
postgres:
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
[supervisord]
|
||||
logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log)
|
||||
logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB)
|
||||
logfile_backups=10 ; (num of main logfile rotation backups;default 10)
|
||||
loglevel=info ; (log level;default info; others: debug,warn,trace)
|
||||
nodaemon=true ; (start in foreground if true;default false)
|
||||
user=root
|
||||
|
||||
[program:celery_beat]
|
||||
directory=/app
|
||||
command=/usr/local/bin/celery -A portier beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
|
||||
autostart=true
|
||||
autorestart=true
|
||||
priority=5
|
||||
stdout_events_enabled=true
|
||||
stderr_events_enabled=true
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
user=portier
|
|
@ -6,6 +6,20 @@ loglevel=info ; (log level;default info; others: debug,warn,trace
|
|||
nodaemon=true ; (start in foreground if true;default false)
|
||||
user=root
|
||||
|
||||
[program:celery_worker]
|
||||
directory=/app
|
||||
command=/usr/local/bin/celery -A portier worker -l INFO
|
||||
autostart=true
|
||||
autorestart=true
|
||||
priority=5
|
||||
stdout_events_enabled=true
|
||||
stderr_events_enabled=true
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
user=portier
|
||||
|
||||
[program:gunicorn]
|
||||
directory=/app
|
||||
command=/usr/local/bin/gunicorn -w 4 --bind 0.0.0.0:8000 portier.wsgi
|
|
@ -1,6 +1,6 @@
|
|||
from django.contrib import admin
|
||||
from guardian.admin import GuardedModelAdmin
|
||||
from .models import Stream, Restream
|
||||
from config.models import Stream, Restream, SRSNode, SRSStreamInstance
|
||||
|
||||
|
||||
@admin.register(Stream)
|
||||
|
@ -10,3 +10,18 @@ class StreamAdmin(GuardedModelAdmin):
|
|||
@admin.register(Restream)
|
||||
class RestreamAdmin(GuardedModelAdmin):
|
||||
fields = ['name', 'active', 'stream', 'format', 'target']
|
||||
|
||||
@admin.register(SRSNode)
|
||||
class SRSNodeAdmin(GuardedModelAdmin):
|
||||
fields = ['name', 'api_base', 'rtmp_base', 'active']
|
||||
|
||||
@admin.register(SRSStreamInstance)
|
||||
class SRSStreamInstanceAdmin(GuardedModelAdmin):
|
||||
fields = ['stream', 'node']
|
||||
|
||||
# Stream Instances are just representations of the streams on the SRS server,
|
||||
# and should not be addable/editable. Deleting them can be useful though.
|
||||
def has_change_permission(self, request, obj=None):
|
||||
return False
|
||||
def has_add_permission(self, request):
|
||||
return False
|
|
@ -0,0 +1,42 @@
|
|||
# Generated by Django 5.0.2 on 2024-02-29 17:26
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('config', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='SRSNode',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('name', models.CharField(help_text='srsnode_name_help', max_length=100)),
|
||||
('api_base', models.CharField(help_text='srsnode_api_base_help', max_length=256)),
|
||||
('rtmp_base', models.CharField(help_text='srsnode_rtmp_base_help', max_length=256)),
|
||||
('active', models.BooleanField(help_text='srsnode_active_help')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'srsnode_verbose_name',
|
||||
'verbose_name_plural': 'srsnode_verbose_name_plural',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='SRSStreamInstance',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('last_update', models.DateTimeField(auto_now=True, help_text='srsstreaminstance_last_update_help')),
|
||||
('statusdata', models.TextField(default='{}', help_text='srsstreaminstance_statusdata_help')),
|
||||
('node', models.ForeignKey(help_text='srsstreaminstance_node_help', on_delete=django.db.models.deletion.CASCADE, to='config.srsnode')),
|
||||
('stream', models.ForeignKey(help_text='srsstreaminstance_stream_help', on_delete=django.db.models.deletion.CASCADE, to='config.stream')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'srsstreaminstance_verbose_name',
|
||||
'verbose_name_plural': 'srsstreaminstance_verbose_name_plural',
|
||||
},
|
||||
),
|
||||
]
|
|
@ -7,7 +7,7 @@ from django.urls import reverse
|
|||
from django.utils.translation import gettext as _
|
||||
from django.db.models.signals import pre_delete
|
||||
from portier.common import handlers
|
||||
from . import signals_shared
|
||||
from config import signals_shared
|
||||
|
||||
|
||||
class Stream(models.Model):
|
||||
|
@ -20,32 +20,19 @@ class Stream(models.Model):
|
|||
# and only send signals when we are going to / coming from 0 published streams.
|
||||
publish_counter = models.PositiveIntegerField(default=0)
|
||||
|
||||
def on_publish(self, param):
|
||||
# if so far there were less than one incoming streams, this stream
|
||||
# is now being considered active
|
||||
if self.publish_counter < 1:
|
||||
def save(self, *args, **kwargs):
|
||||
super().save(*args, **kwargs)
|
||||
if self.publish_counter > 0:
|
||||
signals_shared.stream_active.send(sender=self.__class__,
|
||||
stream=str(self.stream),
|
||||
param=param
|
||||
param=None
|
||||
)
|
||||
|
||||
# keep track of this incoming stream
|
||||
self.publish_counter += 1
|
||||
self.save()
|
||||
|
||||
def on_unpublish(self, param):
|
||||
# note that we now have on less incoming stream
|
||||
if self.publish_counter > 0:
|
||||
self.publish_counter -= 1
|
||||
|
||||
# if we now have less than one incoming stream, this stream is being
|
||||
# considered inactive
|
||||
if self.publish_counter < 1:
|
||||
else:
|
||||
signals_shared.stream_inactive.send(sender=self.__class__,
|
||||
stream=str(self.stream),
|
||||
param=param
|
||||
param=None
|
||||
)
|
||||
self.save()
|
||||
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('config:stream_detail', kwargs={'pk': self.pk})
|
||||
|
@ -59,6 +46,7 @@ class Stream(models.Model):
|
|||
|
||||
pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream)
|
||||
|
||||
|
||||
class Restream(models.Model):
|
||||
FORMATS = (
|
||||
('flv', 'flv (RTMP)'),
|
||||
|
@ -95,3 +83,31 @@ class Restream(models.Model):
|
|||
|
||||
|
||||
pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream)
|
||||
|
||||
|
||||
class SRSNode(models.Model):
|
||||
name = models.CharField(max_length=100, help_text=_('srsnode_name_help'))
|
||||
api_base = models.CharField(max_length=256, help_text=_('srsnode_api_base_help'))
|
||||
rtmp_base = models.CharField(max_length=256, help_text=_('srsnode_rtmp_base_help'))
|
||||
active = models.BooleanField(help_text=_('srsnode_active_help'))
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('srsnode_verbose_name')
|
||||
verbose_name_plural = _('srsnode_verbose_name_plural')
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class SRSStreamInstance(models.Model):
|
||||
node = models.ForeignKey(SRSNode, on_delete=models.CASCADE, help_text=_('srsstreaminstance_node_help'))
|
||||
stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('srsstreaminstance_stream_help'))
|
||||
last_update = models.DateTimeField(auto_now=True, help_text=_('srsstreaminstance_last_update_help'))
|
||||
statusdata = models.TextField(default="{}", help_text=_('srsstreaminstance_statusdata_help'))
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('srsstreaminstance_verbose_name')
|
||||
verbose_name_plural = _('srsstreaminstance_verbose_name_plural')
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.stream} on {self.node}"
|
||||
|
|
|
@ -1,21 +1,20 @@
|
|||
from django.dispatch import receiver
|
||||
from django.db.models.signals import post_save, post_delete
|
||||
from .models import Restream, Stream
|
||||
from config.models import Restream, Stream
|
||||
from config.signals_shared import stream_active, stream_inactive
|
||||
from concierge.models import Task
|
||||
from .signals_shared import stream_active, stream_inactive
|
||||
|
||||
@receiver(stream_active)
|
||||
def create_tasks(sender, **kwargs):
|
||||
def create_restream_tasks(sender, **kwargs):
|
||||
stream = Stream.objects.get(stream=kwargs['stream'])
|
||||
instances = Restream.objects.filter(active=True, stream=stream)
|
||||
for instance in instances:
|
||||
task = Task(stream=instance.stream, type='restream', config_id=instance.id,
|
||||
configuration=instance.get_json_config())
|
||||
task.save()
|
||||
restreams = Restream.objects.filter(active=True, stream=stream)
|
||||
for restream in restreams:
|
||||
Task.objects.get_or_create(stream=restream.stream, type='restream', config_id=restream.id,
|
||||
configuration=restream.get_json_config())
|
||||
|
||||
|
||||
@receiver(post_save, sender=Restream)
|
||||
def update_tasks(sender, **kwargs):
|
||||
def update_restream_tasks(sender, **kwargs):
|
||||
instance = kwargs['instance']
|
||||
# TODO: check for breaking changes using update_fields. This needs custom save_model functions though.
|
||||
|
||||
|
@ -34,7 +33,7 @@ def update_tasks(sender, **kwargs):
|
|||
|
||||
|
||||
@receiver(post_delete, sender=Restream)
|
||||
def delete_tasks(sender, **kwargs):
|
||||
def delete_restream_tasks(sender, **kwargs):
|
||||
instance = kwargs['instance']
|
||||
# Get the current task instance if it exists, and remove it
|
||||
try:
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from celery import shared_task
|
||||
import requests
|
||||
|
||||
from config.models import Stream, SRSNode, SRSStreamInstance
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def scrape_srs_servers():
|
||||
for node in SRSNode.objects.filter(active=True):
|
||||
scrape_srs_server(node)
|
||||
update_stream_counters()
|
||||
|
||||
|
||||
def scrape_srs_server(node: SRSNode):
|
||||
try:
|
||||
response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2)
|
||||
response.raise_for_status()
|
||||
streams = response.json().get('streams', [])
|
||||
|
||||
streamobjs = []
|
||||
|
||||
for streamjson in streams:
|
||||
# find the corresponding stream object by comparing the stream uuid
|
||||
stream = Stream.objects.get(stream=streamjson.get('name'))
|
||||
streaminstance, _ = SRSStreamInstance.objects.get_or_create(stream=stream, node=node)
|
||||
streaminstance.statusdata = json.dumps(streamjson)
|
||||
streamobjs.append(stream)
|
||||
|
||||
# Delete the stream instances that are not in the response
|
||||
SRSStreamInstance.objects.filter(node=node).exclude(stream__in=streamobjs).delete()
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error('Error while trying to scrape SRS server')
|
||||
logger.error(node)
|
||||
logger.error(e)
|
||||
|
||||
|
||||
def update_stream_counters():
|
||||
for stream in Stream.objects.all():
|
||||
stream.publish_counter = len(SRSStreamInstance.objects.filter(stream=stream).all())
|
||||
logger.error(stream.publish_counter)
|
||||
logger.error(SRSStreamInstance.objects.filter(stream=stream).all())
|
||||
stream.save()
|
||||
|
||||
|
||||
@shared_task
|
||||
def async_scrape_srs_servers():
|
||||
scrape_srs_servers()
|
|
@ -40,9 +40,10 @@
|
|||
<p>{% trans "set_this_stream_server_in_encoder" %}</p>
|
||||
<p class="mb-4">
|
||||
<ul>
|
||||
{% settings_value "ADVERTISED_RTMP_HOSTS" as hosts %}
|
||||
{% for host in hosts %}
|
||||
<li><code>rtmp://{{ host }}/{% settings_value "GLOBAL_STREAM_NAMESPACE" %}/</code></p>
|
||||
{% for node in srs_nodes %}
|
||||
{% if node.active %}
|
||||
<li><code>{{ node.rtmp_base }}/{% settings_value "GLOBAL_STREAM_NAMESPACE" %}/</code></p>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</ul>
|
||||
<p>{% trans "set_this_stream_id_in_encoder" %}</p>
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
|
@ -1,18 +1,19 @@
|
|||
from django.urls import path
|
||||
from . import views
|
||||
from config.views import restream, stream
|
||||
from config.views.srs import callback_srs
|
||||
|
||||
app_name = 'config'
|
||||
|
||||
urlpatterns = [
|
||||
path('callback/srs', views.callback_srs, name='callback_srs'),
|
||||
path('streams/', views.StreamList.as_view(), name='stream_list'),
|
||||
path('streams/<int:pk>/', views.StreamDetail.as_view(), name='stream_detail'),
|
||||
path('streams/<int:pk>/change', views.StreamChange.as_view(), name='stream_change'),
|
||||
path('streams/<int:pk>/delete', views.StreamDelete.as_view(), name='stream_delete'),
|
||||
path('streams/create', views.StreamCreate.as_view(), name='stream_create'),
|
||||
path('restream/', views.RestreamList.as_view(), name='restream_list'),
|
||||
path('restream/<int:pk>/', views.RestreamDetail.as_view(), name='restream_detail'),
|
||||
path('restream/<int:pk>/change', views.RestreamUpdate.as_view(), name='restream_change'),
|
||||
path('restream/<int:pk>/delete', views.RestreamDelete.as_view(), name='restream_delete'),
|
||||
path('restream/create', views.RestreamCreate.as_view(), name='restream_create'),
|
||||
path('srs/callback', callback_srs, name='callback_srs'),
|
||||
path('streams/', stream.StreamList.as_view(), name='stream_list'),
|
||||
path('streams/<int:pk>/', stream.StreamDetail.as_view(), name='stream_detail'),
|
||||
path('streams/<int:pk>/change', stream.StreamChange.as_view(), name='stream_change'),
|
||||
path('streams/<int:pk>/delete', stream.StreamDelete.as_view(), name='stream_delete'),
|
||||
path('streams/create', stream.StreamCreate.as_view(), name='stream_create'),
|
||||
path('restream/', restream.RestreamList.as_view(), name='restream_list'),
|
||||
path('restream/<int:pk>/', restream.RestreamDetail.as_view(), name='restream_detail'),
|
||||
path('restream/<int:pk>/change', restream.RestreamUpdate.as_view(), name='restream_change'),
|
||||
path('restream/<int:pk>/delete', restream.RestreamDelete.as_view(), name='restream_delete'),
|
||||
path('restream/create', restream.RestreamCreate.as_view(), name='restream_create'),
|
||||
]
|
||||
|
|
|
@ -1,177 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.http import HttpResponse
|
||||
from django.urls import reverse_lazy
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.contrib.admin.utils import NestedObjects
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
|
||||
from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView
|
||||
from guardian.decorators import permission_required_or_403
|
||||
from guardian.shortcuts import assign_perm
|
||||
|
||||
from . import models, forms
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
def callback_srs(request):
|
||||
if request.method != 'POST':
|
||||
return HttpResponse('1', status=405)
|
||||
|
||||
try:
|
||||
json_data = json.loads(request.body)
|
||||
except json.decoder.JSONDecodeError:
|
||||
return HttpResponse('1', status=400)
|
||||
|
||||
try:
|
||||
app_name = json_data['app']
|
||||
# QUIRK this is a weird bug when pushing from OME to SRS. wtf.
|
||||
# for some reason srs interprets the incoming app as app/stream, and passes this on to portier.
|
||||
# only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm
|
||||
app_name = app_name.split('/')[0]
|
||||
# ENDQUIRK
|
||||
stream_name = json_data['stream']
|
||||
param = json_data['param']
|
||||
except KeyError:
|
||||
return HttpResponse('1', status=401)
|
||||
try:
|
||||
stream = models.Stream.objects.get(stream=stream_name)
|
||||
|
||||
except ObjectDoesNotExist:
|
||||
return HttpResponse('1', status=401)
|
||||
|
||||
if json_data.get('action') == 'on_publish':
|
||||
stream.on_publish(param=param)
|
||||
|
||||
if json_data.get('action') == 'on_unpublish':
|
||||
stream.on_unpublish(param=param)
|
||||
|
||||
return HttpResponse('0')
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_stream'),
|
||||
name='dispatch')
|
||||
class StreamList(ListView):
|
||||
model = models.Stream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.view_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamDetail(DetailView):
|
||||
model = models.Stream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.change_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamChange(UpdateView):
|
||||
model = models.Stream
|
||||
template_name_suffix = '_update_form'
|
||||
fields = ['name']
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_stream'),
|
||||
name='dispatch')
|
||||
class StreamCreate(CreateView):
|
||||
model = models.Stream
|
||||
fields = ['name']
|
||||
|
||||
def form_valid(self, form):
|
||||
valid = super().form_valid(form)
|
||||
if valid:
|
||||
user = self.request.user
|
||||
assign_perm('view_stream', user, self.object)
|
||||
assign_perm('change_stream', user, self.object)
|
||||
assign_perm('delete_stream', user, self.object)
|
||||
return valid
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.delete_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamDelete(DeleteView):
|
||||
model = models.Stream
|
||||
success_url = reverse_lazy('config:stream_list')
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
|
||||
collector = NestedObjects(using='default')
|
||||
collector.collect([self.object])
|
||||
|
||||
context['to_delete'] = collector.nested()
|
||||
|
||||
print(context['to_delete'])
|
||||
return context
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_restream'),
|
||||
name='dispatch')
|
||||
@method_decorator(ensure_csrf_cookie, name='dispatch')
|
||||
class RestreamList(ListView):
|
||||
model = models.Restream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.view_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamDetail(DetailView):
|
||||
model = models.Restream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.change_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamUpdate(UpdateView):
|
||||
model = models.Restream
|
||||
form_class = forms.RestreamFilteredStreamForm
|
||||
template_name_suffix = '_update_form'
|
||||
|
||||
def get_form_kwargs(self):
|
||||
kwargs = super().get_form_kwargs()
|
||||
kwargs['user'] = self.request.user
|
||||
return kwargs
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_restream'),
|
||||
name='dispatch')
|
||||
class RestreamCreate(CreateView):
|
||||
model = models.Restream
|
||||
form_class = forms.RestreamFilteredStreamForm
|
||||
|
||||
def get_form_kwargs(self):
|
||||
kwargs = super().get_form_kwargs()
|
||||
kwargs['user'] = self.request.user
|
||||
return kwargs
|
||||
|
||||
def form_valid(self, form):
|
||||
valid = super().form_valid(form)
|
||||
if valid:
|
||||
user = self.request.user
|
||||
assign_perm('view_restream', user, self.object)
|
||||
assign_perm('change_restream', user, self.object)
|
||||
assign_perm('delete_restream', user, self.object)
|
||||
return valid
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.delete_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamDelete(DeleteView):
|
||||
model = models.Restream
|
||||
success_url = reverse_lazy('config:restream_list')
|
|
@ -0,0 +1,76 @@
|
|||
import logging
|
||||
|
||||
from django.urls import reverse_lazy
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.csrf import ensure_csrf_cookie
|
||||
from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView
|
||||
from guardian.decorators import permission_required_or_403
|
||||
from guardian.shortcuts import assign_perm
|
||||
|
||||
from config import models, forms
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_restream'),
|
||||
name='dispatch')
|
||||
@method_decorator(ensure_csrf_cookie, name='dispatch')
|
||||
class RestreamList(ListView):
|
||||
model = models.Restream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.view_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamDetail(DetailView):
|
||||
model = models.Restream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.change_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamUpdate(UpdateView):
|
||||
model = models.Restream
|
||||
form_class = forms.RestreamFilteredStreamForm
|
||||
template_name_suffix = '_update_form'
|
||||
|
||||
def get_form_kwargs(self):
|
||||
kwargs = super().get_form_kwargs()
|
||||
kwargs['user'] = self.request.user
|
||||
return kwargs
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_restream'),
|
||||
name='dispatch')
|
||||
class RestreamCreate(CreateView):
|
||||
model = models.Restream
|
||||
form_class = forms.RestreamFilteredStreamForm
|
||||
|
||||
def get_form_kwargs(self):
|
||||
kwargs = super().get_form_kwargs()
|
||||
kwargs['user'] = self.request.user
|
||||
return kwargs
|
||||
|
||||
def form_valid(self, form):
|
||||
valid = super().form_valid(form)
|
||||
if valid:
|
||||
user = self.request.user
|
||||
assign_perm('view_restream', user, self.object)
|
||||
assign_perm('change_restream', user, self.object)
|
||||
assign_perm('delete_restream', user, self.object)
|
||||
return valid
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.delete_restream',
|
||||
(models.Restream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class RestreamDelete(DeleteView):
|
||||
model = models.Restream
|
||||
success_url = reverse_lazy('config:restream_list')
|
|
@ -0,0 +1,47 @@
|
|||
import logging
|
||||
import json
|
||||
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.http import HttpResponse
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
|
||||
from config.models import Stream
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from config.tasks import async_scrape_srs_servers
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
def callback_srs(request):
|
||||
if request.method != 'POST':
|
||||
return HttpResponse('1', status=405)
|
||||
|
||||
try:
|
||||
json_data = json.loads(request.body)
|
||||
except json.decoder.JSONDecodeError:
|
||||
return HttpResponse('1', status=400)
|
||||
|
||||
try:
|
||||
app_name = json_data['app']
|
||||
# QUIRK this is a weird bug when pushing from OME to SRS. wtf.
|
||||
# for some reason srs interprets the incoming app as app/stream, and passes this on to portier.
|
||||
# only keep the stuff infront of a (potential) slash, and throw away the rest. problem solved^tm
|
||||
app_name = app_name.split('/')[0]
|
||||
# ENDQUIRK
|
||||
stream_name = json_data['stream']
|
||||
param = json_data['param']
|
||||
except KeyError:
|
||||
return HttpResponse('1', status=401)
|
||||
try:
|
||||
Stream.objects.get(stream=stream_name)
|
||||
|
||||
except ObjectDoesNotExist:
|
||||
return HttpResponse('1', status=401)
|
||||
|
||||
# Scraping the server will make sure we are using the actual data from the server
|
||||
# and updating the count of the stream instances.
|
||||
async_scrape_srs_servers.delay()
|
||||
|
||||
return HttpResponse('0')
|
|
@ -0,0 +1,81 @@
|
|||
import logging
|
||||
|
||||
from django.urls import reverse_lazy
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.contrib.admin.utils import NestedObjects
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.generic import ListView, DetailView, CreateView, DeleteView, UpdateView
|
||||
from guardian.decorators import permission_required_or_403
|
||||
from guardian.shortcuts import assign_perm
|
||||
|
||||
from config import models, forms
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_stream'),
|
||||
name='dispatch')
|
||||
class StreamList(ListView):
|
||||
model = models.Stream
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.view_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamDetail(DetailView):
|
||||
model = models.Stream
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
context["srs_nodes"] = models.SRSNode.objects.all()
|
||||
return context
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.change_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamChange(UpdateView):
|
||||
model = models.Stream
|
||||
template_name_suffix = '_update_form'
|
||||
fields = ['name']
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.add_stream'),
|
||||
name='dispatch')
|
||||
class StreamCreate(CreateView):
|
||||
model = models.Stream
|
||||
fields = ['name']
|
||||
|
||||
def form_valid(self, form):
|
||||
valid = super().form_valid(form)
|
||||
if valid:
|
||||
user = self.request.user
|
||||
assign_perm('view_stream', user, self.object)
|
||||
assign_perm('change_stream', user, self.object)
|
||||
assign_perm('delete_stream', user, self.object)
|
||||
return valid
|
||||
|
||||
|
||||
@method_decorator(login_required, name='dispatch')
|
||||
@method_decorator(permission_required_or_403('config.delete_stream',
|
||||
(models.Stream, 'pk', 'pk')),
|
||||
name='dispatch')
|
||||
class StreamDelete(DeleteView):
|
||||
model = models.Stream
|
||||
success_url = reverse_lazy('config:stream_list')
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
context = super().get_context_data(**kwargs)
|
||||
|
||||
collector = NestedObjects(using='default')
|
||||
collector.collect([self.object])
|
||||
|
||||
context['to_delete'] = collector.nested()
|
||||
|
||||
print(context['to_delete'])
|
||||
return context
|
||||
|
|
@ -31,7 +31,6 @@ CSRF_TRUSTED_ORIGINS = os.environ.get("DJANGO_CSRF_TRUSTED_ORIGINS", default="ht
|
|||
|
||||
DEFAULT_GROUP = 'default'
|
||||
GLOBAL_STREAM_NAMESPACE = 'live'
|
||||
ADVERTISED_RTMP_HOSTS = os.environ.get("ADVERTISED_RTMP_HOSTS", default="localhost").split(" ")
|
||||
|
||||
# Application definition
|
||||
|
||||
|
@ -47,6 +46,7 @@ INSTALLED_APPS = [
|
|||
'django_registration',
|
||||
'bootstrap4',
|
||||
'fontawesome_5',
|
||||
'django_celery_beat',
|
||||
'core.apps.CoreConfig',
|
||||
'config.apps.ConfigConfig',
|
||||
'concierge.apps.ConciergeConfig',
|
||||
|
@ -182,5 +182,41 @@ CELERY_RESULT_BACKEND = "redis://{}:{}".format(os.environ.get('REDIS_HOST', defa
|
|||
CELERY_ACCEPT_CONTENT = ['application/json']
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_BEAT_SCHEDULE = {
|
||||
'config:scrape_srs_servers': {
|
||||
'task': 'config.tasks.async_scrape_srs_servers',
|
||||
'schedule': 5.0
|
||||
},
|
||||
}
|
||||
# Fixes incompatibility with tzlocal and pytz
|
||||
DJANGO_CELERY_BEAT_TZ_AWARE = False
|
||||
|
||||
|
||||
DEFAULT_AUTO_FIELD = 'django.db.models.AutoField'
|
||||
|
||||
if DEBUG:
|
||||
LOGGING = {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'formatters': {
|
||||
'console': {
|
||||
'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
'console': {
|
||||
'class': 'logging.StreamHandler',
|
||||
'formatter': 'console',
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
'django': {
|
||||
'level': 'DEBUG',
|
||||
'handlers': ['console'],
|
||||
},
|
||||
'': {
|
||||
'level': 'DEBUG',
|
||||
'handlers': ['console'],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -3,9 +3,12 @@ django-registration>=3.1
|
|||
django-bootstrap4
|
||||
django-guardian
|
||||
django-fontawesome-5
|
||||
celery>=4.4
|
||||
django-celery-beat
|
||||
django-filter
|
||||
djangorestframework
|
||||
djangorestframework-guardian
|
||||
celery>=5.3
|
||||
gunicorn>=20
|
||||
psycopg2-binary
|
||||
djangorestframework
|
||||
django-filter
|
||||
djangorestframework-guardian
|
||||
requests
|
||||
redis
|
|
@ -5,8 +5,8 @@ from .views import StreamViewSet, RestreamViewSet
|
|||
|
||||
|
||||
router = routers.DefaultRouter()
|
||||
router.register(r'streams', StreamViewSet)
|
||||
router.register(r'restreams', RestreamViewSet)
|
||||
router.register(r'stream', StreamViewSet)
|
||||
router.register(r'restream', RestreamViewSet)
|
||||
|
||||
app_name = 'restapi'
|
||||
|
||||
|
|
|
@ -33,9 +33,9 @@ class RestreamSerializer(ObjectPermissionsAssignmentMixin, serializers.ModelSeri
|
|||
def get_permissions_map(self, created):
|
||||
current_user = self.context['request'].user
|
||||
return {
|
||||
'view_restreamconfig': [current_user],
|
||||
'change_restreamconfig': [current_user],
|
||||
'delete_restreamconfig': [current_user]
|
||||
'view_restream': [current_user],
|
||||
'change_restream': [current_user],
|
||||
'delete_restream': [current_user]
|
||||
}
|
||||
|
||||
def validate_stream(self, value):
|
||||
|
|
|
@ -29,6 +29,8 @@ initialize() {
|
|||
|
||||
wait_for_redis
|
||||
wait_for_database
|
||||
migrate
|
||||
initialize
|
||||
supervisord -n -c /etc/supervisord.conf
|
||||
if [ "${COMPONENT}x" = "webx" ]; then
|
||||
migrate
|
||||
initialize
|
||||
fi
|
||||
supervisord -n -c "/etc/supervisord-${COMPONENT:-web}.conf"
|
|
@ -22,7 +22,7 @@ var app = new Vue({
|
|||
},
|
||||
fetchData() {
|
||||
axios
|
||||
.get('/api/v1/restreams/')
|
||||
.get('/api/v1/restream/')
|
||||
.then(response => {
|
||||
this.cfgs = response.data
|
||||
this.isLoading = false
|
||||
|
|
|
@ -16,7 +16,7 @@ var app = new Vue({
|
|||
},
|
||||
fetchData() {
|
||||
axios
|
||||
.get('/api/v1/streams/')
|
||||
.get('/api/v1/stream/')
|
||||
.then(response => {
|
||||
this.streams = response.data
|
||||
this.isLoading = false
|
||||
|
@ -29,6 +29,6 @@ var app = new Vue({
|
|||
this.fetchData()
|
||||
setInterval(function () {
|
||||
this.fetchData();
|
||||
}.bind(this), 5000);
|
||||
}.bind(this), 1000);
|
||||
}
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue