import json import uuid from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.conf import settings from django.db import models from django.urls import reverse from django.utils.translation import gettext as _ from django.db.models.signals import pre_delete from portier.common import handlers from config import signals_shared from config.util import validate_stream_url class TranscodingProfile(models.Model): VIDEO_CODECS = ( ('copy', 'Copy'), ('h264', 'H.264'), ('h265', 'H.265'), ) AUDIO_CODECS = ( ('copy', 'Copy'), ('aac', 'AAC'), ) VIDEO_PIXEL_FORMATS = ( ('yuv420', 'YUV420'), ('yuv422', 'YUV422'), ('yuv444', 'YUV444'), ) VIDEO_BITRATE_MODES = ( ('cbr', 'CBR'), ('vbr', 'VBR'), ) name = models.CharField(max_length=100, help_text=_('transcodingprofile_name_help')) video_map_stream = models.PositiveIntegerField(default=0, help_text=_('transcodingprofile_video_map_stream_help')) video_codec = models.CharField(max_length=4, choices=VIDEO_CODECS, help_text=_('transcodingprofile_video_codec_help')) video_bitrate = models.PositiveIntegerField(default=6000, help_text=_('transcodingprofile_video_bitrate_help')) video_frame_rate = models.PositiveIntegerField(default=30, help_text=_('transcodingprofile_video_frame_rate_help')) video_resolution = models.CharField(max_length=9, help_text=_('transcodingprofile_video_resolution_help')) video_gop_size = models.PositiveIntegerField(default=60, help_text=_('transcodingprofile_video_gop_size_help')) video_pixel_format = models.CharField(max_length=10, choices=VIDEO_PIXEL_FORMATS, help_text=_('transcodingprofile_video_pixel_format_help')) video_bitrate_mode = models.CharField(max_length=10, choices=VIDEO_BITRATE_MODES, help_text=_('transcodingprofile_video_bitrate_mode_help')) audio_map_stream = models.PositiveIntegerField(default=0, help_text=_('transcodingprofile_audio_map_stream_help')) audio_codec = models.CharField(max_length=10, choices=AUDIO_CODECS, help_text=_('transcodingprofile_audio_codec_help')) audio_bitrate = models.PositiveIntegerField(default=160, help_text=_('transcodingprofile_audio_bitrate_help')) audio_channels = models.PositiveIntegerField(default=2, help_text=_('transcodingprofile_audio_channels_help')) audio_sample_rate = models.PositiveIntegerField(default=48000, help_text=_('transcodingprofile_audio_sample_rate_help')) class Meta: verbose_name = _('transcodingprofile_verbose_name') verbose_name_plural = _('transcodingprofile_verbose_name_plural') def class_name(self): return _('transcodingprofile_class_name') def get_absolute_url(self): return reverse('config:transcodingprofile_detail', kwargs={'pk': self.pk}) def __str__(self): return str(self.name) def get_concierge_configuration(self): return { 'config_version': 1, 'video_map_stream': self.video_map_stream, 'video_codec': self.video_codec, 'video_bitrate': self.video_bitrate, 'video_frame_rate': self.video_frame_rate, 'video_resolution': self.video_resolution, 'audio_map_stream': self.audio_map_stream, 'audio_codec': self.audio_codec, 'audio_bitrate': self.audio_bitrate, 'audio_channels': self.audio_channels, 'audio_sample_rate': self.audio_sample_rate, } class Stream(models.Model): stream = models.UUIDField(unique=True, default=uuid.uuid4, help_text=_('stream_stream_help')) name = models.CharField(max_length=100, help_text=_('stream_name_help')) # the same stream uuid can be published multiple times to different origin # servers. this is a valid scheme to achieve a failover on the origin layer. # thus we need to keep track if a stream is published at least once, # and only send signals when we are going to / coming from 0 published streams. publish_counter = models.PositiveIntegerField(default=0) def save(self, *args, **kwargs): super().save(*args, **kwargs) if self.publish_counter > 0: signals_shared.stream_active.send(sender=self.__class__, stream=str(self.stream), param=None ) else: signals_shared.stream_inactive.send(sender=self.__class__, stream=str(self.stream), param=None ) def get_absolute_url(self): return reverse('config:stream_detail', kwargs={'pk': self.pk}) def class_name(self): return _('stream_class_name') def __str__(self): return str(self.name) class SRSNode(models.Model): name = models.CharField(max_length=100, help_text=_('srsnode_name_help')) api_base = models.CharField(max_length=256, help_text=_('srsnode_api_base_help')) rtmp_base = models.CharField(max_length=256, help_text=_('srsnode_rtmp_base_help')) active = models.BooleanField(help_text=_('srsnode_active_help')) class Meta: verbose_name = _('srsnode_verbose_name') verbose_name_plural = _('srsnode_verbose_name_plural') def __str__(self): return self.name class SRSStreamInstance(models.Model): node = models.ForeignKey(SRSNode, on_delete=models.CASCADE, help_text=_('srsstreaminstance_node_help')) stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('srsstreaminstance_stream_help')) last_update = models.DateTimeField(auto_now=True, help_text=_('srsstreaminstance_last_update_help')) statusdata = models.TextField(default="{}", help_text=_('srsstreaminstance_statusdata_help')) class Meta: verbose_name = _('srsstreaminstance_verbose_name') verbose_name_plural = _('srsstreaminstance_verbose_name_plural') def __str__(self): return f"{self.stream} on {self.node}" class Pull(models.Model): stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('pull_stream_help')) source = models.CharField(max_length=500, validators=[validate_stream_url], help_text=_('pull_source_help')) active = models.BooleanField(help_text=_('pull_activate_help')) name = models.CharField(max_length=100, help_text=_('pull_name_help')) transcodingprofile = models.ForeignKey(TranscodingProfile, null=True, blank=True, on_delete=models.PROTECT, help_text=_('restream_transcodingprofile_help')) class Meta: verbose_name = _('pull_verbose_name') verbose_name_plural = _('pull_verbose_name_plural') def class_name(self): return _('pull_class_name') def get_absolute_url(self): return reverse('config:pull_detail', kwargs={'pk': self.pk}) def __str__(self): return str(self.name) def save(self, *args, **kwargs): super().save(*args, **kwargs) if self.active: signals_shared.pull_active.send( sender=self.__class__, pull_id=self.id, ) else: signals_shared.pull_inactive.send( sender=self.__class__, pull_id=self.id, ) def get_concierge_configuration(self): if not self.active: return None # Select a random node that is active to ingest the pulled stream. node = SRSNode.objects.filter(active=True).order_by('?').first() return { 'config_version': 1, "type": "pull", "active": self.active, "name": self.name, "source": self.source, "target": f"{node.rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}", 'transcoding_profile': self.transcodingprofile.get_concierge_configuration(), } class Restream(models.Model): FORMATS = ( ('flv', 'flv (RTMP)'), ('mpegts', 'mpegts (SRT)'), ) stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('restream_stream_help')) target = models.CharField(max_length=500, validators=[validate_stream_url], help_text=_('restream_target_help')) name = models.CharField(max_length=100, help_text=_('restream_name_help')) active = models.BooleanField(help_text=_('restream_activate_help')) format = models.CharField(max_length=6, choices=FORMATS, default='flv', help_text=_('restream_format_help')) transcodingprofile = models.ForeignKey(TranscodingProfile, null=True, blank=True, on_delete=models.PROTECT, help_text=_('restream_transcodingprofile_help')) class Meta: verbose_name = _('restream_verbose_name') verbose_name_plural = _('restream_verbose_name_plural') def class_name(self): return _('restream_class_name') def get_absolute_url(self): return reverse('config:restream_detail', kwargs={'pk': self.pk}) def __str__(self): return '{} to {}'.format(self.stream, self.name) def get_concierge_configuration(self): stream_instances = SRSStreamInstance.objects.filter(stream=self.stream).all() if not stream_instances: return None # We use the actual SRS node of the first stream instance to make sure # that we use the same node for the restreaming. This avoids unnecessary # traffic between the SRS nodes. In theory a clustered SRS setup could # provide the same stream on all edges, but that would mean east-west # traffic between the nodes. # This assumption breaks when a second stream instance appears on another # SRS node, and only afterwards the first stream instance is removed. # In that case, the restreaming would not be retriggered, and the edge # would automatically start pulling the stream from the other SRS node. # It's a tradeoff between stability and efficiency. We choose stability. rtmp_base = stream_instances[0].node.rtmp_base return { 'config_version': 1, 'stream_source_url': f"{rtmp_base}/{settings.GLOBAL_STREAM_NAMESPACE}/{self.stream.stream}", 'stream_target_url': self.target, 'stream_target_transport': self.format, 'transcoding_profile': self.transcodingprofile.get_concierge_configuration(), } pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Stream) pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Restream) pre_delete.connect(handlers.remove_obj_perms_connected_with_user, sender=Pull) ### Recording Storage configurations class RecordingStorage(models.Model): name = models.CharField(max_length=100, help_text=_('recordingstorage_name_help')) class Meta: abstract = True class LocalRecordingStorage(RecordingStorage): path = models.CharField(max_length=500, help_text=_('localrecordingstorage_path_help')) class Meta: verbose_name = _('localrecordingstorage_verbose_name') verbose_name_plural = _('localrecordingstorage_verbose_name_plural') def __str__(self): return self.name class S3RecordingStorage(RecordingStorage): bucket = models.CharField(max_length=100, help_text=_('s3recordingstorage_bucket_help')) access_key = models.CharField(max_length=100, help_text=_('s3recordingstorage_access_key_help')) secret_key = models.CharField(max_length=100, help_text=_('s3recordingstorage_secret_key_help')) region = models.CharField(max_length=100, help_text=_('s3recordingstorage_region_help')) class Meta: verbose_name = _('s3recordingstorage_verbose_name') verbose_name_plural = _('s3recordingstorage_verbose_name_plural') def __str__(self): return self.name class Recorder(models.Model): storage_type_models = models.Q(app_label = 'config', model = 'LocalRecordingStorage') | models.Q(app_label = 'app', model = 'S3RecordingStorage') stream = models.ForeignKey(Stream, on_delete=models.CASCADE, help_text=_('recorder_stream_help')) name = models.CharField(max_length=100, help_text=_('recorder_name_help')) active = models.BooleanField(help_text=_('recorder_activate_help')) storage_type = models.ForeignKey(ContentType, limit_choices_to=storage_type_models, on_delete=models.CASCADE) storage_config_id = models.PositiveIntegerField() storage_config = GenericForeignKey('storage_type', 'storage_config_id')