portier/source/config/tasks.py

import json
import logging

from celery import shared_task
import requests

from config.models import Stream, SRSNode, SRSStreamInstance

logger = logging.getLogger(__name__)


def scrape_srs_servers():
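    """Scrape every active SRS node, then refresh the per-stream publish counters."""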
    for node in SRSNode.objects.filter(active=True):
        scrape_srs_server(node)
    update_stream_counters()


def scrape_srs_server(node: SRSNode):
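    """Fetch the active streams from one SRS node's API and sync its SRSStreamInstance records."""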
    try:
        response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2)
        response.raise_for_status()
        streams = response.json().get('streams', [])
        streamobjs = []
        for streamjson in streams:
            # if there are consumers still connected to a stream, the stream will stay listed,
            # but the publish status will be false. we do not care about such (dead) streams.
            active = streamjson.get('publish', {}).get('active', False)
            if not active:
                continue
            # find the corresponding stream object by comparing the stream uuid
            stream = Stream.objects.get(stream=streamjson.get('name'))
            streaminstance, _ = SRSStreamInstance.objects.get_or_create(stream=stream, node=node)
            streaminstance.statusdata = json.dumps(streamjson)
            # persist the freshly scraped status data on the instance
            streaminstance.save()
            streamobjs.append(stream)
        # Delete the stream instances that are not in the response
        SRSStreamInstance.objects.filter(node=node).exclude(stream__in=streamobjs).delete()
    except requests.exceptions.RequestException as e:
        logger.error('Error while trying to scrape SRS server')
        logger.error(node)
        logger.error(e)


def update_stream_counters():
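    """Recount the SRSStreamInstance records of every Stream and store the result in publish_counter."""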
    for stream in Stream.objects.all():
        # recount the instances currently known for this stream
        stream.publish_counter = SRSStreamInstance.objects.filter(stream=stream).count()
        # diagnostic logging of the recomputed counter and the matching instances
        logger.debug(stream.publish_counter)
        logger.debug(SRSStreamInstance.objects.filter(stream=stream).all())
        stream.save()


@shared_task
def async_scrape_srs_servers():
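    """Celery task wrapper around scrape_srs_servers()."""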
    scrape_srs_servers()