2024-12-14 21:52:32 +01:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/tls"
|
|
|
|
"encoding/json"
|
|
|
|
"flag"
|
|
|
|
"fmt"
|
|
|
|
"regexp"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
|
|
|
|
"github.com/hypebeast/go-osc/osc"
|
|
|
|
)
|
|
|
|
|
|
|
|
// MQTTAlert is the JSON payload that sendMQTTAlert publishes to the alert topic.
type MQTTAlert struct {
	// Level is the alert severity; wingLoop emits "warn" or "error".
	Level string `json:"level"`
	// Message is the human-readable alert text.
	Message string `json:"msg"`
	// Component names the originating subsystem, e.g. "audio/backuprecorder".
	Component string `json:"component"`
}
|
|
|
|
|
|
|
|
// WingStatusWLiveState is the transport state of one WLIVE SD card slot as
// reported by the WING. The constant values are the exact strings the WING
// sends in its "$stat/state" reply (see wingRecvMsg).
type WingStatusWLiveState string

// Every constant carries the explicit type. Without it, a constant written as
// `Name = "value"` inside a const block is an untyped string constant, not a
// WingStatusWLiveState, and the enum loses its type safety.
const (
	WLSUnknown   WingStatusWLiveState = "UNKNOWN" // no (valid) state reported yet
	WLSStopped   WingStatusWLiveState = "STOP"
	WLSPaused    WingStatusWLiveState = "PAUSE"
	WLSPlaying   WingStatusWLiveState = "PLAY"
	WLSRecording WingStatusWLiveState = "REC"
)
|
|
|
|
|
|
|
|
type WingClient struct {
|
|
|
|
oscClient *osc.Client
|
|
|
|
recvPort *int
|
|
|
|
ticker *time.Ticker
|
|
|
|
}
|
|
|
|
|
|
|
|
// WingNotOKReasonType identifies which health check in wingCheckState failed.
type WingNotOKReasonType string

