implement active srs sync and celery

reworks the entire logic of how active streams are tracked.
instead of keeping a counter by listening only to srs callback
hooks, portier now actively scrapes the http api of every known
srs node to retrieve information about all streams that currently
exist on that node. this prevents the active stream counts from
drifting out of sync, and it lets us show more stream information
to users in the future, since the srs api also exposes the codecs
in use, the stream resolution and the data rates as seen by srs
itself.

to implement this, the previously dormant celery setup has been
reactivated: it is now required to run exactly one celery beat
instance and one or more celery workers beside portier itself.
these make sure that all srs nodes are scraped every 5 seconds,
on top of the scrape triggered by every srs callback hook, so the
data stays fresh at all times.

celery beat also lets us implement cron-based automation for many
other features (restream, pull, etc.) in the future, so pulling in
something heavier here is preferable to a system cron that keeps
executing a custom management command.
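
a minimal sketch of the beat schedule this implies, assuming a celery app
named "portier" and this module living at portier.tasks (both names are
assumptions, not part of this change):

    from celery import Celery

    app = Celery("portier")

    # have celery beat trigger the scrape task every 5 seconds
    app.conf.beat_schedule = {
        "scrape-srs-servers": {
            "task": "portier.tasks.async_scrape_srs_servers",
            "schedule": 5.0,
        },
    }

with a schedule like this, running "celery -A portier beat" exactly once and
"celery -A portier worker" one or more times beside the web process matches
the setup described above.
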
import json
import logging

import requests
from celery import shared_task

from config.models import Stream, SRSNode, SRSStreamInstance

logger = logging.getLogger(__name__)
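
# the models imported above are assumed to look roughly like the sketch below;
# the real definitions live in config.models and may differ in detail:
#
#   class SRSNode(models.Model):
#       api_base = models.URLField()               # e.g. "http://srs-host:1985"
#       active = models.BooleanField(default=True)
#
#   class Stream(models.Model):
#       stream = models.UUIDField()                # matched against the srs stream name
#       publish_counter = models.IntegerField(default=0)
#
#   class SRSStreamInstance(models.Model):
#       stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
#       node = models.ForeignKey(SRSNode, on_delete=models.CASCADE)
#       statusdata = models.TextField(blank=True, default="")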


def scrape_srs_servers():
    # scrape every active srs node, then recompute the per-stream publish counters
    for node in SRSNode.objects.filter(active=True):
        scrape_srs_server(node)
    update_stream_counters()


def scrape_srs_server(node: SRSNode):
    try:
        response = requests.get(f"{node.api_base}/api/v1/streams/", timeout=2)
        response.raise_for_status()
        streams = response.json().get('streams', [])

        streamobjs = []

        for streamjson in streams:
            # if there are consumers still connected to a stream, the stream will stay listed,
            # but the publish status will be false. we do not care about such (dead) streams.
            active = streamjson.get('publish', {}).get('active', False)
            if not active:
                continue

            # find the corresponding stream object by comparing the stream uuid
            try:
                stream = Stream.objects.get(stream=streamjson.get('name'))
            except Stream.DoesNotExist:
                # streams that portier does not know about are ignored
                logger.warning('unknown stream %s reported by node %s', streamjson.get('name'), node)
                continue

            streaminstance, _ = SRSStreamInstance.objects.get_or_create(stream=stream, node=node)
            streaminstance.statusdata = json.dumps(streamjson)
            # persist the latest status snapshot reported by srs
            streaminstance.save()
            streamobjs.append(stream)

        # delete the stream instances that are not in the response
        SRSStreamInstance.objects.filter(node=node).exclude(stream__in=streamobjs).delete()
    except requests.exceptions.RequestException as e:
        logger.error('Error while trying to scrape SRS node %s: %s', node, e)
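
# for reference, the parsing above assumes a /api/v1/streams response shaped
# roughly like the following sketch (only "streams", "name" and "publish.active"
# are relied on here; the other fields vary by srs version):
#
#   {
#       "code": 0,
#       "streams": [
#           {
#               "name": "<stream key / uuid>",
#               "publish": {"active": true},
#               "video": {"codec": "H264", "width": 1920, "height": 1080},
#               "kbps": {"recv_30s": 3000, "send_30s": 0}
#           }
#       ]
#   }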


def update_stream_counters():
    # recount on how many srs nodes each stream is currently being published
    for stream in Stream.objects.all():
        stream.publish_counter = SRSStreamInstance.objects.filter(stream=stream).count()
        logger.debug('stream %s has %d publish instances', stream, stream.publish_counter)
        stream.save()


@shared_task
def async_scrape_srs_servers():
    # celery task wrapper so the scrape can be scheduled by beat and enqueued asynchronously
    scrape_srs_servers()
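
# an srs callback hook handler can enqueue the same scrape asynchronously, e.g.
# (sketch only; the hook view is not part of this file and its name is assumed):
#
#   from portier.tasks import async_scrape_srs_servers
#
#   def on_publish(request):
#       ...                                   # handle the srs callback payload
#       async_scrape_srs_servers.delay()      # refresh stream data right away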