52 lines
1.6 KiB
Python
52 lines
1.6 KiB
Python
|
import json
|
||
|
import logging
|
||
|
|
||
|
from celery import shared_task
|
||
|
import requests
|
||
|
|
||
|
from config.models import Stream, SRSNode, SRSStreamInstance
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def scrape_srs_servers():
    """Scrape every active SRS node, then refresh the stream counters.

    Each ``SRSNode`` with ``active=True`` is passed to ``scrape_srs_server``;
    once all nodes have been visited, ``update_stream_counters`` is called.
    """
    active_nodes = SRSNode.objects.filter(active=True)
    for srs_node in active_nodes:
        scrape_srs_server(srs_node)

    update_stream_counters()
|
||
|
|
||
|
|
||
|
def scrape_srs_server(node: SRSNode):
    """Synchronise the stored stream instances of one SRS node with its API.

    Fetches ``{node.api_base}/api/v1/streams/`` and, for every stream in the
    response, stores the reported JSON on the ``SRSStreamInstance`` for that
    stream and node (creating it when needed). Instances of this node whose
    stream was not processed in this run are deleted afterwards.

    A failed request is logged and leaves the stored instances untouched.
    A reported stream without a matching ``Stream`` row is logged and skipped
    instead of aborting the scrape.

    Args:
        node: The SRS node to query; its ``api_base`` is used as URL prefix.
    """
    # Keep the try body limited to the HTTP/JSON part: it is the only code
    # here expected to raise a requests exception.
    try:
        response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2)
        response.raise_for_status()
        streams = response.json().get('streams', [])
    except requests.exceptions.RequestException as e:
        logger.error('Error while trying to scrape SRS server')
        logger.error(node)
        logger.error(e)
        return

    seen_streams = []
    for streamjson in streams:
        # The reported stream name is matched against ``Stream.stream``
        # (presumably the stream's UUID/key -- confirm against the model).
        stream_name = streamjson.get('name')
        try:
            stream = Stream.objects.get(stream=stream_name)
        except Stream.DoesNotExist:
            # One unknown stream must not prevent the remaining streams
            # (and nodes) from being processed.
            logger.warning(
                'SRS server %s reports unknown stream %r, skipping',
                node,
                stream_name,
            )
            continue

        # update_or_create persists the status data; assigning the attribute
        # on the instance without saving would silently drop it.
        SRSStreamInstance.objects.update_or_create(
            stream=stream,
            node=node,
            defaults={'statusdata': json.dumps(streamjson)},
        )
        seen_streams.append(stream)

    # Delete the stream instances that are not in the response
    SRSStreamInstance.objects.filter(node=node).exclude(stream__in=seen_streams).delete()
|
||
|
|
||
|
|
||
|
def update_stream_counters():
    """Refresh ``publish_counter`` on every stream.

    For each ``Stream`` the counter is set to the number of
    ``SRSStreamInstance`` rows referencing it, and the stream is saved.
    """
    for stream in Stream.objects.all():
        instances = SRSStreamInstance.objects.filter(stream=stream)
        # Count in the database instead of loading every row to len() it.
        stream.publish_counter = instances.count()
        # Diagnostic output only, so it is logged at DEBUG rather than ERROR.
        # Passing the queryset as a lazy %-argument means it is evaluated
        # only when DEBUG logging is actually enabled.
        logger.debug(
            'Stream %s: publish_counter=%s, instances=%s',
            stream,
            stream.publish_counter,
            instances,
        )
        stream.save()
|
||
|
|
||
|
|
||
|
@shared_task
def async_scrape_srs_servers():
    """Celery task wrapper that runs ``scrape_srs_servers``."""
    scrape_srs_servers()
|