wing-monitor/main.go

364 lines
11 KiB
Go
Raw Permalink Normal View History

2024-12-14 21:52:32 +01:00
package main
import (
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"regexp"
"strconv"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/hypebeast/go-osc/osc"
)
// MQTTAlert is the payload that sendMQTTAlert serialises to JSON and
// publishes on the alert topic.
type MQTTAlert struct {
	// Level is the severity of the alert; wingLoop uses "warn" and "error".
	Level string `json:"level"`
	// Message is the human-readable alert text, encoded under the key "msg".
	Message string `json:"msg"`
	// Component names the subsystem the alert originates from.
	Component string `json:"component"`
}
// WingStatusWLiveState is the recorder state of one WLIVE card slot.
type WingStatusWLiveState string

// Known WLIVE states. Apart from WLSUnknown, the values equal the state
// strings that wingRecvMsg receives from the WING.
//
// Every constant carries the WingStatusWLiveState type explicitly. In a const
// block without iota the type is not inherited from the previous line, so
// omitting it would leave the constant an untyped string.
const (
	WLSUnknown   WingStatusWLiveState = "UNKNOWN"
	WLSStopped   WingStatusWLiveState = "STOP"
	WLSPaused    WingStatusWLiveState = "PAUSE"
	WLSPlaying   WingStatusWLiveState = "PLAY"
	WLSRecording WingStatusWLiveState = "REC"
)
// WingClient bundles everything wingLoop needs to poll a WING.
type WingClient struct {
	// oscClient is used to send OSC requests to the WING.
	oscClient *osc.Client
	// recvPort is the local port the OSC server listens on; it is embedded in
	// the address of every status request.
	recvPort *int
	// ticker paces the polling loop; each tick triggers one round of requests
	// followed by a state check.
	ticker *time.Ticker
}
// WingNotOKReasonType classifies why the WING is considered not OK.
type WingNotOKReasonType string

