// Package main implements a small monitor for the WLIVE SD card recorder
// slots of a WING. Once per tick it asks the WING for the status of each slot
// via OSC, evaluates the most recently received answers and publishes an alert
// via MQTT when no backup recording is running, a slot is in an unknown state
// or a card is running out of space.
package main

import (
	"crypto/tls"
	"encoding/json"
	"flag"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/hypebeast/go-osc/osc"
)

// MQTTAlert is the JSON payload published to the alert topic.
type MQTTAlert struct {
	Level     string `json:"level"`
	Message   string `json:"msg"`
	Component string `json:"component"`
}

// WingStatusWLiveState is the state of one WLIVE slot. Apart from WLSUnknown
// the values are the strings wingRecvMsg accepts in a "state" reply.
type WingStatusWLiveState string

// Every constant carries the type explicitly: in a const block without iota a
// bare `Name = "x"` is an untyped string constant, not a WingStatusWLiveState.
const (
	WLSUnknown   WingStatusWLiveState = "UNKNOWN"
	WLSStopped   WingStatusWLiveState = "STOP"
	WLSPaused    WingStatusWLiveState = "PAUSE"
	WLSPlaying   WingStatusWLiveState = "PLAY"
	WLSRecording WingStatusWLiveState = "REC"
)

// WingClient bundles what wingLoop needs to poll the WING: the OSC client
// used for requests, the local port the WING should answer to, and the ticker
// that paces the polling.
type WingClient struct {
	oscClient *osc.Client
	recvPort  *int
	ticker    *time.Ticker
}

// WingNotOKReasonType classifies why the WING is considered not OK.
type WingNotOKReasonType string

const (
	WLNORTUnknown      WingNotOKReasonType = "UNKNOWN"
	WLNORTNotRecording WingNotOKReasonType = "NOT_RECORDING"
	WLNOFreeSpaceLow   WingNotOKReasonType = "FREE_SPACE_LOW"
)

// WingNotOKReason is a single finding produced by wingCheckState.
type WingNotOKReason struct {
	// Critical raises the alert level from "warn" to "error".
	Critical bool

	Type WingNotOKReasonType

	// Slot is the 1-based WLIVE slot the finding refers to, or -1 when the
	// finding is not tied to a single slot.
	Slot int

	// AlertAfterSeconds is how long the WING must have been not OK before
	// this reason may trigger an alert. Despite the name it is a
	// time.Duration; it is truncated to whole seconds and compared against
	// the number of not-OK ticks.
	AlertAfterSeconds time.Duration

	// RepeatAfterSeconds is the interval at which the alert is repeated,
	// handled the same way as AlertAfterSeconds.
	RepeatAfterSeconds time.Duration
}

// WingStatusWLiveSlot is the last known status of one WLIVE slot. Remaining
// and Current start at -1 until a value has been received; nothing in this
// file updates Current yet.
type WingStatusWLiveSlot struct {
	Slot      int
	State     WingStatusWLiveState
	Remaining time.Duration
	Current   time.Duration
}

// WingCheckResult carries the outcome of wingCheckState from one tick to the
// next.
type WingCheckResult struct {
	IsOK bool

	NeedAttention bool

	// Number of iterations the WING was not OK. We don't want to spam alerts.
	NotOKIterations int

	NotOKReasons []WingNotOKReason
}

// WingStatus is everything currently known about the WING.
type WingStatus struct {
	WLiveSlots []WingStatusWLiveSlot
}

// wliveSlotCount is the number of WLIVE slots that are polled. It must match
// the number of entries in wingStatus.WLiveSlots and the slot numbers accepted
// by wliveStatRe.
const wliveSlotCount = 2

var (
	// wingStatusMu guards wingStatus. The OSC message handler writes the
	// status while wingCheckState reads it from the wingLoop goroutine, so
	// every access has to hold this lock.
	wingStatusMu sync.Mutex

	// wingStatus holds the most recently reported status of every WLIVE slot.
	wingStatus = &WingStatus{
		WLiveSlots: []WingStatusWLiveSlot{
			{Slot: 1, State: WLSUnknown, Remaining: -1, Current: -1},
			{Slot: 2, State: WLSUnknown, Remaining: -1, Current: -1},
		},
	}
)

// minimumFreeSpace is the remaining recording time below which a card is
// reported as getting full.
var minimumFreeSpace = 10 * time.Minute

// wliveStatRe matches the addresses of the answers to wingReqWLIVEStatus.
// Submatch 1 is the 1-based slot number, submatch 2 the name of the value.
var wliveStatRe = regexp.MustCompile(`/cards/wlive/(?P<slot>[12])/\$stat/(?P<stat>state|sdfree|etime|tracks)`)

