durchsage/durchsage.py

214 lines
6.9 KiB
Python
Raw Normal View History

2021-10-16 15:22:35 +02:00
#!/usr/bin/env python3
import argparse
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import configparser
import datetime
import dateutil.parser
import json
import logging
import multiprocessing
import re
import requests
import signal
import sys
from typing import Tuple
from paho.mqtt import publish as mqtt_publish
config = configparser.ConfigParser()
l = logging.getLogger(__name__)
lh = logging.StreamHandler(sys.stdout)
lh.setFormatter(logging.Formatter("[%(levelname)s]: %(message)s"))
l.addHandler(lh)
s = BlockingScheduler()
manager = multiprocessing.Manager()
versions = manager.dict()
def signal_handler(sig, frame):
l.info("shutting down")
s.shutdown()
sys.exit(0)
def mqtt_send(topic: str, msg: str) -> None:
username = config.get('mqtt', 'username', fallback=None)
password = config.get('mqtt', 'password', fallback=None)
auth = {
'username': username,
'password': password,
} if username is not None and password is not None else None
tls = {
'ca_certs': config.get('mqtt', 'ca_certs', fallback='/etc/ssl/certs/ca-certificates.crt'),
'insecure': not config.getboolean('mqtt', 'tls_verify', fallback=True)
} if config.getboolean('mqtt', 'tls', fallback=False) else None
mqtt_publish.single(
topic=topic,
payload=msg,
hostname=config.get('mqtt', 'host', fallback='localhost'),
port=config.getint('mqtt', 'port', fallback=1883),
tls=tls,
auth=auth,
)
def event_is_in_future(event: dict) -> bool:
date = dateutil.parser.isoparse(event['date'])
now = datetime.datetime.now(date.tzinfo)
return date > now
def fetch_schedule_json(url: str) -> dict:
"""
Fetches the schedule as JSON from the provided url and returns it as a dict.
"""
ua = {
'User-Agent': 'fahrplan durchsage (github.com/chaoswest-tv/durchsage)',
}
try:
r = requests.get(url, timeout=10, headers=ua)
r.raise_for_status()
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.RequestException) as e:
l.error("error fetching the schedule.json: %s" % str(e))
raise e
try:
d = r.json()
except json.decoder.JSONDecodeError as e:
l.error("error parsing returned json: %s" % str(e))
raise e
return d
def parse_schedule(data: dict) -> Tuple[list, str]:
"""
Parses the schedule dict to find all future events and the schedule version.
"""
version = data['schedule']['version']
events = []
# extract all events across all days and rooms
for day in data['schedule']['conference']['days']:
for room in day['rooms']:
events.extend(day['rooms'][room])
# filter out events that are not in the future
events = list(filter(event_is_in_future, events))
return events, version
def update_event_jobs(fahrplan: str) -> None:
"""
Loads the current schedule from the configured url. If the schedule version changes,
removes all pending event jobs and adds all events from the fetched schedule as new jobs.
This routine will be periodically executed by a cron trigger.
"""
l.debug("%s - updating schedule. local state has version '%s'." %
(fahrplan, versions.get(fahrplan, None)))
fc = config["fahrplan:%s" % fahrplan]
pre_announce_mins = fc.getint('pre_announce', fallback=0)
try:
d = fetch_schedule_json(fc.get('url'))
events, version = parse_schedule(d)
except Exception as _:
l.error("%s - updating the schedule failed." % fahrplan)
return
# if the version did not change we don't need to do anything.
if versions.get(fahrplan, None) == version:
return
# remove all old scheduled events
for job in [x for x in s.get_jobs() if x.name.startswith("%s_event_" % fahrplan)]:
job.remove()
# add all new events to scheduler
for event in events:
date = dateutil.parser.isoparse(event['date'])
s.add_job(announce_event, args=[fahrplan, event], trigger='date',
run_date=date, name="%s_event_%s" % (fahrplan, event['guid']))
l.debug("announcing %s at %s" % (event['guid'], date))
if pre_announce_mins != 0:
pre_date = date - datetime.timedelta(minutes=pre_announce_mins)
s.add_job(announce_event, args=[fahrplan, event, True], trigger='date',
run_date=pre_date, name="%s_event_pre_%s" % (fahrplan, event['guid']))
l.debug("pre-announcing %s at %s" % (event['guid'], pre_date))
versions[fahrplan] = version
l.info("%s - imported schedule version '%s' with %d upcoming events." %
(fahrplan, versions[fahrplan], len(events)))
mqtt_send('fahrplan/%s/version' % fahrplan, versions[fahrplan])
def announce_event(fahrplan: str, event: dict, pre: bool = False) -> None:
"""
Takes an event dict and announces it by sending it onto the configured MQTT topic.
"""
l.info("%s - announcing event %s" % (fahrplan, event['guid']))
2021-10-16 15:22:35 +02:00
sub_topic = 'soon' if pre else 'now'
msg = json.dumps(event)
mqtt_send("fahrplan/%s/%s" % (fahrplan, sub_topic), msg)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
parser = argparse.ArgumentParser(description='fahrplan durchsage daemon')
parser.add_argument('-c', '--config', help='path to config file',
type=str, default='config.ini', required=False)
parser.add_argument(
'-v', '--verbose', help='increase verbosity level', action='count', default=0)
args = parser.parse_args()
# set logging verbosity
loglevel = max(logging.WARNING - (10 * args.verbose), 10)
l.setLevel(loglevel)
# import configuration file
config.read(args.config)
# find all fahrplan sections from configfile.
sections = [section.lstrip('fahrplan:') for section in config.sections(
) if section.startswith('fahrplan:')]
for fahrplan in sections:
fc = config["fahrplan:%s" % fahrplan]
# names need to start with letter or number, and can contain letters, numbers, dashes - or underscores _
if re.fullmatch('^[a-z0-9][a-z0-9\-\_]*$', fahrplan) is None:
l.error(
"config section [fahrplan:%s] has invalid name. will skip section." % fahrplan)
continue
if fc.get('url', fallback=None) is None:
l.error(
"config section [fahrplan:%s] is missing 'url'. will skip section." % fahrplan)
continue
cron = CronTrigger.from_crontab(fc.get('cron', fallback='*/5 * * * *'))
s.add_job(update_event_jobs, args=[
fahrplan], name="update_event_jobs_%s" % fahrplan, trigger=cron)
update_event_jobs(fahrplan)
# hand over to scheduler, this will block until scheduler is shut down
s.start()