diff --git a/Dockerfile b/Dockerfile
index a23b5fc..afe81da 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,6 +17,9 @@ RUN pip install -r requirements.txt
 # add user
 RUN addgroup -S concierge && adduser -S concierge -G concierge
 
+# add supervisord tasks directories
+RUN mkdir -p /app/tasks.d /app/tasks.logs && chown concierge:concierge /app/tasks.d /app/tasks.logs
+
 # add code
 ADD . /app
 
diff --git a/kill.py b/kill.py
new file mode 100755
index 0000000..5369901
--- /dev/null
+++ b/kill.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import sys
+import os
+import signal
+
+from supervisor.childutils import listener
+
+
+def write_stdout(s):
+    sys.stdout.write(s)
+    sys.stdout.flush()
+
+
+def write_stderr(s):
+    sys.stderr.write(s)
+    sys.stderr.flush()
+
+
+def main():
+    while True:
+        headers, body = listener.wait(sys.stdin, sys.stdout)
+        body = dict([pair.split(":") for pair in body.split(" ")])
+
+        if body["groupname"] == "concierge":
+            try:
+                with open('/run/supervisor.pid', 'r') as pidfile:
+                    pid = int(pidfile.readline())
+                os.kill(pid, signal.SIGQUIT)
+            except Exception as e:
+                write_stdout('could not kill supervisor: %s\n' % e)
+
+        write_stdout('RESULT 2\nOK')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/main.py b/main.py
index eac86cc..340b8e0 100644
--- a/main.py
+++ b/main.py
@@ -2,26 +2,155 @@ import os
 import signal
 import sys
 import time
+import requests
+import random
+from jinja2 import Environment, FileSystemLoader
 
 # supervisord xml-rpc connection
-from xmlrpc.client import ServerProxy
-svd = ServerProxy('http://127.0.0.1:9001/RPC2')
-identity = os.environ.get('CONCIERGE_IDENTITY', default="develop")
+import xmlrpc.client
+svd = xmlrpc.client.ServerProxy('http://127.0.0.1:9001/RPC2')
+identity = os.environ.get('CONCIERGE_IDENTITY')
+portier_host = os.environ.get('PORTIER_HOST', default="portier.chaoswest.tv")
+portier_scheme = os.environ.get('PORTIER_SCHEME', default="https")
+base_url = '%s://%s/concierge/api/%s' % (portier_scheme, portier_host, identity)
+
+skills = [
+    'restream'
+]
+
+edge_nodes = [
+    'rtmp://ingest-nbg.chaoswest.tv:1936/',
+    'rtmp://ingest-fsn.chaoswest.tv:1936/',
+]
+
+interval = 2
+
+# runtime stuff
+claims = []
+
+
+def svd_update():
+    try:
+        r = svd.supervisor.reloadConfig()
+    except xmlrpc.client.Fault as e:
+        if e.faultCode == 6:  # SHUTDOWN_STATE
+            print('svd shutting down')
+            return
+        else:
+            raise
+
+    added, changed, removed = r[0]
+
+    for group in added:
+        print('adding %s' % group)
+        svd.supervisor.addProcessGroup(group)
+
+    for group in changed:
+        svd.supervisor.stopProcessGroup(group)
+        svd.supervisor.removeProcessGroup(group)
+        svd.supervisor.addProcessGroup(group)
+
+    for group in removed:
+        # we don't want to remove ourselves by accident ;)
+        print('removing %s' % group)
+        if group == 'concierge':
+            print('wait, no! :D')
+            continue
+
+        svd.supervisor.stopProcessGroup(group)
+        svd.supervisor.removeProcessGroup(group)
 
 
 def sigterm_handler(signum, frame):
     print("concierge shutting down.")
-    # if concierge dies, all tasks need to die as well!
-
+    # if concierge dies, all tasks need to be released!
+    # supervisor has an eventlistener and will kill itself (and thus all running
+    # tasks) if concierge dies.
+    for claim in claims:
+        release_task(claim.get('uuid'))
     sys.exit(0)
 
 
-def loop(config):
-    while True:
-        # do stuff
-        print(svd.supervisor.getAllProcessInfo())
+def template_tasks():
+    j = Environment(loader=FileSystemLoader('tasks.templates'))
+    for claim in claims:
+        tpl = j.get_template('%s.conf.j2' % claim.get('type'))
+        with open("/app/tasks.d/%s.conf" % claim.get('uuid'), "w") as f:
+            f.write(tpl.render(edge=random.choice(edge_nodes), uuid=claim.get('uuid'), cfg=claim.get('configuration')))
 
-        time.sleep(1)
+
+def stop_task(uuid):
+    global claims
+    # remove from local claim list
+    remaining_claims = [claim for claim in claims if claim.get('uuid') != uuid]
+    claims = remaining_claims
+
+    # remove task config
+    path = '/app/tasks.d/%s.conf' % uuid
+    try:
+        os.remove(path)
+    except OSError:
+        print('error deleting task config file', path)
+
+    # reload supervisord config
+    svd_update()
+
+
+def release_task(uuid):
+    global claims
+    r = requests.post('%s/release/%s' % (base_url, uuid))
+
+    if r.status_code != 200:
+        return
+
+    stop_task(uuid)
+
+
+def claim_task(uuid):
+    global claims
+    r = requests.post('%s/claim/%s' % (base_url, uuid)).json()
+    claims.append({
+        'uuid': r.get('uuid'),
+        'type': r.get('type'),
+        'configuration': r.get('configuration')
+    })
+
+    # rewrite supervisord config files
+    template_tasks()
+
+    # reload supervisord config
+    svd_update()
+
+
+def loop(config):
+    global claims
+    while True:
+        # portier heartbeat
+        r = requests.post('%s/heartbeat' % base_url)
+        resp = r.json()
+
+        # compare local list of claims with portier's list of claims
+        for pclaim in resp['claims']:
+            # search for claims we don't know of
+            known_claim = any(claim.get('uuid') == pclaim.get('uuid') for claim in claims)
+            if not known_claim:
+                # portier thinks we claimed a task, but we don't know this claim.
+                # we need to release the task, so it can be picked up again.
+                print('releasing %s' % pclaim.get('uuid'))
+                release_task(pclaim.get('uuid'))
+
+        for claim in claims:
+            # search for claims that portier doesn't know of (maybe taken away on purpose)
+            known_claim = any(claim.get('uuid') == pclaim.get('uuid') for pclaim in resp['claims'])
+            if not known_claim:
+                print('stopping %s' % claim.get('uuid'))
+                stop_task(claim.get('uuid'))
+
+        # search for new available tasks that we can handle and try to claim one.
+        for task in resp['available']:
+            if task.get('type') in skills:
+                claim_task(task.get('uuid'))
+                break
+
+        time.sleep(interval)
 
 
 def main():
@@ -29,8 +160,7 @@ def main():
     signal.signal(signal.SIGTERM, sigterm_handler)
 
     # check connection to supervisord
-    print(svd.supervisor.getState())
-
-    loop()
+    loop({})
 
 
 main()
diff --git a/requirements.txt b/requirements.txt
index 522f202..9c58798 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-celery>=4.4
-redis
+requests
+jinja2
diff --git a/supervisord.conf b/supervisord.conf
index 343d0d3..194092a 100644
--- a/supervisord.conf
+++ b/supervisord.conf
@@ -1,4 +1,5 @@
 [supervisord]
+pidfile=/run/supervisor.pid
 logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log)
 logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB)
 logfile_backups=10 ; (num of main logfile rotation backups;default 10)
@@ -9,6 +10,12 @@ user=root
 [inet_http_server]
 port=127.0.0.1:9001
 
+[unix_http_server]
+file=/run/supervisor.sock ; the path to the socket file
+
+[supervisorctl]
+serverurl=unix:///run/supervisor.sock ; use a unix:// URL for a unix socket
+
 [rpcinterface:supervisor]
 supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 
@@ -25,3 +32,10 @@ stderr_logfile_maxbytes=0
 stdout_logfile=/dev/stdout
 stdout_logfile_maxbytes=0
 user=concierge
+
+[eventlistener:concierge_exit]
+command=/app/kill.py
+events=PROCESS_STATE_FATAL,PROCESS_STATE_STOPPED
+
+[include]
+files = /app/tasks.d/*.conf
diff --git a/tasks.templates/restream.conf.j2 b/tasks.templates/restream.conf.j2
new file mode 100644
index 0000000..21e475f
--- /dev/null
+++ b/tasks.templates/restream.conf.j2
@@ -0,0 +1,8 @@
+[program:{{ uuid }}]
+directory=/app
+command=/usr/bin/ffmpeg -loglevel repeat+level+info -i {{ edge }}{{ cfg["app"] }}/{{ cfg["stream"] }} -c copy -f flv {{ cfg["target"] }}
+autostart=true
+autorestart=true
+stderr_logfile=/app/tasks.logs/{{ uuid }}.stderr
+stdout_logfile=/app/tasks.logs/{{ uuid }}.stdout
+user=concierge
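
For reference, a minimal sketch (not part of the patch) of what template_tasks() writes into /app/tasks.d for a single claim. The claim values below are made up for illustration; the render call mirrors the one in main.py:

#!/usr/bin/env python
import random
from jinja2 import Environment, FileSystemLoader

edge_nodes = [
    'rtmp://ingest-nbg.chaoswest.tv:1936/',
    'rtmp://ingest-fsn.chaoswest.tv:1936/',
]

# hypothetical claim, shaped like the dict that claim_task() appends to claims
claim = {
    'uuid': 'demo-task-uuid',
    'type': 'restream',
    'configuration': {
        'app': 'live',
        'stream': 'mystream',
        'target': 'rtmp://example.com/live/streamkey',
    },
}

env = Environment(loader=FileSystemLoader('tasks.templates'))
tpl = env.get_template('%s.conf.j2' % claim['type'])

# prints a [program:demo-task-uuid] block; template_tasks() writes the same
# output to /app/tasks.d/demo-task-uuid.conf, where the [include] section of
# supervisord.conf picks it up on the next svd_update()
print(tpl.render(edge=random.choice(edge_nodes),
                 uuid=claim['uuid'],
                 cfg=claim['configuration']))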