portier/source/config/models.py

183 lines
6.9 KiB
Python

import json
import uuid
from django.conf import settings
from django.db import models
from django.urls import reverse
from django.utils.translation import gettext as _
from django.db.models.signals import pre_delete
from portier.common import handlers
from config import signals_shared
from config.util import validate_stream_url
class Stream(models.Model):
    """A named stream identified by a UUID, with a publish counter.

    Every call to ``save()`` emits either ``signals_shared.stream_active`` or
    ``signals_shared.stream_inactive``, depending on whether
    ``publish_counter`` is greater than zero.
    """

    # NOTE(review): `_` is the eager `gettext` in this module, so help texts
    # are resolved once when the class body runs — confirm this is intended.
    stream = models.UUIDField(
        unique=True,
        default=uuid.uuid4,
        help_text=_('stream_stream_help'),
    )
    name = models.CharField(
        max_length=100,
        help_text=_('stream_name_help'),
    )

    # One stream UUID may be published several times, to different origin
    # servers, which is a valid way to get failover on the origin layer.
    # The counter records how many publishes are currently known so that
    # "published at least once" can be told apart from "not published".
    # NOTE(review): the intent is to signal only on transitions from/to zero,
    # but save() signals on every save — confirm receivers tolerate repeats.
    publish_counter = models.PositiveIntegerField(default=0)

    def save(self, *args, **kwargs):
        """Persist the stream, then broadcast its active/inactive state."""
        super().save(*args, **kwargs)

        is_published = self.publish_counter > 0
        state_signal = (
            signals_shared.stream_active
            if is_published
            else signals_shared.stream_inactive
        )
        state_signal.send(
            sender=self.__class__,
            stream=str(self.stream),
            param=None,
        )

    def get_absolute_url(self):
        """Return the URL of this stream's detail view."""
        url_kwargs = {'pk': self.pk}
        return reverse('config:stream_detail', kwargs=url_kwargs)

    def class_name(self):
        """Return the translated display name of this model."""
        return _('stream_class_name')

    def __str__(self):
        """Return the stream's name as a string."""
        return str(self.name)
# Run the shared handler from portier.common.handlers before a Stream is
# deleted (presumably cleans up object-level permissions — see the handler).
pre_delete.connect(
    receiver=handlers.remove_obj_perms_connected_with_user,
    sender=Stream,
)
class SRSNode(models.Model):
    """An SRS node, described by a name, an API base and an RTMP base.

    ``active`` marks whether the node is eligible when ``Pull`` picks a node
    to ingest a pulled stream.
    """

    name = models.CharField(
        max_length=100,
        help_text=_('srsnode_name_help'),
    )
    api_base = models.CharField(
        max_length=256,
        help_text=_('srsnode_api_base_help'),
    )
    rtmp_base = models.CharField(
        max_length=256,
        help_text=_('srsnode_rtmp_base_help'),
    )
    active = models.BooleanField(
        help_text=_('srsnode_active_help'),
    )

    class Meta:
        verbose_name = _('srsnode_verbose_name')
        verbose_name_plural = _('srsnode_verbose_name_plural')

    def __str__(self):
        """Return the node's name."""
        return self.name
class SRSStreamInstance(models.Model):
    """Associates a ``Stream`` with the ``SRSNode`` it is recorded on.

    Rows are removed together with their node or stream (cascade delete).
    """

    node = models.ForeignKey(
        SRSNode,
        on_delete=models.CASCADE,
        help_text=_('srsstreaminstance_node_help'),
    )
    stream = models.ForeignKey(
        Stream,
        on_delete=models.CASCADE,
        help_text=_('srsstreaminstance_stream_help'),
    )
    # auto_now=True: refreshed to the current time on every save().
    last_update = models.DateTimeField(
        auto_now=True,
        help_text=_('srsstreaminstance_last_update_help'),
    )
    # Free-form text; defaults to "{}" (presumably serialized JSON — verify
    # against the code that writes it).
    statusdata = models.TextField(
        default="{}",
        help_text=_('srsstreaminstance_statusdata_help'),
    )

    class Meta:
        verbose_name = _('srsstreaminstance_verbose_name')
        verbose_name_plural = _('srsstreaminstance_verbose_name_plural')

    def __str__(self):
        """Return "<stream> on <node>"."""
        return f"{self.stream} on {self.node}"
