#4 change some model fields to proper uuid type; add api to claim and release tasks
This commit is contained in:
parent
ca65c4eaa7
commit
5feca3ff62
|
@ -1,5 +1,5 @@
|
||||||
from django.contrib import admin
|
from django.contrib import admin
|
||||||
from .models import Identity, Task, Claim
|
from .models import Identity, Task
|
||||||
|
|
||||||
|
|
||||||
class IdentityAdmin(admin.ModelAdmin):
|
class IdentityAdmin(admin.ModelAdmin):
|
||||||
|
@ -7,13 +7,8 @@ class IdentityAdmin(admin.ModelAdmin):
|
||||||
|
|
||||||
|
|
||||||
class TaskAdmin(admin.ModelAdmin):
|
class TaskAdmin(admin.ModelAdmin):
|
||||||
fields = ['stream', 'type', 'configuration']
|
fields = ['stream', 'type', 'configuration', 'claimed_by']
|
||||||
|
|
||||||
|
|
||||||
class ClaimAdmin(admin.ModelAdmin):
|
|
||||||
fields = ['owner', 'task']
|
|
||||||
|
|
||||||
|
|
||||||
admin.site.register(Identity, IdentityAdmin)
|
admin.site.register(Identity, IdentityAdmin)
|
||||||
admin.site.register(Task, TaskAdmin)
|
admin.site.register(Task, TaskAdmin)
|
||||||
admin.site.register(Claim, ClaimAdmin)
|
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-26 18:34
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('concierge', '0001_initial'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='claim',
|
||||||
|
name='id',
|
||||||
|
field=models.CharField(default=uuid.uuid4, max_length=36, primary_key=True, serialize=False, unique=True),
|
||||||
|
),
|
||||||
|
]
|
|
@ -0,0 +1,23 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-26 18:35
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('concierge', '0002_auto_20200426_1834'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='identity',
|
||||||
|
name='heartbeat',
|
||||||
|
field=models.DateTimeField(blank=True, null=True),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='identity',
|
||||||
|
name='notes',
|
||||||
|
field=models.TextField(blank=True),
|
||||||
|
),
|
||||||
|
]
|
|
@ -0,0 +1,28 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-26 19:12
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
import django.db.models.deletion
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('concierge', '0003_auto_20200426_1835'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='task',
|
||||||
|
name='claimed_by',
|
||||||
|
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity'),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='task',
|
||||||
|
name='uuid',
|
||||||
|
field=models.UUIDField(default=uuid.uuid4, serialize=False, unique=True),
|
||||||
|
),
|
||||||
|
migrations.DeleteModel(
|
||||||
|
name='Claim',
|
||||||
|
),
|
||||||
|
]
|
|
@ -0,0 +1,30 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-26 20:07
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
import django.db.models.deletion
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('concierge', '0004_auto_20200426_1912'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='identity',
|
||||||
|
name='identity',
|
||||||
|
field=models.UUIDField(default=uuid.uuid4, unique=True),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='task',
|
||||||
|
name='claimed_by',
|
||||||
|
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='concierge.Identity'),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='task',
|
||||||
|
name='uuid',
|
||||||
|
field=models.UUIDField(default=uuid.uuid4, unique=True),
|
||||||
|
),
|
||||||
|
]
|
|
@ -6,23 +6,20 @@ from rtmp.models import Stream
|
||||||
class Identity(models.Model):
|
class Identity(models.Model):
|
||||||
# models a concierge identity. every running concierge needs to have a
|
# models a concierge identity. every running concierge needs to have a
|
||||||
# unique identity that is being used for task claims, etc.
|
# unique identity that is being used for task claims, etc.
|
||||||
identity = models.CharField(max_length=36, unique=True, default=uuid.uuid4)
|
identity = models.UUIDField(unique=True, default=uuid.uuid4)
|
||||||
name = models.CharField(max_length=100)
|
name = models.CharField(max_length=100)
|
||||||
notes = models.TextField()
|
notes = models.TextField(blank=True)
|
||||||
|
|
||||||
# heartbeat indicates last point in time that this identity was seen.
|
# heartbeat indicates last point in time that this identity was seen.
|
||||||
# some cronjob should scan the heartbeats and release all claims by
|
# some cronjob should scan the heartbeats and release all claims by
|
||||||
# identities that have not been seen in a while. this interval should
|
# identities that have not been seen in a while. this interval should
|
||||||
# be quite short so that the tasks can be claimed by other identities asap.
|
# be quite short so that the tasks can be claimed by other identities asap.
|
||||||
heartbeat = models.DateTimeField(blank=True)
|
heartbeat = models.DateTimeField(blank=True, null=True)
|
||||||
|
|
||||||
|
|
||||||
class Task(models.Model):
|
class Task(models.Model):
|
||||||
|
uuid = models.UUIDField(unique=True, default=uuid.uuid4)
|
||||||
stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
|
stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
|
||||||
type = models.CharField(max_length=100)
|
type = models.CharField(max_length=100)
|
||||||
configuration = models.TextField()
|
configuration = models.TextField()
|
||||||
|
claimed_by = models.ForeignKey(Identity, null=True, blank=True, on_delete=models.CASCADE)
|
||||||
|
|
||||||
class Claim(models.Model):
|
|
||||||
owner = models.ForeignKey(Identity, on_delete=models.CASCADE)
|
|
||||||
task = models.ForeignKey(Task, on_delete=models.CASCADE)
|
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
from django.urls import path
|
||||||
|
from . import views
|
||||||
|
|
||||||
|
urlpatterns = [
|
||||||
|
path('api/<uuid:identity>/heartbeat', views.heartbeat, name='heartbeat'),
|
||||||
|
path('api/<uuid:identity>/claim/<uuid:task_uuid>', views.claim, name='claim'),
|
||||||
|
path('api/<uuid:identity>/release/<uuid:task_uuid>', views.release, name='release'),
|
||||||
|
]
|
|
@ -1,3 +1,95 @@
|
||||||
from django.shortcuts import render # noqa
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
# Create your views here.
|
from django.db import transaction
|
||||||
|
from django.http import JsonResponse
|
||||||
|
from django.views.decorators.csrf import csrf_exempt
|
||||||
|
from django.views.decorators.http import require_POST
|
||||||
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
|
from django.utils.timezone import now
|
||||||
|
|
||||||
|
from .models import Identity, Task
|
||||||
|
|
||||||
|
|
||||||
|
@csrf_exempt
|
||||||
|
@require_POST
|
||||||
|
def heartbeat(request, identity):
|
||||||
|
try:
|
||||||
|
id = Identity.objects.get(identity=identity)
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return JsonResponse({'error': 'identity unknown'}, status=403)
|
||||||
|
|
||||||
|
# update heartbeat
|
||||||
|
id.heartbeat = now()
|
||||||
|
id.save()
|
||||||
|
|
||||||
|
# get current claims and available tasks
|
||||||
|
claims = Task.objects.filter(claimed_by=id).all()
|
||||||
|
available = Task.objects.filter(claimed_by=None).all()
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'success': True,
|
||||||
|
'claims': [{'uuid': str(o.uuid)} for o in list(claims)],
|
||||||
|
'available': [{'uuid': str(o.uuid), 'type': o.type} for o in list(available)],
|
||||||
|
}
|
||||||
|
|
||||||
|
return JsonResponse(data)
|
||||||
|
|
||||||
|
|
||||||
|
@csrf_exempt
|
||||||
|
@require_POST
|
||||||
|
def claim(request, identity, task_uuid):
|
||||||
|
try:
|
||||||
|
id = Identity.objects.get(identity=identity)
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return JsonResponse({'error': 'identity unknown'}, status=403)
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
task = Task.objects.get(uuid=task_uuid)
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return JsonResponse({'error': 'task unknown'}, status=404)
|
||||||
|
|
||||||
|
if task.claimed_by:
|
||||||
|
return JsonResponse({'error': 'task already claimed'}, status=423)
|
||||||
|
|
||||||
|
task.claimed_by = id
|
||||||
|
task.save()
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'success': True,
|
||||||
|
'uuid': task.uuid,
|
||||||
|
'type': task.type,
|
||||||
|
'configuration': json.loads(task.configuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
return JsonResponse(data)
|
||||||
|
|
||||||
|
|
||||||
|
@csrf_exempt
|
||||||
|
@require_POST
|
||||||
|
def release(request, identity, task_uuid):
|
||||||
|
try:
|
||||||
|
id = Identity.objects.get(identity=identity)
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return JsonResponse({'error': 'identity unknown'}, status=403)
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
try:
|
||||||
|
task = Task.objects.get(uuid=task_uuid)
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return JsonResponse({'error': 'task unknown'}, status=404)
|
||||||
|
|
||||||
|
if task.claimed_by != id:
|
||||||
|
return JsonResponse({'error': 'task claimed by other identity'}, status=403)
|
||||||
|
|
||||||
|
task.claimed_by = None
|
||||||
|
task.save()
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'success': True,
|
||||||
|
'uuid': task.uuid,
|
||||||
|
'type': task.type,
|
||||||
|
}
|
||||||
|
|
||||||
|
return JsonResponse(data)
|
||||||
|
|
|
@ -19,5 +19,6 @@ from django.urls import include, path
|
||||||
urlpatterns = [
|
urlpatterns = [
|
||||||
path('admin/', admin.site.urls),
|
path('admin/', admin.site.urls),
|
||||||
path('rtmp/', include('rtmp.urls')),
|
path('rtmp/', include('rtmp.urls')),
|
||||||
|
path('concierge/', include('concierge.urls')),
|
||||||
path('', include('portal.urls')),
|
path('', include('portal.urls')),
|
||||||
]
|
]
|
||||||
|
|
|
@ -3,12 +3,21 @@ from rtmp.signals import stream_active
|
||||||
from .models import RestreamConfig
|
from .models import RestreamConfig
|
||||||
from rtmp.models import Stream
|
from rtmp.models import Stream
|
||||||
from concierge.models import Task
|
from concierge.models import Task
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
@receiver(stream_active)
|
@receiver(stream_active)
|
||||||
def create_tasks(sender, **kwargs):
|
def create_tasks(sender, **kwargs):
|
||||||
stream = Stream.objects.get(stream=kwargs['stream'])
|
stream = Stream.objects.get(stream=kwargs['stream'])
|
||||||
configs = RestreamConfig.objects.filter(stream=stream)
|
instances = RestreamConfig.objects.filter(active=True, stream=stream)
|
||||||
for config in configs:
|
for inst in instances:
|
||||||
task = Task(stream=stream, type='restream', configuration='{}')
|
config = {
|
||||||
|
'name': inst.name,
|
||||||
|
'app': inst.stream.application.name,
|
||||||
|
'stream': str(inst.stream.stream),
|
||||||
|
'target': inst.target
|
||||||
|
}
|
||||||
|
|
||||||
|
json_config = json.dumps(config)
|
||||||
|
task = Task(stream=stream, type='restream', configuration=json_config)
|
||||||
task.save()
|
task.save()
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-26 18:34
|
||||||
|
|
||||||
|
from django.db import migrations, models
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('rtmp', '0002_stream_publish_counter'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='stream',
|
||||||
|
name='stream',
|
||||||
|
field=models.UUIDField(default=uuid.uuid4, unique=True),
|
||||||
|
),
|
||||||
|
]
|
|
@ -14,7 +14,7 @@ class Application(models.Model):
|
||||||
|
|
||||||
class Stream(models.Model):
|
class Stream(models.Model):
|
||||||
application = models.ForeignKey(Application, on_delete=models.CASCADE)
|
application = models.ForeignKey(Application, on_delete=models.CASCADE)
|
||||||
stream = models.CharField(max_length=64, unique=True, default=uuid.uuid4)
|
stream = models.UUIDField(unique=True, default=uuid.uuid4)
|
||||||
name = models.CharField(max_length=100)
|
name = models.CharField(max_length=100)
|
||||||
|
|
||||||
# the same stream uuid can be published multiple times to different origin
|
# the same stream uuid can be published multiple times to different origin
|
||||||
|
@ -28,7 +28,7 @@ class Stream(models.Model):
|
||||||
# is now being considered active
|
# is now being considered active
|
||||||
if self.publish_counter < 1:
|
if self.publish_counter < 1:
|
||||||
signals.stream_active.send(sender=self.__class__,
|
signals.stream_active.send(sender=self.__class__,
|
||||||
stream=self.stream,
|
stream=str(self.stream),
|
||||||
param=param
|
param=param
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ class Stream(models.Model):
|
||||||
# considered inactive
|
# considered inactive
|
||||||
if self.publish_counter < 1:
|
if self.publish_counter < 1:
|
||||||
signals.stream_inactive.send(sender=self.__class__,
|
signals.stream_inactive.send(sender=self.__class__,
|
||||||
stream=self.stream,
|
stream=str(self.stream),
|
||||||
param=param
|
param=param
|
||||||
)
|
)
|
||||||
self.save()
|
self.save()
|
||||||
|
|
Loading…
Reference in New Issue