// Reason types produced by wingCheckState.
//
// Every constant carries the WingNotOKReasonType type explicitly. In a const
// block without iota the type is not inherited from the previous line, so
// omitting it would leave the constant an untyped string.
const (
	// WLNORTUnknown: the state of a WLIVE slot is not known.
	WLNORTUnknown WingNotOKReasonType = "UNKNOWN"
	// WLNORTNotRecording: no WLIVE slot is in the recording state.
	WLNORTNotRecording WingNotOKReasonType = "NOT_RECORDING"
	// WLNOFreeSpaceLow: the remaining capacity of a slot is below
	// minimumFreeSpace.
	WLNOFreeSpaceLow WingNotOKReasonType = "FREE_SPACE_LOW"
)
// WingNotOKReason describes a single problem found by wingCheckState together
// with the alerting policy that wingLoop applies to it.
type WingNotOKReason struct {
	// Critical raises the level of the resulting alert from "warn" to "error".
	Critical bool
	// Type identifies the kind of problem.
	Type WingNotOKReasonType
	// Slot is the affected WLIVE slot, or -1 when the problem is not tied to a
	// particular slot.
	Slot int
	// AlertAfterSeconds is how long the WING must have been not OK before this
	// reason triggers an alert. Despite the name it is a time.Duration;
	// wingLoop converts it to whole seconds and compares it with the number of
	// not-OK iterations.
	AlertAfterSeconds time.Duration
	// RepeatAfterSeconds is the interval at which the alert is repeated once it
	// has fired, handled the same way as AlertAfterSeconds.
	RepeatAfterSeconds time.Duration
}
// WingStatusWLiveSlot is the last known status of one WLIVE card slot.
type WingStatusWLiveSlot struct {
	// Slot is the 1-based slot number.
	Slot int
	// State is the recorder state last reported for this slot.
	State WingStatusWLiveState
	// Remaining is the free capacity derived from the "sdfree" reply; it is
	// initialised to -1 until a value has been received.
	Remaining time.Duration
	// Current is initialised to -1.
	// NOTE(review): nothing in this file updates it yet — presumably meant
	// for the "etime" reply; confirm.
	Current time.Duration
}
// WingCheckResult carries the outcome of wingCheckState from one polling
// iteration to the next.
type WingCheckResult struct {
	// IsOK reports whether the most recent check found no problems.
	IsOK bool
	// NeedAttention is not read or written by the code in this file.
	// NOTE(review): possibly reserved for future use — confirm.
	NeedAttention bool
	// NotOKIterations counts consecutive checks in which the WING was not OK.
	// It is reset to 0 by a successful check and is used to delay and
	// rate-limit alerts so that operators are not spammed.
	NotOKIterations int
	// NotOKReasons lists the problems found by the most recent check; it is
	// empty when IsOK is true.
	NotOKReasons []WingNotOKReason
}
// WingStatus is the monitor's view of the WING, assembled from received OSC
// replies.
type WingStatus struct {
	// WLiveSlots holds one entry per WLIVE card slot; the entry for slot n is
	// stored at index n-1.
	WLiveSlots []WingStatusWLiveSlot
}
// wingStatus is the shared, package-level status of the WING. Both WLIVE
// slots start out in the unknown state with Remaining and Current set to -1.
//
// It is written by the OSC message handler (wingRecvMsg) and is intended to
// be owned by it exclusively.
// NOTE(review): wingCheckState, which is called from the wingLoop goroutine,
// currently reads it without any synchronisation; see the TODO in wingLoop.
var wingStatus = &WingStatus{
	WLiveSlots: []WingStatusWLiveSlot{
		{Slot: 1, State: WLSUnknown, Remaining: -1, Current: -1},
		{Slot: 2, State: WLSUnknown, Remaining: -1, Current: -1},
	},
}
// minimumFreeSpace is the threshold below which wingCheckState reports a
// slot's remaining capacity as low. Written as a constant expression rather
// than parsed from a string, so there is no error to discard.
var minimumFreeSpace = 10 * time.Minute
// wliveStatAddrRe matches the addresses of the replies to the requests sent by
// wingReqWLIVEStatus. Submatch 1 is the slot number ("1" or "2"), submatch 2
// the name of the status field. It is compiled once at package scope instead
// of on every received message.
var wliveStatAddrRe = regexp.MustCompile(`/cards/wlive/(?P<slot>[12])/\$stat/(?P<type>(state|sdfree|etime|tracks))`)