class Pull(models.Model):
    """A pull job that ingests an external source URL into a ``Stream``.

    Every call to ``save()`` emits either ``signals_shared.pull_active`` or
    ``signals_shared.pull_inactive``, depending on the ``active`` flag.
    """

    stream = models.ForeignKey(
        Stream,
        on_delete=models.CASCADE,
        help_text=_('pull_stream_help'),
    )
    source = models.CharField(
        max_length=500,
        validators=[validate_stream_url],
        help_text=_('pull_source_help'),
    )
    active = models.BooleanField(help_text=_('pull_activate_help'))
    name = models.CharField(max_length=100, help_text=_('pull_name_help'))

    class Meta:
        verbose_name = _('pull_verbose_name')
        verbose_name_plural = _('pull_verbose_name_plural')

    def class_name(self):
        """Return the translated display name of this model."""
        return _('pull_class_name')

    def get_absolute_url(self):
        """Return the URL of this pull's detail view."""
        return reverse('config:pull_detail', kwargs={'pk': self.pk})

    def __str__(self):
        """Return the pull's name as a string."""
        return str(self.name)

    def save(self, *args, **kwargs):
        """Persist the pull, then broadcast its active/inactive state."""
        super().save(*args, **kwargs)
        state_signal = (
            signals_shared.pull_active
            if self.active
            else signals_shared.pull_inactive
        )
        state_signal.send(
            sender=self.__class__,
            pull_id=self.id,
        )

    def get_concierge_configuration(self):
        """Build the configuration dict describing this pull job.

        Returns:
            A dict with the keys ``type``, ``active``, ``name``, ``source``
            and ``target``, or ``None`` when the pull is inactive or when no
            active SRS node exists to ingest the stream.
        """
        if not self.active:
            return None

        # Select a random node that is active to ingest the pulled stream.
        node = SRSNode.objects.filter(active=True).order_by('?').first()
        if node is None:
            # Without an active node there is no ingest target. Report
            # "no configuration" instead of failing on `node.rtmp_base`.
            return None

        return {
            "type": "pull",
            "active": self.active,
            "name": self.name,
            "source": self.source,
            "target": f"{node.rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}",
        }
# Run the shared handler from portier.common.handlers before a Pull is
# deleted (presumably cleans up object-level permissions — see the handler).
pre_delete.connect(
    receiver=handlers.remove_obj_perms_connected_with_user,
    sender=Pull,
)
class Restream(models.Model):
    """A restream job that forwards a ``Stream`` to an external target URL."""

    # Allowed values for `format`: (stored value, human-readable label).
    FORMATS = (
        ('flv', 'flv (RTMP)'),
        ('mpegts', 'mpegts (SRT)'),
    )

    stream = models.ForeignKey(
        Stream,
        on_delete=models.CASCADE,
        help_text=_('restream_stream_help'),
    )
    target = models.CharField(
        max_length=500,
        validators=[validate_stream_url],
        help_text=_('restream_target_help'),
    )
    name = models.CharField(max_length=100, help_text=_('restream_name_help'))
    active = models.BooleanField(help_text=_('restream_activate_help'))
    format = models.CharField(
        max_length=6,
        choices=FORMATS,
        default='flv',
        help_text=_('restream_format_help'),
    )

    class Meta:
        verbose_name = _('restream_verbose_name')
        verbose_name_plural = _('restream_verbose_name_plural')

    def class_name(self):
        """Return the translated display name of this model."""
        return _('restream_class_name')

    def get_absolute_url(self):
        """Return the URL of this restream's detail view."""
        return reverse('config:restream_detail', kwargs={'pk': self.pk})

    def __str__(self):
        """Return "<stream> to <name>"."""
        return f"{self.stream} to {self.name}"

    def get_concierge_configuration(self):
        """Build the configuration dict describing this restream job.

        Returns:
            A dict with the keys ``config_version``, ``stream_source_url``,
            ``stream_target_url`` and ``stream_target_transport``, or ``None``
            when no ``SRSStreamInstance`` exists for the stream.

        NOTE(review): ``self.active`` is not consulted here — confirm that
        callers filter out inactive restreams.
        """
        # Fetch only the oldest instance (lowest primary key) and load its
        # node in the same query. Ordering explicitly keeps the choice of
        # "first" instance stable regardless of how the database happens to
        # return unordered rows.
        first_instance = (
            SRSStreamInstance.objects
            .filter(stream=self.stream)
            .select_related('node')
            .order_by('pk')
            .first()
        )
        if first_instance is None:
            return None

        # We use the actual SRS node of the first stream instance to make sure
        # that the restreaming reads from the node that already has the
        # stream. This avoids unnecessary traffic between the SRS nodes. In
        # theory a clustered SRS setup could provide the same stream on all
        # edges, but that would mean east-west traffic between the nodes.
        # This assumption breaks when a second stream instance appears on
        # another SRS node and only afterwards the first instance is removed.
        # In that case the restreaming would not be retriggered, and the edge
        # would automatically start pulling the stream from the other node.
        # It's a tradeoff between stability and efficiency. We choose
        # stability.
        rtmp_base = first_instance.node.rtmp_base

        return {
            'config_version': 1,
            'stream_source_url': f"{rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}",
            'stream_target_url': self.target,
            'stream_target_transport': self.format,
        }
# Run the shared handler from portier.common.handlers before a Restream is
# deleted (presumably cleans up object-level permissions — see the handler).
pre_delete.connect(
    receiver=handlers.remove_obj_perms_connected_with_user,
    sender=Restream,
)