diff --git a/concierge/admin.py b/concierge/admin.py index 8b9cc05..887586f 100644 --- a/concierge/admin.py +++ b/concierge/admin.py @@ -1,5 +1,5 @@ from django.contrib import admin -from .models import Identity, Task, Claim +from .models import Identity, Task class IdentityAdmin(admin.ModelAdmin): @@ -7,13 +7,8 @@ class IdentityAdmin(admin.ModelAdmin): class TaskAdmin(admin.ModelAdmin): - fields = ['stream', 'type', 'configuration'] - - -class ClaimAdmin(admin.ModelAdmin): - fields = ['owner', 'task'] + fields = ['stream', 'type', 'configuration', 'claimed_by'] admin.site.register(Identity, IdentityAdmin) admin.site.register(Task, TaskAdmin) -admin.site.register(Claim, ClaimAdmin) diff --git a/concierge/migrations/0002_auto_20200426_1834.py b/concierge/migrations/0002_auto_20200426_1834.py new file mode 100644 index 0000000..a627380 --- /dev/null +++ b/concierge/migrations/0002_auto_20200426_1834.py @@ -0,0 +1,19 @@ +# Generated by Django 3.0.5 on 2020-04-26 18:34 + +from django.db import migrations, models +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('concierge', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='claim', + name='id', + field=models.CharField(default=uuid.uuid4, max_length=36, primary_key=True, serialize=False, unique=True), + ), + ] diff --git a/concierge/migrations/0003_auto_20200426_1835.py b/concierge/migrations/0003_auto_20200426_1835.py new file mode 100644 index 0000000..7c78b95 --- /dev/null +++ b/concierge/migrations/0003_auto_20200426_1835.py @@ -0,0 +1,23 @@ +# Generated by Django 3.0.5 on 2020-04-26 18:35 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('concierge', '0002_auto_20200426_1834'), + ] + + operations = [ + migrations.AlterField( + model_name='identity', + name='heartbeat', + field=models.DateTimeField(blank=True, null=True), + ), + migrations.AlterField( + model_name='identity', + name='notes', + field=models.TextField(blank=True), + ), + ] diff --git a/concierge/migrations/0004_auto_20200426_1912.py b/concierge/migrations/0004_auto_20200426_1912.py new file mode 100644 index 0000000..ea8829c --- /dev/null +++ b/concierge/migrations/0004_auto_20200426_1912.py @@ -0,0 +1,28 @@ +# Generated by Django 3.0.5 on 2020-04-26 19:12 + +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('concierge', '0003_auto_20200426_1835'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='claimed_by', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity'), + ), + migrations.AddField( + model_name='task', + name='uuid', + field=models.UUIDField(default=uuid.uuid4, serialize=False, unique=True), + ), + migrations.DeleteModel( + name='Claim', + ), + ] diff --git a/concierge/migrations/0005_auto_20200426_2007.py b/concierge/migrations/0005_auto_20200426_2007.py new file mode 100644 index 0000000..15293c8 --- /dev/null +++ b/concierge/migrations/0005_auto_20200426_2007.py @@ -0,0 +1,30 @@ +# Generated by Django 3.0.5 on 2020-04-26 20:07 + +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('concierge', '0004_auto_20200426_1912'), + ] + + operations = [ + migrations.AlterField( + model_name='identity', + name='identity', + field=models.UUIDField(default=uuid.uuid4, unique=True), + ), + migrations.AlterField( + model_name='task', + name='claimed_by', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity'), + ), + migrations.AlterField( + model_name='task', + name='uuid', + field=models.UUIDField(default=uuid.uuid4, unique=True), + ), + ] diff --git a/concierge/models.py b/concierge/models.py index 6e210ee..c809964 100644 --- a/concierge/models.py +++ b/concierge/models.py @@ -6,23 +6,20 @@ from rtmp.models import Stream class Identity(models.Model): # models a concierge identity. every running concierge needs to have a # unique identity that is being used for task claims, etc. - identity = models.CharField(max_length=36, unique=True, default=uuid.uuid4) + identity = models.UUIDField(unique=True, default=uuid.uuid4) name = models.CharField(max_length=100) - notes = models.TextField() + notes = models.TextField(blank=True) # heartbeat indicates last point in time that this identity was seen. # some cronjob should scan the heartbeats and release all claims by # identities that have not been seen in a while. this interval should # be quite short so that the tasks can be claimed by other identities asap. - heartbeat = models.DateTimeField(blank=True) + heartbeat = models.DateTimeField(blank=True, null=True) class Task(models.Model): + uuid = models.UUIDField(unique=True, default=uuid.uuid4) stream = models.ForeignKey(Stream, on_delete=models.CASCADE) type = models.CharField(max_length=100) configuration = models.TextField() - - -class Claim(models.Model): - owner = models.ForeignKey(Identity, on_delete=models.CASCADE) - task = models.ForeignKey(Task, on_delete=models.CASCADE) + claimed_by = models.ForeignKey(Identity, null=True, blank=True, on_delete=models.CASCADE) diff --git a/concierge/urls.py b/concierge/urls.py new file mode 100644 index 0000000..0c33e30 --- /dev/null +++ b/concierge/urls.py @@ -0,0 +1,8 @@ +from django.urls import path +from . import views + +urlpatterns = [ + path('api//heartbeat', views.heartbeat, name='heartbeat'), + path('api//claim/', views.claim, name='claim'), + path('api//release/', views.release, name='release'), +] diff --git a/concierge/views.py b/concierge/views.py index 6100593..bc6afbb 100644 --- a/concierge/views.py +++ b/concierge/views.py @@ -1,3 +1,95 @@ -from django.shortcuts import render # noqa +import json +import logging -# Create your views here. +from django.db import transaction +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_POST +from django.core.exceptions import ObjectDoesNotExist +from django.utils.timezone import now + +from .models import Identity, Task + + +@csrf_exempt +@require_POST +def heartbeat(request, identity): + try: + id = Identity.objects.get(identity=identity) + except ObjectDoesNotExist: + return JsonResponse({'error': 'identity unknown'}, status=403) + + # update heartbeat + id.heartbeat = now() + id.save() + + # get current claims and available tasks + claims = Task.objects.filter(claimed_by=id).all() + available = Task.objects.filter(claimed_by=None).all() + + data = { + 'success': True, + 'claims': [{'uuid': str(o.uuid)} for o in list(claims)], + 'available': [{'uuid': str(o.uuid), 'type': o.type} for o in list(available)], + } + + return JsonResponse(data) + + +@csrf_exempt +@require_POST +def claim(request, identity, task_uuid): + try: + id = Identity.objects.get(identity=identity) + except ObjectDoesNotExist: + return JsonResponse({'error': 'identity unknown'}, status=403) + + with transaction.atomic(): + try: + task = Task.objects.get(uuid=task_uuid) + except ObjectDoesNotExist: + return JsonResponse({'error': 'task unknown'}, status=404) + + if task.claimed_by: + return JsonResponse({'error': 'task already claimed'}, status=423) + + task.claimed_by = id + task.save() + + data = { + 'success': True, + 'uuid': task.uuid, + 'type': task.type, + 'configuration': json.loads(task.configuration) + } + + return JsonResponse(data) + + +@csrf_exempt +@require_POST +def release(request, identity, task_uuid): + try: + id = Identity.objects.get(identity=identity) + except ObjectDoesNotExist: + return JsonResponse({'error': 'identity unknown'}, status=403) + + with transaction.atomic(): + try: + task = Task.objects.get(uuid=task_uuid) + except ObjectDoesNotExist: + return JsonResponse({'error': 'task unknown'}, status=404) + + if task.claimed_by != id: + return JsonResponse({'error': 'task claimed by other identity'}, status=403) + + task.claimed_by = None + task.save() + + data = { + 'success': True, + 'uuid': task.uuid, + 'type': task.type, + } + + return JsonResponse(data) diff --git a/portier/urls.py b/portier/urls.py index 416fdbd..c97f89b 100644 --- a/portier/urls.py +++ b/portier/urls.py @@ -19,5 +19,6 @@ from django.urls import include, path urlpatterns = [ path('admin/', admin.site.urls), path('rtmp/', include('rtmp.urls')), + path('concierge/', include('concierge.urls')), path('', include('portal.urls')), ] diff --git a/restream/signals.py b/restream/signals.py index 0331bfa..fba6a33 100644 --- a/restream/signals.py +++ b/restream/signals.py @@ -3,12 +3,21 @@ from rtmp.signals import stream_active from .models import RestreamConfig from rtmp.models import Stream from concierge.models import Task +import json @receiver(stream_active) def create_tasks(sender, **kwargs): stream = Stream.objects.get(stream=kwargs['stream']) - configs = RestreamConfig.objects.filter(stream=stream) - for config in configs: - task = Task(stream=stream, type='restream', configuration='{}') + instances = RestreamConfig.objects.filter(active=True, stream=stream) + for inst in instances: + config = { + 'name': inst.name, + 'app': inst.stream.application.name, + 'stream': str(inst.stream.stream), + 'target': inst.target + } + + json_config = json.dumps(config) + task = Task(stream=stream, type='restream', configuration=json_config) task.save() diff --git a/rtmp/migrations/0003_auto_20200426_1834.py b/rtmp/migrations/0003_auto_20200426_1834.py new file mode 100644 index 0000000..b6d0ee8 --- /dev/null +++ b/rtmp/migrations/0003_auto_20200426_1834.py @@ -0,0 +1,19 @@ +# Generated by Django 3.0.5 on 2020-04-26 18:34 + +from django.db import migrations, models +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('rtmp', '0002_stream_publish_counter'), + ] + + operations = [ + migrations.AlterField( + model_name='stream', + name='stream', + field=models.UUIDField(default=uuid.uuid4, unique=True), + ), + ] diff --git a/rtmp/models.py b/rtmp/models.py index 8ab82a6..0828e99 100644 --- a/rtmp/models.py +++ b/rtmp/models.py @@ -14,7 +14,7 @@ class Application(models.Model): class Stream(models.Model): application = models.ForeignKey(Application, on_delete=models.CASCADE) - stream = models.CharField(max_length=64, unique=True, default=uuid.uuid4) + stream = models.UUIDField(unique=True, default=uuid.uuid4) name = models.CharField(max_length=100) # the same stream uuid can be published multiple times to different origin @@ -28,7 +28,7 @@ class Stream(models.Model): # is now being considered active if self.publish_counter < 1: signals.stream_active.send(sender=self.__class__, - stream=self.stream, + stream=str(self.stream), param=param ) @@ -45,7 +45,7 @@ class Stream(models.Model): # considered inactive if self.publish_counter < 1: signals.stream_inactive.send(sender=self.__class__, - stream=self.stream, + stream=str(self.stream), param=param ) self.save()