import os
import random
import signal
import sys
import time
import xmlrpc.client

import requests
from jinja2 import Environment, FileSystemLoader


# supervisord xml-rpc connection
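# NOTE: assumes supervisord's inet_http_server is enabled and listening on
# 127.0.0.1:9001 - adjust the URL if your supervisord.conf differs.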
svd = xmlrpc.client.ServerProxy('http://127.0.0.1:9001/RPC2')

identity = os.environ.get('CONCIERGE_IDENTITY')
portier_host = os.environ.get('PORTIER_HOST', default="portier.chaoswest.tv")
portier_scheme = os.environ.get('PORTIER_SCHEME', default="https")
base_url = '%s://%s/concierge/api/%s' % (portier_scheme, portier_host, identity)
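# note: CONCIERGE_IDENTITY has no default - if it is unset, base_url ends in
# "None" and every portier request will fail.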

# task types this concierge is able to run
skills = [
    'restream',
]

# rtmp edges that restream tasks get (randomly) distributed across
edge_nodes = [
    'rtmp://ingest-nbg.chaoswest.tv:1936/',
    'rtmp://ingest-fsn.chaoswest.tv:1936/',
]

# heartbeat interval in seconds
interval = 2

# runtime stuff
claims = []


def svd_update():
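    # ask supervisord to re-read its config and apply the diff: start newly
    # added process groups, restart changed ones and tear down removed ones.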
    try:
        r = svd.supervisor.reloadConfig()
    except xmlrpc.client.Fault as e:
        if e.faultCode == 6:  # SHUTDOWN_STATE
            print('svd shutting down')
            return
        else:
            raise

    # reloadConfig returns [[added, changed, removed]] - lists of process
    # group names, grouped by how their config changed.
    added, changed, removed = r[0]

    for group in added:
        print('adding %s' % group)
        svd.supervisor.addProcessGroup(group)

    for group in changed:
        # supervisord has no "reload" for a group: stop it, drop it, re-add it.
        svd.supervisor.stopProcessGroup(group)
        svd.supervisor.removeProcessGroup(group)
        svd.supervisor.addProcessGroup(group)

    for group in removed:
        # we don't want to remove ourselves by accident ;)
        print('removing %s' % group)
        if group == 'concierge':
            print('wait, no! :D')
            continue

        svd.supervisor.stopProcessGroup(group)
        svd.supervisor.removeProcessGroup(group)


def sigterm_handler(signum, frame):
    print("concierge shutting down.")
    # if concierge dies, all tasks need to be released!
    # supervisord has an eventlistener and will kill itself (and thus all
    # running tasks) if concierge dies.
    # (release_task() rebinds the global claims list via stop_task(); this
    # loop keeps iterating over the old list object, so this is safe.)
    for claim in claims:
        release_task(claim.get('uuid'))
    sys.exit(0)


def template_tasks():
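    # (re-)render one supervisord config file per claimed task, from the
    # jinja2 templates in tasks.templates/ (picked by the claim's type).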
    j = Environment(loader=FileSystemLoader('tasks.templates'))
    for claim in claims:
        tpl = j.get_template('%s.conf.j2' % claim.get('type'))
        with open("/app/tasks.d/%s.conf" % claim.get('uuid'), "w") as f:
            f.write(tpl.render(
                edge=random.choice(edge_nodes),
                uuid=claim.get('uuid'),
                cfg=claim.get('configuration')))


def stop_task(uuid):
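    # stop a task locally: forget the claim, delete its supervisord config
    # and reload supervisord.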
    global claims
    # remove from local claim list
    remaining_claims = [claim for claim in claims if claim.get('uuid') != uuid]
    claims = remaining_claims

    # remove task config
    file = '/app/tasks.d/%s.conf' % uuid
    try:
        os.remove(file)
    except OSError:
        print('error deleting task configfile', file)

    # reload supervisord config
    svd_update()


def release_task(uuid):
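    # give a task back to portier; only stop it locally once portier has
    # confirmed the release.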
    global claims
    r = requests.post('%s/release/%s' % (base_url, uuid))

    if r.status_code != 200:
        # portier refused the release - keep our local state untouched.
        return

    stop_task(uuid)


def claim_task(uuid):
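    # try to claim an available task from portier and, on success, start it
    # by regenerating and reloading the supervisord config.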
    global claims
    r = requests.post('%s/claim/%s' % (base_url, uuid))

    if r.status_code != 200:
        # assumption: like /release, portier answers non-200 if the claim was
        # rejected (e.g. another concierge was faster) - don't record a claim
        # we never got.
        return

    task = r.json()
    claims.append({
        'uuid': task.get('uuid'),
        'type': task.get('type'),
        'configuration': task.get('configuration')
    })

    # rewrite supervisord config files
    template_tasks()

    # reload supervisord config
    svd_update()


def loop(config):
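    # main reconciliation loop: heartbeat against portier, then converge the
    # local claim list towards what portier reports.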
    global claims
    while True:
        # portier heartbeat
        r = requests.post('%s/heartbeat' % base_url)
        resp = r.json()

        # compare local list of claims with portier's list of claims
        for pclaim in resp['claims']:
            # search for claims we don't know of
            known_claim = any(claim.get('uuid') == pclaim.get('uuid') for claim in claims)
            if not known_claim:
                # portier thinks we claimed a task, but we don't know this
                # claim. release the task, so it can be picked up again.
                print('releasing %s' % pclaim.get('uuid'))
                release_task(pclaim.get('uuid'))

        for claim in claims:
            # search for claims that portier doesn't know of (maybe taken away on purpose)
            known_claim = any(claim.get('uuid') == pclaim.get('uuid') for pclaim in resp['claims'])
            if not known_claim:
                print('stopping %s' % claim.get('uuid'))
                stop_task(claim.get('uuid'))

        # search for new available tasks that we can handle and try to claim
        # one (at most one per heartbeat).
        for task in resp['available']:
            if task.get('type') in skills:
                claim_task(task.get('uuid'))
                break

        time.sleep(interval)


def main():
    # program setup
    signal.signal(signal.SIGTERM, sigterm_handler)

    # enter the heartbeat/reconciliation loop
    loop({})


if __name__ == '__main__':
    main()