// wingRecvMsg is the OSC message handler. It updates the package-level
// wingStatus from WLIVE status replies:
//
//   - "state":  argument 0 (a string) is mapped to a WingStatusWLiveState;
//     unrecognised strings map to WLSUnknown.
//   - "sdfree": argument 2 (a float32) is truncated to an integer and stored
//     as that many milliseconds in Remaining.
//     NOTE(review): unit and argument position are taken over from the
//     previous implementation — confirm against the WING OSC documentation.
//   - "etime" and "tracks" are recognised but currently ignored.
//
// Messages with an unknown address, too few arguments or unexpected argument
// types are printed and dropped; they must never crash the handler.
func wingRecvMsg(msg *osc.Message) {
	matches := wliveStatAddrRe.FindStringSubmatch(msg.Address)
	// Once more regexes are implemented, this handling needs to change
	if matches == nil {
		fmt.Println("Received unimplemented OSC Message.")
		osc.PrintMessage(msg)
		// Without this return the indexing below would panic on the nil slice.
		return
	}

	// Atoi cannot fail here: the pattern only admits "1" or "2".
	slot, _ := strconv.Atoi(matches[1])
	index := slot - 1 // Slot is 1-based, index is 0-based
	if index < 0 || index >= len(wingStatus.WLiveSlots) {
		fmt.Println("Received OSC Message for unknown WLIVE slot.")
		osc.PrintMessage(msg)
		return
	}

	switch matches[2] {
	case "state":
		if len(msg.Arguments) < 1 {
			fmt.Println("Received malformed OSC Message.")
			osc.PrintMessage(msg)
			return
		}
		state, ok := msg.Arguments[0].(string)
		if !ok {
			fmt.Println("Received malformed OSC Message.")
			osc.PrintMessage(msg)
			return
		}
		switch state {
		case "STOP":
			wingStatus.WLiveSlots[index].State = WLSStopped
		case "PAUSE":
			wingStatus.WLiveSlots[index].State = WLSPaused
		case "PLAY":
			wingStatus.WLiveSlots[index].State = WLSPlaying
		case "REC":
			wingStatus.WLiveSlots[index].State = WLSRecording
		default:
			wingStatus.WLiveSlots[index].State = WLSUnknown
		}
	case "sdfree":
		if len(msg.Arguments) < 3 {
			fmt.Println("Received malformed OSC Message.")
			osc.PrintMessage(msg)
			return
		}
		raw, ok := msg.Arguments[2].(float32)
		if !ok {
			fmt.Println("Received malformed OSC Message.")
			osc.PrintMessage(msg)
			return
		}
		wingStatus.WLiveSlots[index].Remaining = time.Duration(int(raw)) * time.Millisecond
	}
}
// wingCheckState inspects the package-level wingStatus and stores the verdict
// in wingCheckResult.
//
// The WING is considered not OK when any of the following holds:
//   - a WLIVE slot is in the unknown state (critical, one reason per slot),
//   - no WLIVE slot is recording (critical, Slot is -1),
//   - a slot in a known state has less than minimumFreeSpace remaining
//     (not critical, one reason per slot).
//
// Reasons are reported in exactly that order. When at least one reason was
// found, NotOKIterations is incremented, IsOK is cleared and NotOKReasons is
// replaced; otherwise the counter is reset, NotOKReasons is emptied and IsOK
// is set.
func wingCheckState(wingCheckResult *WingCheckResult) {
	var (
		unknownSlots  []WingNotOKReason
		lowSpaceSlots []WingNotOKReason
		anyRecording  bool
	)

	for _, slot := range wingStatus.WLiveSlots {
		// A slot in an unknown state is reported as such and is exempt from
		// the free-space check.
		if slot.State == WLSUnknown {
			unknownSlots = append(unknownSlots, WingNotOKReason{
				Critical:           true,
				Type:               WLNORTUnknown,
				Slot:               slot.Slot,
				AlertAfterSeconds:  time.Minute,
				RepeatAfterSeconds: 5 * time.Minute,
			})
			continue
		}

		if slot.State == WLSRecording {
			anyRecording = true
		}

		if slot.Remaining < minimumFreeSpace {
			lowSpaceSlots = append(lowSpaceSlots, WingNotOKReason{
				Critical: false,
				Type:     WLNOFreeSpaceLow,
				Slot:     slot.Slot,
				// Free space drops to 0 when switching cards, so the alert is
				// delayed to avoid spamming.
				AlertAfterSeconds:  time.Minute,
				RepeatAfterSeconds: time.Minute,
			})
		}
	}

	// Assemble the reasons in their fixed reporting order.
	reasons := make([]WingNotOKReason, 0, len(unknownSlots)+len(lowSpaceSlots)+1)
	reasons = append(reasons, unknownSlots...)
	if !anyRecording {
		// At least one slot must be recording at all times.
		reasons = append(reasons, WingNotOKReason{
			Critical:           true,
			Type:               WLNORTNotRecording,
			Slot:               -1,
			AlertAfterSeconds:  15 * time.Second,
			RepeatAfterSeconds: time.Minute,
		})
	}
	reasons = append(reasons, lowSpaceSlots...)

	if len(reasons) == 0 {
		// Everything is fine: reset the counter and clear the reasons.
		wingCheckResult.NotOKReasons = []WingNotOKReason{}
		wingCheckResult.NotOKIterations = 0
		wingCheckResult.IsOK = true
		return
	}

	wingCheckResult.NotOKIterations++
	wingCheckResult.IsOK = false
	wingCheckResult.NotOKReasons = reasons
}
// wingSendSimpleOSCMsg sends an OSC message without arguments to addr.
//
// Sending is best effort: a failure is printed rather than returned, so that
// a single lost request does not stop the polling loop.
func wingSendSimpleOSCMsg(client *osc.Client, addr string) {
	if err := client.Send(osc.NewMessage(addr)); err != nil {
		fmt.Printf("Error sending OSC message %q: %v\n", addr, err)
	}
}
// wingReqWLIVEStatus requests the state, sdfree, etime and tracks status
// values of every WLIVE card slot from the WING. The replies arrive
// asynchronously and are processed by wingRecvMsg.
//
// Each request address is prefixed with "/%<recvPort>".
// NOTE(review): presumably this tells the WING which port to send the reply
// to — confirm against the WING OSC documentation.
func wingReqWLIVEStatus(wing *WingClient) {
	// WLIVE slots are numbered 1 and 2, matching wingStatus and the reply
	// pattern in wingRecvMsg. There is no slot 0, so it is not polled.
	const firstSlot, lastSlot = 1, 2
	stats := [...]string{"state", "sdfree", "etime", "tracks"}

	for card := firstSlot; card <= lastSlot; card++ {
		for _, stat := range stats {
			wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/%s", *wing.recvPort, card, stat))
		}
	}
}
func wingLoop(wing *WingClient, MQTTClient mqtt.Client, MQTTTopic *string) {
var wingCheckResult WingCheckResult
for {
select {
case <-wing.ticker.C:
wingReqWLIVEStatus(wing)
// TODO: the wingCheckState function is invoked while the OSC server is still processing messages.
// Due to the nature of the OSC library, we are receiving messages asynchronously and might check
// an inconsistent wingStatus. Accessing the wingStatus struct from multiple goroutines is not safe.
// With the current type of information, this works. But it is bad practice.
// Ideally, we... :
// - keep track of outstanding requests and wait for them to be processed before checking the state
// - do the checks in a with safe access to the wingStatus struct
//
// for now, we're just going to ignore this. It's a monitoring tool, not a critical system.
wingCheckState(&wingCheckResult)
if !wingCheckResult.IsOK {
sendAlert := false
var msgs []string
level := "warn"
2024-12-14 21:52:32 +01:00
for _, reason := range wingCheckResult.NotOKReasons {
// First let's build the message(s) we want to send. We might have multiple reasons.
// Even if the reason we're iterating on right now should not be sent out yet, we should
// always include all current reasons in the message, so operators get the full picture.
if reason.Critical {
level = "error"
}
if reason.Type == WLNORTNotRecording {
msgs = append(msgs, "No backup recording is running. Please check the WING immediately!")
}
if reason.Type == WLNOFreeSpaceLow {
msgs = append(msgs, fmt.Sprintf("Card in WING SD slot %d is getting full. Please rotate it.", reason.Slot))
}
if reason.Type == WLNORTUnknown {
msgs = append(msgs, fmt.Sprintf("WING SD slot %d is in an unknown state. Please check the WING.", reason.Slot))
}
// Now we decide if we want to send an alert in this Tick iteration. This requires that at least
// one of the alerts has reached its minimum iterations, and we have elapsed enough iterations
// to repeat the alert.
minimumIterations := int(reason.AlertAfterSeconds.Seconds())
if wingCheckResult.NotOKIterations < minimumIterations {
fmt.Printf("Not alerting for reason %v, only %d iterations passed, need %d\n", reason, wingCheckResult.NotOKIterations, minimumIterations)
continue
}
alertingIterations := int(reason.RepeatAfterSeconds.Seconds())
if (wingCheckResult.NotOKIterations-minimumIterations)%alertingIterations != 0 {
fmt.Printf("Not yet repeating alert for reason %v, %d iterations passed\n", reason, wingCheckResult.NotOKIterations)
continue
}
// Looks like we have an alert that needs to be repeated. Let's send it.
sendAlert = true
}
if !sendAlert {
continue
}
// Combine all messages into one
msg := strings.Join(msgs, " | ")
fmt.Printf("Sending alert: %s\n", msg)
// Send an alert
alert := MQTTAlert{
Level: level,
Component: "audio/backuprecorder",
Message: msg,
}
sendMQTTAlert(MQTTClient, MQTTTopic, &alert)
}
}
}
}
func newMQTTClient(broker *string, username *string, password *string, clientID *string) mqtt.Client {
tlsConfig := tls.Config{
InsecureSkipVerify: false,
}
opts := mqtt.NewClientOptions()
opts.SetTLSConfig(&tlsConfig)
opts.AddBroker(*broker)
//opts.SetConnectRetry(true)
//opts.SetAutoReconnect(true)
//opts.SetConnectRetryInterval(1 * time.Second)
opts.SetConnectTimeout(2 * time.Second)
2024-12-14 21:52:32 +01:00
opts.SetClientID(*clientID)
if *username != "" {
opts.SetUsername(*username)
}
if *password != "" {
opts.SetPassword(*password)
}
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
// sendMQTTAlert encodes alert as JSON and publishes it on topic with QoS 0
// and the retained flag unset. It waits for the publish to complete.
//
// Failures, both while encoding and while publishing, are printed and
// otherwise ignored.
func sendMQTTAlert(client mqtt.Client, topic *string, alert *MQTTAlert) {
	body, marshalErr := json.Marshal(alert)
	if marshalErr != nil {
		fmt.Println("Error marshalling MQTT alert: ", marshalErr)
		return
	}

	publish := client.Publish(*topic, 0, false, body)
	publish.Wait()
	if pubErr := publish.Error(); pubErr != nil {
		fmt.Println("Error sending MQTT alert: ", pubErr)
	}
}
// main parses the command line, connects to the MQTT broker, starts the
// polling loop in a goroutine and then serves incoming OSC messages from the
// WING until the OSC server fails.
//
// Start-up and server errors are fatal and reported via panic, in line with
// newMQTTClient.
func main() {
	wingIP := flag.String("wing-ip", "127.0.0.1", "IP address of the WING")
	recvPort := flag.Int("recv", 2224, "Port to listen on for OSC messages from the WING")
	recvIP := flag.String("recvIP", "0.0.0.0", "IP address to listen on for OSC messages from the WING")
	MQTTBroker := flag.String("mqttbroker", "127.0.0.1:1883", "MQTT broker to send alerts to")
	MQTTTopic := flag.String("mqtttopic", "/voc/alert", "MQTT topic to send alerts to")
	MQTTUsername := flag.String("mqttusername", "", "MQTT username")
	MQTTPassword := flag.String("mqttpassword", "", "MQTT password")
	MQTTClientID := flag.String("mqttclientid", "wing-monitor", "MQTT client ID")
	flag.Parse()

	// One tick per second: the alert thresholds in wingLoop count iterations
	// and treat them as seconds.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	wing := WingClient{
		oscClient: osc.NewClient(*wingIP, 2223),
		recvPort:  recvPort,
		ticker:    ticker,
	}

	MQTTClient := newMQTTClient(MQTTBroker, MQTTUsername, MQTTPassword, MQTTClientID)

	// Trigger status requests via OSC every tick
	go wingLoop(&wing, MQTTClient, MQTTTopic)

	// OSC server to receive status messages from WING
	d := osc.NewStandardDispatcher()
	if err := d.AddMsgHandler("*", wingRecvMsg); err != nil {
		panic(err)
	}

	OSCServer := &osc.Server{
		Addr:       fmt.Sprintf("%s:%d", *recvIP, *recvPort),
		Dispatcher: d,
	}
	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); do not exit silently in that case.
	if err := OSCServer.ListenAndServe(); err != nil {
		panic(err)
	}
}