super hacky MVP that actually WORKS!
This commit is contained in:
		
							parent
							
								
									019a17ae98
								
							
						
					
					
						commit
						a9e1e9b1b4
					
				| 
						 | 
					@ -17,6 +17,9 @@ RUN pip install -r requirements.txt
 | 
				
			||||||
# add user
 | 
					# add user
 | 
				
			||||||
RUN addgroup -S concierge && adduser -S concierge -G concierge
 | 
					RUN addgroup -S concierge && adduser -S concierge -G concierge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# add supervisord tasks directories
 | 
				
			||||||
 | 
					RUN mkdir -p /app/tasks.d /app/tasks.logs && chown concierge:concierge /app/tasks.d /app/tasks.logs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# add code
 | 
					# add code
 | 
				
			||||||
ADD . /app
 | 
					ADD . /app
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,36 @@
 | 
				
			||||||
 | 
					#!/usr/bin/env python
 | 
				
			||||||
 | 
					import sys
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import signal
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from supervisor.childutils import listener
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def write_stdout(s):
 | 
				
			||||||
 | 
					    sys.stdout.write(s)
 | 
				
			||||||
 | 
					    sys.stdout.flush()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def write_stderr(s):
 | 
				
			||||||
 | 
					    sys.stderr.write(s)
 | 
				
			||||||
 | 
					    sys.stderr.flush()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def main():
 | 
				
			||||||
 | 
					    while True:
 | 
				
			||||||
 | 
					        headers, body = listener.wait(sys.stdin, sys.stdout)
 | 
				
			||||||
 | 
					        body = dict([pair.split(":") for pair in body.split(" ")])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if body["groupname"] == "concierge":
 | 
				
			||||||
 | 
					           try:
 | 
				
			||||||
 | 
					                   pidfile = open('/run/supervisor.pid','r')
 | 
				
			||||||
 | 
					                   pid = int(pidfile.readline());
 | 
				
			||||||
 | 
					                   os.kill(pid, signal.SIGQUIT)
 | 
				
			||||||
 | 
					           except Exception as e:
 | 
				
			||||||
 | 
					                   write_stdout('could not kill supervisor: %s\n' % e.strerror)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        write_stdout('RESULT 2\nOK')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == '__main__':
 | 
				
			||||||
 | 
					   main()
 | 
				
			||||||
							
								
								
									
										154
									
								
								main.py
								
								
								
								
							
							
						
						
									
										154
									
								
								main.py
								
								
								
								
							| 
						 | 
					@ -2,26 +2,157 @@ import os
 | 
				
			||||||
import signal
 | 
					import signal
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
 | 
					import requests
 | 
				
			||||||
 | 
					import random
 | 
				
			||||||
 | 
					from jinja2 import Environment, FileSystemLoader
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# supervisord xml-rpc connection
 | 
					# supervisord xml-rpc connection
 | 
				
			||||||
from xmlrpc.client import ServerProxy
 | 
					import xmlrpc.client
 | 
				
			||||||
svd = ServerProxy('http://127.0.0.1:9001/RPC2')
 | 
					svd = xmlrpc.client.ServerProxy('http://127.0.0.1:9001/RPC2')
 | 
				
			||||||