// Every constant carries the explicit type. Without it, `Name = "value"` in a
// const block declares an untyped string constant rather than a
// WingNotOKReasonType.
const (
	// WLNORTUnknown: a WLIVE slot has not reported a recognised state.
	WLNORTUnknown WingNotOKReasonType = "UNKNOWN"
	// WLNORTNotRecording: no WLIVE slot is currently recording.
	WLNORTNotRecording WingNotOKReasonType = "NOT_RECORDING"
	// WLNOFreeSpaceLow: a slot reports less than minimumFreeSpace remaining.
	WLNOFreeSpaceLow WingNotOKReasonType = "FREE_SPACE_LOW"
)
|
|
|
|
|
|
|
|
type WingNotOKReason struct {
|
|
|
|
Critical bool
|
|
|
|
Type WingNotOKReasonType
|
|
|
|
Slot int
|
|
|
|
AlertAfterSeconds time.Duration
|
|
|
|
RepeatAfterSeconds time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
type WingStatusWLiveSlot struct {
|
|
|
|
Slot int
|
|
|
|
State WingStatusWLiveState
|
|
|
|
Remaining time.Duration
|
|
|
|
Current time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
type WingCheckResult struct {
|
|
|
|
IsOK bool
|
|
|
|
NeedAttention bool
|
|
|
|
NotOKIterations int // Number of iterations the WING was not OK. We don't want to spam alerts.
|
|
|
|
NotOKReasons []WingNotOKReason
|
|
|
|
}
|
|
|
|
|
|
|
|
type WingStatus struct {
|
|
|
|
WLiveSlots []WingStatusWLiveSlot
|
|
|
|
}
|
|
|
|
|
|
|
|
// Initialize WING status struct, this must only be accessed in the main thread by
|
|
|
|
// the OSC server message handler(s).
|
|
|
|
// Any other threads/goroutines must not access this struct.
|
|
|
|
var wingStatus = &WingStatus{
|
|
|
|
WLiveSlots: []WingStatusWLiveSlot{
|
|
|
|
{Slot: 1, State: WLSUnknown, Remaining: -1, Current: -1},
|
|
|
|
{Slot: 2, State: WLSUnknown, Remaining: -1, Current: -1},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
// minimumFreeSpace is the smallest free-space value (expressed as a duration,
// as stored in WingStatusWLiveSlot.Remaining) a WLIVE slot may report before
// wingCheckState raises a FREE_SPACE_LOW reason.
var minimumFreeSpace = 10 * time.Minute
|
|
|
|
|
|
|
|
func wingRecvMsg(msg *osc.Message) {
|
|
|
|
// Answers for wingReqWLIVEStatus
|
|
|
|
r := regexp.MustCompile("/cards/wlive/(?P<slot>[12])/\\$stat/(?P<type>(state|sdfree|etime|tracks))")
|
|
|
|
matches := r.FindStringSubmatch(msg.Address)
|
|
|
|
|
|
|
|
// Once more regexes are implemented, this handling needs to change
|
|
|
|
if matches == nil {
|
|
|
|
fmt.Println("Received unimplemented OSC Message.")
|
|
|
|
osc.PrintMessage(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
slot, _ := strconv.Atoi(matches[1])
|
|
|
|
index := slot - 1 // Slot is 1-based, index is 0-based
|
|
|
|
|
|
|
|
if matches[2] == "state" {
|
|
|
|
state := msg.Arguments[0].(string)
|
|
|
|
switch state {
|
|
|
|
case "STOP":
|
|
|
|
wingStatus.WLiveSlots[index].State = WLSStopped
|
|
|
|
case "PAUSE":
|
|
|
|
wingStatus.WLiveSlots[index].State = WLSPaused
|
|
|
|
case "PLAY":
|
|
|
|
wingStatus.WLiveSlots[index].State = WLSPlaying
|
|
|
|
case "REC":
|
|
|
|
wingStatus.WLiveSlots[index].State = WLSRecording
|
|
|
|
default:
|
|
|
|
wingStatus.WLiveSlots[index].State = WLSUnknown
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if matches[2] == "sdfree" {
|
|
|
|
remaining, _ := time.ParseDuration(fmt.Sprintf("%dms", int(msg.Arguments[2].(float32))))
|
|
|
|
wingStatus.WLiveSlots[index].Remaining = remaining
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func wingCheckState(wingCheckResult *WingCheckResult) {
|
|
|
|
// Check if the state of the WING is fine
|
|
|
|
// If not, send an alert to the monitoring system
|
|
|
|
|
|
|
|
seemsOK := true
|
|
|
|
notOKReasons := []WingNotOKReason{}
|
|
|
|
// If we don't know the state of any WLive slot, we're not OK.
|
|
|
|
for _, wlive := range wingStatus.WLiveSlots {
|
|
|
|
if wlive.State == WLSUnknown {
|
|
|
|
seemsOK = false
|
|
|
|
reason := WingNotOKReason{
|
|
|
|
Critical: true,
|
|
|
|
Type: WLNORTUnknown,
|
|
|
|
Slot: wlive.Slot,
|
|
|
|
AlertAfterSeconds: time.Duration(60 * time.Second),
|
|
|
|
RepeatAfterSeconds: time.Duration(5 * 60 * time.Second),
|
|
|
|
}
|
|
|
|
notOKReasons = append(notOKReasons, reason)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// At least one WLive slot must be in Recording state at all times. If not, we're not OK.
|
|
|
|
recording := false
|
|
|
|
for _, wlive := range wingStatus.WLiveSlots {
|
|
|
|
if wlive.State == WLSRecording {
|
|
|
|
recording = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !recording {
|
|
|
|
seemsOK = false
|
|
|
|
reason := WingNotOKReason{
|
|
|
|
Critical: true,
|
|
|
|
Type: WLNORTNotRecording,
|
|
|
|
Slot: -1,
|
|
|
|
AlertAfterSeconds: time.Duration(15 * time.Second),
|
|
|
|
RepeatAfterSeconds: time.Duration(60 * time.Second),
|
|
|
|
}
|
|
|
|
notOKReasons = append(notOKReasons, reason)
|
|
|
|
}
|
|
|
|
|
|
|
|
// All cards must have enough free space. If not, we're not OK.
|
|
|
|
for _, wlive := range wingStatus.WLiveSlots {
|
|
|
|
if wlive.State != WLSUnknown {
|
|
|
|
if wlive.Remaining < minimumFreeSpace {
|
|
|
|
seemsOK = false
|
|
|
|
reason := WingNotOKReason{
|
|
|
|
Critical: false,
|
|
|
|
Type: WLNOFreeSpaceLow,
|
|
|
|
Slot: wlive.Slot,
|
|
|
|
// Free Space drops to 0 when switching cards, so we don't want to spam alerts
|
|
|
|
AlertAfterSeconds: time.Duration(60 * time.Second),
|
|
|
|
RepeatAfterSeconds: time.Duration(60 * time.Second),
|
|
|
|
}
|
|
|
|
notOKReasons = append(notOKReasons, reason)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If not OK, increment the counter and update the reasons
|
|
|
|
if !seemsOK {
|
|
|
|
wingCheckResult.NotOKIterations++
|
|
|
|
wingCheckResult.IsOK = false
|
|
|
|
wingCheckResult.NotOKReasons = notOKReasons
|
|
|
|
//fmt.Printf("WING is not OK for %d iterations!\n", wingCheckResult.NotOKIterations)
|
|
|
|
//for _, reason := range notOKReasons {
|
|
|
|
// fmt.Println(" Reason: ", reason)
|
|
|
|
//}
|
|
|
|
} else {
|
|
|
|
// If OK, reset the counter and clear the reasons
|
|
|
|
wingCheckResult.NotOKReasons = []WingNotOKReason{}
|
|
|
|
wingCheckResult.NotOKIterations = 0
|
|
|
|
wingCheckResult.IsOK = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func wingSendSimpleOSCMsg(client *osc.Client, addr string) {
|
|
|
|
msg := osc.NewMessage(addr)
|
|
|
|
client.Send(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func wingReqWLIVEStatus(wing *WingClient) {
|
|
|
|
for card := range 3 {
|
|
|
|
wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/state", *wing.recvPort, card))
|
|
|
|
wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/sdfree", *wing.recvPort, card))
|
|
|
|
wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/etime", *wing.recvPort, card))
|
|
|
|
wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/tracks", *wing.recvPort, card))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func wingLoop(wing *WingClient, MQTTClient mqtt.Client, MQTTTopic *string) {
|
|
|
|
var wingCheckResult WingCheckResult
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-wing.ticker.C:
|
|
|
|
wingReqWLIVEStatus(wing)
|
|
|
|
// TODO: the wingCheckState function is invoked while the OSC server is still processing messages.
|
|
|
|
// Due to the nature of the OSC library, we are receiving messages asynchronously and might check
|
|
|
|
// an inconsistent wingStatus. Accessing the wingStatus struct from multiple goroutines is not safe.
|
|
|
|
// With the current type of information, this works. But it is bad practice.
|
|
|
|
|
|
|
|
// Ideally, we... :
|
|
|
|
// - keep track of outstanding requests and wait for them to be processed before checking the state
|
|
|
|
// - do the checks in a with safe access to the wingStatus struct
|
|
|
|
//
|
|
|
|
// for now, we're just going to ignore this. It's a monitoring tool, not a critical system.
|
|
|
|
wingCheckState(&wingCheckResult)
|
|
|
|
if !wingCheckResult.IsOK {
|
|
|
|
sendAlert := false
|
|
|
|
var msgs []string
|
2024-12-16 17:04:00 +01:00
|
|
|
level := "warn"
|
2024-12-14 21:52:32 +01:00
|
|
|
for _, reason := range wingCheckResult.NotOKReasons {
|
|
|
|
// First let's build the message(s) we want to send. We might have multiple reasons.
|
|
|
|
// Even if the reason we're iterating on right now should not be sent out yet, we should
|
|
|
|
// always include all current reasons in the message, so operators get the full picture.
|
|
|
|
if reason.Critical {
|
|
|
|
level = "error"
|
|
|
|
}
|
|
|
|
if reason.Type == WLNORTNotRecording {
|
|
|
|
msgs = append(msgs, "No backup recording is running. Please check the WING immediately!")
|
|
|
|
}
|
|
|
|
if reason.Type == WLNOFreeSpaceLow {
|
|
|
|
msgs = append(msgs, fmt.Sprintf("Card in WING SD slot %d is getting full. Please rotate it.", reason.Slot))
|
|
|
|
}
|
|
|
|
if reason.Type == WLNORTUnknown {
|
|
|
|
msgs = append(msgs, fmt.Sprintf("WING SD slot %d is in an unknown state. Please check the WING.", reason.Slot))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now we decide if we want to send an alert in this Tick iteration. This requires that at least
|
|
|
|
// one of the alerts has reached its minimum iterations, and we have elapsed enough iterations
|
|
|
|
// to repeat the alert.
|
|
|
|
minimumIterations := int(reason.AlertAfterSeconds.Seconds())
|
|
|
|
|
|
|
|
if wingCheckResult.NotOKIterations < minimumIterations {
|
|
|
|
fmt.Printf("Not alerting for reason %v, only %d iterations passed, need %d\n", reason, wingCheckResult.NotOKIterations, minimumIterations)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
alertingIterations := int(reason.RepeatAfterSeconds.Seconds())
|
|
|
|
|
|
|
|
if (wingCheckResult.NotOKIterations-minimumIterations)%alertingIterations != 0 {
|
|
|
|
fmt.Printf("Not yet repeating alert for reason %v, %d iterations passed\n", reason, wingCheckResult.NotOKIterations)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Looks like we have an alert that needs to be repeated. Let's send it.
|
|
|
|
sendAlert = true
|
|
|
|
}
|
|
|
|
|
|
|
|
if !sendAlert {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// Combine all messages into one
|
|
|
|
msg := strings.Join(msgs, " | ")
|
|
|
|
|
|
|
|
fmt.Printf("Sending alert: %s\n", msg)
|
|
|
|
|
|
|
|
// Send an alert
|
|
|
|
alert := MQTTAlert{
|
|
|
|
Level: level,
|
|
|
|
Component: "audio/backuprecorder",
|
|
|
|
Message: msg,
|
|
|
|
}
|
|
|
|
|
|
|
|
sendMQTTAlert(MQTTClient, MQTTTopic, &alert)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newMQTTClient(broker *string, username *string, password *string, clientID *string) mqtt.Client {
|
|
|
|
tlsConfig := tls.Config{
|
|
|
|
InsecureSkipVerify: false,
|
|
|
|
}
|
|
|
|
|
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
|
opts.SetTLSConfig(&tlsConfig)
|
|
|
|
opts.AddBroker(*broker)
|
2024-12-16 17:04:00 +01:00
|
|
|
//opts.SetConnectRetry(true)
|
|
|
|
//opts.SetAutoReconnect(true)
|
|
|
|
//opts.SetConnectRetryInterval(1 * time.Second)
|
|
|
|
opts.SetConnectTimeout(2 * time.Second)
|
2024-12-14 21:52:32 +01:00
|
|
|
opts.SetClientID(*clientID)
|
|
|
|
if *username != "" {
|
|
|
|
opts.SetUsername(*username)
|
|
|
|
}
|
|
|
|
if *password != "" {
|
|
|
|
opts.SetPassword(*password)
|
|
|
|
}
|
|
|
|
client := mqtt.NewClient(opts)
|
|
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
|
panic(token.Error())
|
|
|
|
}
|
|
|
|
return client
|
|
|
|
}
|
|
|
|
|
|
|
|
func sendMQTTAlert(client mqtt.Client, topic *string, alert *MQTTAlert) {
|
|
|
|
payload, err := json.Marshal(alert)
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println("Error marshalling MQTT alert: ", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
token := client.Publish(*topic, 0, false, payload)
|
|
|
|
token.Wait()
|
|
|
|
if token.Error() != nil {
|
|
|
|
fmt.Println("Error sending MQTT alert: ", token.Error())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
wingIP := flag.String("wing-ip", "127.0.0.1", "IP address of the WING")
|
|
|
|
recvPort := flag.Int("recv", 2224, "Port to listen on for OSC messages from the WING")
|
|
|
|
recvIP := flag.String("recvIP", "0.0.0.0", "IP address to listen on for OSC messages from the WING")
|
|
|
|
MQTTBroker := flag.String("mqttbroker", "127.0.0.1:1883", "MQTT broker to send alerts to")
|
|
|
|
MQTTTopic := flag.String("mqtttopic", "/voc/alert", "MQTT topic to send alerts to")
|
|
|
|
MQTTUsername := flag.String("mqttusername", "", "MQTT username")
|
|
|
|
MQTTPassword := flag.String("mqttpassword", "", "MQTT password")
|
|
|
|
MQTTClientID := flag.String("mqttclientid", "wing-monitor", "MQTT client ID")
|
|
|
|
|
|
|
|
flag.Parse()
|
|
|
|
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
wing := WingClient{
|
|
|
|
oscClient: osc.NewClient(*wingIP, 2223),
|
|
|
|
recvPort: recvPort,
|
|
|
|
ticker: ticker,
|
|
|
|
}
|
|
|
|
|
|
|
|
MQTTClient := newMQTTClient(MQTTBroker, MQTTUsername, MQTTPassword, MQTTClientID)
|
|
|
|
|
|
|
|
// Trigger status requests via OSC every tick
|
|
|
|
go wingLoop(&wing, MQTTClient, MQTTTopic)
|
|
|
|
|
|
|
|
// OSC server to receive status messages from WING
|
|
|
|
d := osc.NewStandardDispatcher()
|
|
|
|
d.AddMsgHandler("*", wingRecvMsg)
|
|
|
|
OSCServer := &osc.Server{
|
|
|
|
Addr: fmt.Sprintf("%s:%d", *recvIP, *recvPort),
|
|
|
|
Dispatcher: d,
|
|
|
|
}
|
|
|
|
OSCServer.ListenAndServe()
|
|
|
|
}
|