// wliveStats lists the $stat values requested for every slot on each tick.
var wliveStats = []string{"state", "sdfree", "etime", "tracks"}

// parseWLiveState maps the state string sent by the WING to the matching
// constant. Anything unrecognised becomes WLSUnknown.
func parseWLiveState(raw string) WingStatusWLiveState {
	switch state := WingStatusWLiveState(raw); state {
	case WLSStopped, WLSPaused, WLSPlaying, WLSRecording:
		return state
	default:
		return WLSUnknown
	}
}

// updateWLiveSlot applies update to the slot at the given 0-based index while
// holding wingStatusMu. Out-of-range indexes are ignored.
func updateWLiveSlot(index int, update func(slot *WingStatusWLiveSlot)) {
	wingStatusMu.Lock()
	defer wingStatusMu.Unlock()

	if index < 0 || index >= len(wingStatus.WLiveSlots) {
		return
	}
	update(&wingStatus.WLiveSlots[index])
}

// wingRecvMsg is the OSC handler for every message received from the WING.
// Answers to wingReqWLIVEStatus update wingStatus; "etime" and "tracks"
// answers are recognised but not stored. Anything else is printed and
// dropped, as are answers whose arguments do not have the expected shape.
func wingRecvMsg(msg *osc.Message) {
	// Once more regexes are implemented, this handling needs to change.
	matches := wliveStatRe.FindStringSubmatch(msg.Address)
	if matches == nil {
		fmt.Println("Received unimplemented OSC Message.")
		osc.PrintMessage(msg)
		return
	}

	// The pattern only admits "1" or "2", so Atoi cannot fail here.
	slot, _ := strconv.Atoi(matches[1])
	index := slot - 1 // Slot is 1-based, index is 0-based

	switch matches[2] {
	case "state":
		if len(msg.Arguments) < 1 {
			fmt.Printf("Ignoring state reply for slot %d without arguments\n", slot)
			return
		}
		raw, ok := msg.Arguments[0].(string)
		if !ok {
			fmt.Printf("Ignoring state reply for slot %d with unexpected arguments %v\n", slot, msg.Arguments)
			return
		}
		state := parseWLiveState(raw)
		updateWLiveSlot(index, func(s *WingStatusWLiveSlot) { s.State = state })

	case "sdfree":
		// The value is read from the third argument as a float32 and
		// interpreted as whole milliseconds.
		// NOTE(review): argument layout and unit are taken over unchanged;
		// confirm them against the WING OSC documentation.
		if len(msg.Arguments) < 3 {
			fmt.Printf("Ignoring sdfree reply for slot %d with too few arguments %v\n", slot, msg.Arguments)
			return
		}
		ms, ok := msg.Arguments[2].(float32)
		if !ok {
			fmt.Printf("Ignoring sdfree reply for slot %d with unexpected arguments %v\n", slot, msg.Arguments)
			return
		}
		remaining := time.Duration(int64(ms)) * time.Millisecond
		updateWLiveSlot(index, func(s *WingStatusWLiveSlot) { s.Remaining = remaining })
	}
}

// wliveNotOKReasons evaluates a snapshot of the WLIVE slots and returns every
// reason why the WING is not OK, or nil when everything is fine. The order is
// unknown slots first, then the missing recording, then low free space.
func wliveNotOKReasons(slots []WingStatusWLiveSlot) []WingNotOKReason {
	var reasons []WingNotOKReason

	// If we don't know the state of any WLive slot, we're not OK.
	for _, wlive := range slots {
		if wlive.State != WLSUnknown {
			continue
		}
		reasons = append(reasons, WingNotOKReason{
			Critical:           true,
			Type:               WLNORTUnknown,
			Slot:               wlive.Slot,
			AlertAfterSeconds:  60 * time.Second,
			RepeatAfterSeconds: 5 * 60 * time.Second,
		})
	}

	// At least one WLive slot must be in Recording state at all times.
	recording := false
	for _, wlive := range slots {
		if wlive.State == WLSRecording {
			recording = true
			break
		}
	}
	if !recording {
		reasons = append(reasons, WingNotOKReason{
			Critical:           true,
			Type:               WLNORTNotRecording,
			Slot:               -1,
			AlertAfterSeconds:  15 * time.Second,
			RepeatAfterSeconds: 60 * time.Second,
		})
	}

	// All cards with a known state must have enough free space.
	for _, wlive := range slots {
		if wlive.State == WLSUnknown || wlive.Remaining >= minimumFreeSpace {
			continue
		}
		reasons = append(reasons, WingNotOKReason{
			Critical: false,
			Type:     WLNOFreeSpaceLow,
			Slot:     wlive.Slot,
			// Free Space drops to 0 when switching cards, so we don't want to spam alerts
			AlertAfterSeconds:  60 * time.Second,
			RepeatAfterSeconds: 60 * time.Second,
		})
	}

	return reasons
}