identity = os.environ.get('CONCIERGE_IDENTITY', default="develop")
 | 
					identity = os.environ.get('CONCIERGE_IDENTITY')
 | 
				
			||||||
 | 
					portier_host = os.environ.get('PORTIER_HOST', default="portier.chaoswest.tv")
 | 
				
			||||||
 | 
					portier_scheme = os.environ.get('PORTIER_SCHEME', default="https")
 | 
				
			||||||
 | 
					base_url = '%s://%s/concierge/api/%s' % (portier_scheme, portier_host, identity)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					skills = [
 | 
				
			||||||
 | 
					    'restream'
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					edge_nodes = [
 | 
				
			||||||
 | 
					    'rtmp://ingest-nbg.chaoswest.tv:1936/',
 | 
				
			||||||
 | 
					    'rtmp://ingest-fsn.chaoswest.tv:1936/',
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					interval = 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# runtime stuff
 | 
				
			||||||
 | 
					claims = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def svd_update():
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        r = svd.supervisor.reloadConfig()
 | 
				
			||||||
 | 
					    except xmlrpc.client.Fault as e:
 | 
				
			||||||
 | 
					        if e.faultCode == 6: # SHUTDOWN_STATE
 | 
				
			||||||
 | 
					            print('svd shutting down')
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    added, changed, removed = r[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for group in added:
 | 
				
			||||||
 | 
					        print('adding %s' % group)
 | 
				
			||||||
 | 
					        svd.supervisor.addProcessGroup(group)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for group in changed:
 | 
				
			||||||
 | 
					        svd.supervisor.stopProcessGroup(group)
 | 
				
			||||||
 | 
					        svd.supervisor.removeProcessGroup(group)
 | 
				
			||||||
 | 
					        svd.supervisor.addProcessGroup(group)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for group in removed:
 | 
				
			||||||
 | 
					        # we don't want to remove ourselves by accident ;)
 | 
				
			||||||
 | 
					        print('removing %s' % group)
 | 
				
			||||||
 | 
					        if group == 'concierge':
 | 
				
			||||||
 | 
					            print('wait, no! :D' % group)
 | 
				
			||||||
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        svd.supervisor.stopProcessGroup(group)
 | 
				
			||||||
 | 
					        svd.supervisor.removeProcessGroup(group)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def sigterm_handler(signum, frame):
 | 
					def sigterm_handler(signum, frame):
 | 
				
			||||||
    print("concierge shutting down.")
 | 
					    print("concierge shutting down.")
 | 
				
			||||||
    # if concierge dies, all tasks need to die as well!
 | 
					    # if concierge dies, all tasks need to be released!
 | 
				
			||||||
 | 
					    # supervisor has a eventlistener and will kill itself (and thus all running
 | 
				
			||||||
 | 
					    # tasks) if concierge dies.
 | 
				
			||||||
 | 
					    for claim in claims:
 | 
				
			||||||
 | 
					        release_task(claim.get('uuid'))
 | 
				
			||||||
    sys.exit(0)
 | 
					    sys.exit(0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def loop(config):
 | 
					def template_tasks():
 | 
				
			||||||
    while True:
 | 
					    j = Environment(loader=FileSystemLoader('tasks.templates'))
 | 
				
			||||||
        # do stuff
 | 
					    for claim in claims:
 | 
				
			||||||
        print(svd.supervisor.getAllProcessInfo())
 | 
					        tpl = j.get_template('%s.conf.j2' % claim.get('type'))
 | 
				
			||||||
 | 
					        with open("/app/tasks.d/%s.conf" % claim.get('uuid'), "w") as f:
 | 
				
			||||||
 | 
					            f.write(tpl.render(edge=random.choice(edge_nodes), uuid=claim.get('uuid'), cfg=claim.get('configuration')))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        time.sleep(1)
 | 
					
 | 
				
			||||||
 | 
					def stop_task(uuid):
 | 
				
			||||||
 | 
					    global claims
 | 
				
			||||||
 | 
					    # remove from local claim list
 | 
				
			||||||
 | 
					    remaining_claims = [claim for claim in claims if claim.get('uuid') != uuid]
 | 
				
			||||||
 | 
					    claims = remaining_claims
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # remove task config
 | 
				
			||||||
 | 
					    file = '/app/tasks.d/%s.conf' % uuid
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        os.remove(file)
 | 
				
			||||||
 | 
					    except:
 | 
				
			||||||
 | 
					        print('error deleting task configfile', file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # reload supervisord config
 | 
				
			||||||
 | 
					    svd_update()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def release_task(uuid):
 | 
				
			||||||
 | 
					    global claims
 | 
				
			||||||
 | 
					    r = requests.post('%s/release/%s' % (base_url, uuid))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if r.status_code != 200:
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    stop_task(uuid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def claim_task(uuid):
 | 
				
			||||||
 | 
					    global claims
 | 
				
			||||||
 | 
					    r = requests.post('%s/claim/%s' % (base_url, uuid)).json()
 | 
				
			||||||
 | 
					    claims.append({
 | 
				
			||||||
 | 
					        'uuid': r.get('uuid'),
 | 
				
			||||||
 | 
					        'type': r.get('type'),
 | 
				
			||||||
 | 
					        'configuration': r.get('configuration')
 | 
				
			||||||
 | 
					    })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # rewrite supervisord config files
 | 
				
			||||||
 | 
					    template_tasks()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # reload supervisord config
 | 
				
			||||||
 | 
					    svd_update()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def loop(config):
 | 
				
			||||||
 | 
					    global claims
 | 
				
			||||||
 | 
					    while True:
 | 
				
			||||||
 | 
					        # portier heartbeat
 | 
				
			||||||
 | 
					        r = requests.post('%s/heartbeat' % base_url)
 | 
				
			||||||
 | 
					        resp = r.json()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # compare local list of claims with portiers list of claims
 | 
				
			||||||
 | 
					        for pclaim in resp['claims']:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # search for claims we don't know of
 | 
				
			||||||
 | 
					            known_claim = ['x' for claim in claims if claim.get('uuid') == pclaim.get('uuid')]
 | 
				
			||||||
 | 
					            if not known_claim:
 | 
				
			||||||
 | 
					                # portier thinks we claimed a task, but we don't know this claim.
 | 
				
			||||||
 | 
					                # we need to release the task, so it can again be picked up.
 | 
				
			||||||
 | 
					                print('releasing %s' % pclaim.get('uuid'))
 | 
				
			||||||
 | 
					                release_task(pclaim.get('uuid'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for claim in claims:
 | 
				
			||||||
 | 
					            # search for claims that portier doesn't know of (maybe taken away on purpose)
 | 
				
			||||||
 | 
					            known_claim = ['x' for pclaim in resp['claims'] if claim.get('uuid') == pclaim.get('uuid')]
 | 
				
			||||||
 | 
					            if not known_claim:
 | 
				
			||||||
 | 
					                print('stopping %s' % claim.get('uuid'))
 | 
				
			||||||
 | 
					                stop_task(claim.get('uuid'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # search for new available tasks that we can handle and try to claim one.
 | 
				
			||||||
 | 
					        for task in resp['available']:
 | 
				
			||||||
 | 
					            if task.get('type') in skills:
 | 
				
			||||||
 | 
					                claim_task(task.get('uuid'))
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        time.sleep(interval)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def main():
 | 
					def main():
 | 
				
			||||||
| 
						 | 
					@ -29,8 +160,7 @@ def main():
 | 
				
			||||||
    signal.signal(signal.SIGTERM, sigterm_handler)
 | 
					    signal.signal(signal.SIGTERM, sigterm_handler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # check connection to supervisord
 | 
					    # check connection to supervisord
 | 
				
			||||||
    print(svd.supervisor.getState())
 | 
					    loop({})
 | 
				
			||||||
    loop()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
main()
 | 
					main()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,2 +1,2 @@
 | 
				
			||||||
celery>=4.4
 | 
					requests
 | 
				
			||||||
redis
 | 
					jinja2
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,4 +1,5 @@
 | 
				
			||||||
[supervisord]
 | 
					[supervisord]
 | 
				
			||||||
 | 
					pidfile=/run/supervisor.pid
 | 
				
			||||||
logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log)
 | 
					logfile=/var/log/supervisord.log ; (main log file;default $CWD/supervisord.log)
 | 
				
			||||||
logfile_maxbytes=50MB        ; (max main logfile bytes b4 rotation;default 50MB)
 | 
					logfile_maxbytes=50MB        ; (max main logfile bytes b4 rotation;default 50MB)
 | 
				
			||||||
logfile_backups=10           ; (num of main logfile rotation backups;default 10)
 | 
					logfile_backups=10           ; (num of main logfile rotation backups;default 10)
 | 
				
			||||||
| 
						 | 
					@ -9,6 +10,12 @@ user=root
 | 
				
			||||||
[inet_http_server]
 | 
					[inet_http_server]
 | 
				
			||||||
port=127.0.0.1:9001
 | 
					port=127.0.0.1:9001
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[unix_http_server]
 | 
				
			||||||
 | 
					file=/run/supervisor.sock   ; the path to the socket file
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[supervisorctl]
 | 
				
			||||||
 | 
					serverurl=unix:///run/supervisor.sock ; use a unix:// URL  for a unix socket
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[rpcinterface:supervisor]
 | 
					[rpcinterface:supervisor]
 | 
				
			||||||
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 | 
					supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -25,3 +32,10 @@ stderr_logfile_maxbytes=0
 | 
				
			||||||
stdout_logfile=/dev/stdout
 | 
					stdout_logfile=/dev/stdout
 | 
				
			||||||
stdout_logfile_maxbytes=0
 | 
					stdout_logfile_maxbytes=0
 | 
				
			||||||
user=concierge
 | 
					user=concierge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[eventlistener:concierge_exit]
 | 
				
			||||||
 | 
					command=/app/kill.py
 | 
				
			||||||
 | 
					events=PROCESS_STATE_FATAL,PROCESS_STATE_STOPPED
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[include]
 | 
				
			||||||
 | 
					files = /app/tasks.d/*.conf
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,8 @@
 | 
				
			||||||
 | 
					[program:{{ uuid }}]
 | 
				
			||||||
 | 
					directory=/app
 | 
				
			||||||
 | 
					command=/usr/bin/ffmpeg -loglevel repeat+level+info -i {{ edge }}{{ cfg["app"] }}/{{ cfg["stream"] }} -c copy -f flv {{ cfg["target"] }}
 | 
				
			||||||
 | 
					autostart=true
 | 
				
			||||||
 | 
					autorestart=true
 | 
				
			||||||
 | 
					stderr_logfile=/app/tasks.logs/{{ uuid }}.stderr
 | 
				
			||||||
 | 
					stdout_logfile=/app/tasks.logs/{{ uuid }}.stdout
 | 
				
			||||||
 | 
					user=concierge
 | 
				
			||||||
		Loading…
	
		Reference in New Issue