"""Scrape the HTTP API of active SRS nodes and record which streams they publish."""

import json
import logging

from celery import shared_task
import requests

from config.models import Stream, SRSNode, SRSStreamInstance

logger = logging.getLogger(__name__)

# Seconds to wait for an SRS node's HTTP API before giving up on that node.
SRS_API_TIMEOUT = 2


def scrape_srs_servers():
    """Scrape every active SRS node, then refresh the per-stream publish counters."""
    for node in SRSNode.objects.filter(active=True):
        scrape_srs_server(node)
    update_stream_counters()


def scrape_srs_server(node: SRSNode):
    """Synchronise the ``SRSStreamInstance`` rows of *node* with its stream list.

    Queries ``{node.api_base}/api/v1/streams/`` and, for every stream that is
    reported as actively published, creates or updates the matching
    ``SRSStreamInstance`` with the raw status JSON. Instances of this node
    whose stream is no longer actively published are deleted.

    If the node cannot be reached or returns an unusable response, the error
    is logged and the existing instances of the node are left untouched.
    """
    # Keep the try body limited to the network call and response parsing.
    try:
        response = requests.get(
            f"{node.api_base}/api/v1/streams/", timeout=SRS_API_TIMEOUT
        )
        response.raise_for_status()
        streams = response.json().get('streams') or []
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers invalid JSON bodies on requests versions whose
        # JSON decode error is not a RequestException subclass.
        logger.error('Error while trying to scrape SRS server %s: %s', node, e)
        return

    streamobjs = []
    for streamjson in streams:
        # If there are consumers still connected to a stream, the stream will
        # stay listed, but the publish status will be false. We do not care
        # about such (dead) streams.
        publish = streamjson.get('publish') or {}
        if not publish.get('active', False):
            continue

        # Find the corresponding stream object by comparing the stream uuid.
        name = streamjson.get('name')
        try:
            stream = Stream.objects.get(stream=name)
        except Stream.DoesNotExist:
            # A stream we do not know must not abort the scrape of the
            # remaining streams and nodes.
            logger.warning('SRS server %s reports unknown stream %r', node, name)
            continue

        streaminstance, _ = SRSStreamInstance.objects.get_or_create(
            stream=stream, node=node
        )
        streaminstance.statusdata = json.dumps(streamjson)
        # Persist the status data; assigning the attribute alone does not
        # write it to the database.
        streaminstance.save()
        streamobjs.append(stream)

    # Delete the stream instances that are not in the response.
    SRSStreamInstance.objects.filter(node=node).exclude(
        stream__in=streamobjs
    ).delete()


def update_stream_counters():
    """Set ``publish_counter`` of every stream to its number of stream instances."""
    for stream in Stream.objects.all():
        # Count in the database instead of loading every instance row.
        stream.publish_counter = SRSStreamInstance.objects.filter(
            stream=stream
        ).count()
        logger.debug(
            'Stream %s publish_counter=%s', stream, stream.publish_counter
        )
        stream.save()


@shared_task
def async_scrape_srs_servers():
    """Celery task entry point that runs :func:`scrape_srs_servers`."""
    scrape_srs_servers()