// wingCheckState evaluates the current wingStatus and records the outcome in
// wingCheckResult. While the WING is not OK, NotOKIterations grows by one per
// call and NotOKReasons holds the current findings; as soon as it is OK again
// the counter and the reasons are reset.
func wingCheckState(wingCheckResult *WingCheckResult) {
	// Evaluate a private copy so the lock is only held for the copy itself.
	wingStatusMu.Lock()
	slots := append([]WingStatusWLiveSlot(nil), wingStatus.WLiveSlots...)
	wingStatusMu.Unlock()

	reasons := wliveNotOKReasons(slots)
	if len(reasons) > 0 {
		wingCheckResult.NotOKIterations++
		wingCheckResult.IsOK = false
		wingCheckResult.NotOKReasons = reasons
		return
	}

	wingCheckResult.NotOKReasons = nil
	wingCheckResult.NotOKIterations = 0
	wingCheckResult.IsOK = true
}

// wingSendSimpleOSCMsg sends an OSC message without arguments to addr. A
// failed send is logged; the next tick simply tries again.
func wingSendSimpleOSCMsg(client *osc.Client, addr string) {
	if err := client.Send(osc.NewMessage(addr)); err != nil {
		fmt.Printf("Error sending OSC message %s: %v\n", addr, err)
	}
}

// wingReqWLIVEStatus requests all values in wliveStats for every WLIVE slot.
// The address is prefixed with "/%<port>" built from wing.recvPort, the port
// the OSC server in main listens on for the answers.
func wingReqWLIVEStatus(wing *WingClient) {
	// Slots are 1-based; there is no slot 0 to ask for.
	for slot := 1; slot <= wliveSlotCount; slot++ {
		for _, stat := range wliveStats {
			wingSendSimpleOSCMsg(wing.oscClient, fmt.Sprintf("/%%%d/cards/wlive/%d/$stat/%s", *wing.recvPort, slot, stat))
		}
	}
}

// alertMessage returns the operator-facing text for reason, or "" for a
// reason type without a message.
func alertMessage(reason WingNotOKReason) string {
	switch reason.Type {
	case WLNORTNotRecording:
		return "No backup recording is running. Please check the WING immediately!"
	case WLNOFreeSpaceLow:
		return fmt.Sprintf("Card in WING SD slot %d is getting full. Please rotate it.", reason.Slot)
	case WLNORTUnknown:
		return fmt.Sprintf("WING SD slot %d is in an unknown state. Please check the WING.", reason.Slot)
	default:
		return ""
	}
}

// alertDue reports whether reason should trigger an alert after the WING has
// been not OK for the given number of iterations. The durations of the reason
// are truncated to seconds and compared against iterations directly, which
// assumes one iteration per second (main creates a one-second ticker).
func alertDue(reason WingNotOKReason, iterations int) bool {
	minimumIterations := int(reason.AlertAfterSeconds.Seconds())
	if iterations < minimumIterations {
		fmt.Printf("Not alerting for reason %v, only %d iterations passed, need %d\n", reason, iterations, minimumIterations)
		return false
	}

	// A repeat interval below one second truncates to 0; treat that as
	// "repeat on every iteration" instead of dividing by zero.
	alertingIterations := int(reason.RepeatAfterSeconds.Seconds())
	if alertingIterations > 0 && (iterations-minimumIterations)%alertingIterations != 0 {
		fmt.Printf("Not yet repeating alert for reason %v, %d iterations passed\n", reason, iterations)
		return false
	}
	return true
}

// buildAlert turns a not-OK check result into an alert. The returned bool
// says whether the alert should be sent in this iteration, i.e. whether at
// least one reason is due.
func buildAlert(result *WingCheckResult) (MQTTAlert, bool) {
	send := false
	level := "warn"
	msgs := make([]string, 0, len(result.NotOKReasons))

	for _, reason := range result.NotOKReasons {
		// Even if this reason is not due yet, it is always included in the
		// message so operators get the full picture.
		if reason.Critical {
			level = "error"
		}
		if text := alertMessage(reason); text != "" {
			msgs = append(msgs, text)
		}
		if alertDue(reason, result.NotOKIterations) {
			send = true
		}
	}

	alert := MQTTAlert{
		Level:     level,
		Component: "audio/backuprecorder",
		Message:   strings.Join(msgs, " | "),
	}
	return alert, send
}

