concierge/main.py

import os
import random
import signal
import sys
import time

import requests
from jinja2 import Environment, FileSystemLoader

# supervisord xml-rpc connection
import xmlrpc.client

svd = xmlrpc.client.ServerProxy('http://127.0.0.1:9001/RPC2')
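# supervisord is expected to expose its XML-RPC interface on 127.0.0.1:9001
# (e.g. via [inet_http_server]); concierge uses it to reload the config and
# to add/remove process groups for claimed tasks.
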
identity = os.environ.get('CONCIERGE_IDENTITY')
portier_host = os.environ.get('PORTIER_HOST', default="portier.chaoswest.tv")
portier_scheme = os.environ.get('PORTIER_SCHEME', default="https")

base_url = '%s://%s/concierge/api/%s' % (portier_scheme, portier_host, identity)

skills = [
    'restream'
]

edge_nodes = [
    'rtmp://ingest-nbg.chaoswest.tv:1936/',
    'rtmp://ingest-fsn.chaoswest.tv:1936/',
]

interval = 2

# runtime stuff
claims = []
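# each entry is a dict holding the task's 'uuid', 'type' and 'configuration'
# as returned by portier (see claim_task()).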


def svd_update():
    # ask supervisord to re-read its config and sync process groups with it
    try:
        r = svd.supervisor.reloadConfig()
    except xmlrpc.client.Fault as e:
        if e.faultCode == 6:  # SHUTDOWN_STATE
            print('svd shutting down')
            return
        else:
            raise

    # reloadConfig() returns [[added, changed, removed]]
    added, changed, removed = r[0]

    for group in added:
        print('adding %s' % group)
        svd.supervisor.addProcessGroup(group)

    for group in changed:
        svd.supervisor.stopProcessGroup(group)
        svd.supervisor.removeProcessGroup(group)
        svd.supervisor.addProcessGroup(group)

    for group in removed:
        # we don't want to remove ourselves by accident ;)
        print('removing %s' % group)
        if group == 'concierge':
            print('wait, no! :D')
            continue
        svd.supervisor.stopProcessGroup(group)
        svd.supervisor.removeProcessGroup(group)


def sigterm_handler(signum, frame):
    print("concierge shutting down.")
    # if concierge dies, all tasks need to be released!
    # supervisor has an eventlistener and will kill itself (and thus all
    # running tasks) if concierge dies.
    for claim in claims:
        release_task(claim.get('uuid'))
    sys.exit(0)


def template_tasks():
    # (re)write a supervisord config file for every claimed task from its
    # jinja2 template in tasks.templates/
    j = Environment(loader=FileSystemLoader('tasks.templates'))
    for claim in claims:
        tpl = j.get_template('%s.conf.j2' % claim.get('type'))
        with open("/app/tasks.d/%s.conf" % claim.get('uuid'), "w") as f:
            f.write(tpl.render(edge=random.choice(edge_nodes),
                               uuid=claim.get('uuid'),
                               cfg=claim.get('configuration')))
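
# For reference, each template under tasks.templates/ renders to a supervisord
# program section (presumably named after the task uuid, so svd_update() can
# manage it as a process group). Purely hypothetical sketch of
# tasks.templates/restream.conf.j2; the real template ships separately, and
# 'source' and 'stream' below are made-up configuration keys:
#
#   [program:{{ uuid }}]
#   command=ffmpeg -i {{ cfg.source }} -c copy -f flv {{ edge }}{{ cfg.stream }}
#   autorestart=true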


def stop_task(uuid):
    global claims
    # remove from local claim list
    remaining_claims = [claim for claim in claims if claim.get('uuid') != uuid]
    claims = remaining_claims

    # remove task config
    file = '/app/tasks.d/%s.conf' % uuid
    try:
        os.remove(file)
    except OSError:
        print('error deleting task configfile', file)

    # reload supervisord config
    svd_update()


def release_task(uuid):
    global claims
    r = requests.post('%s/release/%s' % (base_url, uuid))
    if r.status_code != 200:
        return
    stop_task(uuid)


def claim_task(uuid):
    global claims
    r = requests.post('%s/claim/%s' % (base_url, uuid)).json()
    claims.append({
        'uuid': r.get('uuid'),
        'type': r.get('type'),
        'configuration': r.get('configuration')
    })

    # rewrite supervisord config files
    template_tasks()
    # reload supervisord config
    svd_update()


def loop(config):
    global claims
    while True:
        # portier heartbeat
        r = requests.post('%s/heartbeat' % base_url)
        resp = r.json()

        # compare local list of claims with portier's list of claims
        for pclaim in resp['claims']:
            # search for claims we don't know of
            known_claim = ['x' for claim in claims if claim.get('uuid') == pclaim.get('uuid')]
            if not known_claim:
                # portier thinks we claimed a task, but we don't know this claim.
                # we need to release the task, so it can be picked up again.
                print('releasing %s' % pclaim.get('uuid'))
                release_task(pclaim.get('uuid'))

        for claim in claims:
            # search for claims that portier doesn't know of (maybe taken away on purpose)
            known_claim = ['x' for pclaim in resp['claims'] if claim.get('uuid') == pclaim.get('uuid')]
            if not known_claim:
                print('stopping %s' % claim.get('uuid'))
                stop_task(claim.get('uuid'))

        # search for new available tasks that we can handle and try to claim one.
        for task in resp['available']:
            if task.get('type') in skills:
                claim_task(task.get('uuid'))
                break

        time.sleep(interval)


def main():
    # program setup
    signal.signal(signal.SIGTERM, sigterm_handler)
    # check connection to supervisord
    loop({})


if __name__ == '__main__':
    main()