// wingLoop runs forever: on every tick it requests the WLIVE status from the
// WING, evaluates the status received so far and publishes an MQTT alert when
// one is due.
func wingLoop(wing *WingClient, MQTTClient mqtt.Client, MQTTTopic *string) {
	var wingCheckResult WingCheckResult

	for range wing.ticker.C {
		wingReqWLIVEStatus(wing)

		// TODO: the answers to the requests above arrive asynchronously, so
		// this check may still see values from an earlier tick or only part
		// of the new answers. Ideally we would keep track of outstanding
		// requests and wait for them before checking. For a monitoring tool
		// working with the last known values is good enough.
		wingCheckState(&wingCheckResult)
		if wingCheckResult.IsOK {
			continue
		}

		alert, send := buildAlert(&wingCheckResult)
		if !send {
			continue
		}

		fmt.Printf("Sending alert: %s\n", alert.Message)
		sendMQTTAlert(MQTTClient, MQTTTopic, &alert)
	}
}

// newMQTTClient connects to broker and returns the connected client. Username
// and password are only set when non-empty. It panics when the connection
// cannot be established.
func newMQTTClient(broker *string, username *string, password *string, clientID *string) mqtt.Client {
	tlsConfig := tls.Config{
		InsecureSkipVerify: false,
	}

	opts := mqtt.NewClientOptions()
	opts.SetTLSConfig(&tlsConfig)
	opts.AddBroker(*broker)
	//opts.SetConnectRetry(true)
	//opts.SetAutoReconnect(true)
	//opts.SetConnectRetryInterval(1 * time.Second)
	opts.SetConnectTimeout(2 * time.Second)
	opts.SetClientID(*clientID)
	if *username != "" {
		opts.SetUsername(*username)
	}
	if *password != "" {
		opts.SetPassword(*password)
	}

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return client
}

// sendMQTTAlert publishes alert as JSON to topic with QoS 0 and without the
// retain flag, and waits for the publish to complete. Errors are logged, not
// returned.
func sendMQTTAlert(client mqtt.Client, topic *string, alert *MQTTAlert) {
	payload, err := json.Marshal(alert)
	if err != nil {
		fmt.Println("Error marshalling MQTT alert: ", err)
		return
	}

	token := client.Publish(*topic, 0, false, payload)
	token.Wait()
	if token.Error() != nil {
		fmt.Println("Error sending MQTT alert: ", token.Error())
	}
}

func main() {
	wingIP := flag.String("wing-ip", "127.0.0.1", "IP address of the WING")
	recvPort := flag.Int("recv", 2224, "Port to listen on for OSC messages from the WING")
	recvIP := flag.String("recvIP", "0.0.0.0", "IP address to listen on for OSC messages from the WING")
	MQTTBroker := flag.String("mqttbroker", "127.0.0.1:1883", "MQTT broker to send alerts to")
	MQTTTopic := flag.String("mqtttopic", "/voc/alert", "MQTT topic to send alerts to")
	MQTTUsername := flag.String("mqttusername", "", "MQTT username")
	MQTTPassword := flag.String("mqttpassword", "", "MQTT password")
	MQTTClientID := flag.String("mqttclientid", "wing-monitor", "MQTT client ID")
	flag.Parse()

	// wingLoop treats one tick as one second when timing alerts.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	wing := WingClient{
		oscClient: osc.NewClient(*wingIP, 2223),
		recvPort:  recvPort,
		ticker:    ticker,
	}

	MQTTClient := newMQTTClient(MQTTBroker, MQTTUsername, MQTTPassword, MQTTClientID)

	// Trigger status requests via OSC every tick
	go wingLoop(&wing, MQTTClient, MQTTTopic)

	// OSC server to receive status messages from WING
	d := osc.NewStandardDispatcher()
	d.AddMsgHandler("*", wingRecvMsg)
	OSCServer := &osc.Server{
		Addr:       fmt.Sprintf("%s:%d", *recvIP, *recvPort),
		Dispatcher: d,
	}

	// Without the server no status can be received, so fail loudly (like
	// newMQTTClient does) instead of exiting silently with status 0.
	if err := OSCServer.ListenAndServe(); err != nil {
		panic(fmt.Errorf("OSC server on %s: %w", OSCServer.Addr, err))
	}
}