From cf0e4b05770c740847315a92575351c6ddd40244 Mon Sep 17 00:00:00 2001 From: Jordan Hotmann Date: Mon, 17 Jun 2024 09:12:21 -0600 Subject: [PATCH] Publish timers to better topic, start sub refactor --- internal/homeassistant/subscriber.go | 2 +- pkg/nats/client.go | 9 + pkg/nats/subscribers.go | 271 ++++++++------------------- 3 files changed, 86 insertions(+), 196 deletions(-) diff --git a/internal/homeassistant/subscriber.go b/internal/homeassistant/subscriber.go index 96e4e18..3feb69c 100644 --- a/internal/homeassistant/subscriber.go +++ b/internal/homeassistant/subscriber.go @@ -161,7 +161,7 @@ func handleMessages() { data, _ := json.Marshal(message.Event.Data) nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data) case timerFinishedEventId: - nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3) + nats.PublishRequest(fmt.Sprintf("homeassistant.timers.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3) } }() } diff --git a/pkg/nats/client.go b/pkg/nats/client.go index 2d94611..721fb48 100644 --- a/pkg/nats/client.go +++ b/pkg/nats/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "strings" "github.com/nats-io/nats.go" @@ -18,6 +19,7 @@ type NatsConnection struct { jetOpts []jetstream.JetStreamOpt Conn *nats.Conn JS jetstream.JetStream + logger *slog.Logger } func DefaultNatsConnection() *NatsConnection { @@ -27,6 +29,7 @@ func DefaultNatsConnection() *NatsConnection { UseJetstream: true, connOpts: []nats.Option{}, jetOpts: []jetstream.JetStreamOpt{}, + logger: slog.Default(), } } @@ -74,6 +77,12 @@ func WithJetstreamnOption(opt jetstream.JetStreamOpt) NatsConnectionOption { } } +func WithLogger(logger *slog.Logger) NatsConnectionOption { + return func(nc *NatsConnection) { + nc.logger = logger + } +} + func New(optFuncs ...NatsConnectionOption) *NatsConnection { n := DefaultNatsConnection() for _, optFn := range optFuncs { diff --git a/pkg/nats/subscribers.go b/pkg/nats/subscribers.go index 9b2c5f0..fe24195 100644 --- a/pkg/nats/subscribers.go +++ b/pkg/nats/subscribers.go @@ -2,213 +2,94 @@ package nats import ( "encoding/json" - "errors" "fmt" - "log/slog" ha "code.jhot.me/jhot/hats/pkg/homeassistant" + "github.com/nats-io/nats.go" ) -func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId string, handler func(ha.StateData) error) { - if entityId == "" { - panic(errors.New("entity ID cannot be empty")) - } - topic := fmt.Sprintf("homeassistant.states.%s.>", entityId) - l := logger.With("topic", topic, "entity_id", entityId) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } +type TopicType string - defer sub.Unsubscribe() +const StateTopicType TopicType = "states" +const TimerTopicType TopicType = "timers" +const ScheduleTopicType TopicType = "schedules" +const CommandTopicType TopicType = "command" +const NfcTopicType TopicType = "nfc" +const ZhaTopicType TopicType = "zha" +const ZwaveTopicType TopicType = "zwave" +const ZwaveSceneTopicType TopicType = "zwave-scene" - for msg := range ch { +func (t TopicType) GetTopic(id string) string { + switch t { + case StateTopicType: + return fmt.Sprintf("homeassistant.%s.%s.>", t, id) + case TimerTopicType: + return fmt.Sprintf("homeassistant.%s.%s.finished", t, id) + case ScheduleTopicType, CommandTopicType: + return fmt.Sprintf("%s.%s", t, id) + case NfcTopicType, ZhaTopicType, ZwaveTopicType, ZwaveSceneTopicType: + return fmt.Sprintf("homeassistant.%s.%s", t, id) + } + return "" +} + +func (n *NatsConnection) addSub(topic string, cb nats.MsgHandler) (*nats.Subscription, error) { + return n.Conn.Subscribe(topic, cb) +} + +func parseEvent(msg *nats.Msg, sendResponse bool) (ha.EventData, error) { + if sendResponse { go msg.Respond([]byte("got it")) - var data ha.EventData - err = json.Unmarshal(msg.Data, &data) + } + var data ha.EventData + err := json.Unmarshal(msg.Data, &data) + return data, err +} + +func (n *NatsConnection) AddEventSub(t TopicType, entityId string, handler func(ha.EventData) error) (*nats.Subscription, error) { + if t == "" || entityId == "" { + panic("TopicType and entityId cannot be empty") + } + + topic := t.GetTopic(entityId) + l := n.logger.With("topic", topic, "entity_id", entityId) + l.Debug("Subscribing to topic") + + return n.addSub(topic, func(msg *nats.Msg) { + data, err := parseEvent(msg, true) if err != nil { l.Error("Error parsing message", "error", err) - continue + return } - l.Debug("Event state " + data.NewState.State) + l.Debug(fmt.Sprintf("%s data event fired", t)) + err = handler(data) + if err != nil { + l.Error(fmt.Sprintf("Error handling %s event", t), "error", err) + return + } + }) +} + +func (n *NatsConnection) AddStateSub(t TopicType, entityId string, handler func(ha.StateData) error) (*nats.Subscription, error) { + if t == "" || entityId == "" { + panic("TopicType and entityId cannot be empty") + } + + topic := t.GetTopic(entityId) + l := n.logger.With("topic", topic, "entity_id", entityId) + l.Debug("Subscribing to topic") + + return n.addSub(topic, func(msg *nats.Msg) { + data, err := parseEvent(msg, true) + if err != nil { + l.Error("Error parsing message", "error", err, "message", string(msg.Data)) + return + } + l.Debug(fmt.Sprintf("%s state event fired", t), "newState", data.NewState.State) err = handler(data.NewState) if err != nil { - l.Error("Error handling state event", "error", err) - continue + l.Error(fmt.Sprintf("Error handling %s event", t), "error", err) + return } - } -} - -func (n *NatsConnection) GenericTimerSubscriber(logger *slog.Logger, timerName string, handler func() error) { - if timerName == "" { - panic(errors.New("timer name cannot be empty")) - } - topic := fmt.Sprintf("homeassistant.%s.finished", timerName) - l := logger.With("topic", topic, "timer", timerName) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - l.Debug("Timer ended", "name", timerName) - err = handler() - if err != nil { - l.Error("Error handling timer event", "error", err) - continue - } - } -} - -func (n *NatsConnection) GenericScheduleSubscriber(logger *slog.Logger, scheduleName string, handler func() error) { - if scheduleName == "" { - panic(errors.New("schedule name cannot be empty")) - } - topic := fmt.Sprintf("schedules.%s", scheduleName) - l := logger.With("topic", topic, "schedule", scheduleName) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - l.Debug("Schedule fired", "name", scheduleName) - err = handler() - if err != nil { - l.Error("Error handling schedule event", "error", err) - continue - } - } -} - -func (n *NatsConnection) GenericCommandSubscriber(logger *slog.Logger, commandName string, handler func([]byte) error) { - if commandName == "" { - panic(errors.New("command name cannot be empty")) - } - topic := fmt.Sprintf("command.%s", commandName) - l := logger.With("topic", topic, "command", commandName) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - l.Debug("Command fired") - err = handler(msg.Data) - if err != nil { - l.Error("Error handling command event", "error", err) - continue - } - } -} - -func (n *NatsConnection) GenericNfcSubscriber(logger *slog.Logger, tagId string, handler func([]byte) error) { - if tagId == "" { - panic(errors.New("tag ID cannot be empty")) - } - topic := fmt.Sprintf("homeassistant.nfc.%s", tagId) - l := logger.With("topic", topic, "tagId", tagId) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - l.Debug("NFC tag scanned") - err = handler(msg.Data) - if err != nil { - l.Error("Error handling NFC event", "error", err) - continue - } - } -} - -func (n *NatsConnection) GenericZhaSubscriber(logger *slog.Logger, deviceIeee string, handler func(ha.EventData) error) { - if deviceIeee == "" { - panic(errors.New("device IEEE cannot be empty")) - } - topic := fmt.Sprintf("homeassistant.zha.%s", deviceIeee) - l := logger.With("topic", topic, "ieee", deviceIeee) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - var data ha.EventData - err = json.Unmarshal(msg.Data, &data) - if err != nil { - l.Error("Error parsing message", "error", err) - continue - } - l.Debug("ZHA event fired") - err = handler(data) - if err != nil { - l.Error("Error handling ZHA event", "error", err) - continue - } - } -} - -func (n *NatsConnection) GenericZwaveLSubscriber(logger *slog.Logger, deviceId string, scene bool, handler func(ha.EventData) error) { - if deviceId == "" { - panic(errors.New("device ID cannot be empty")) - } - scenePart := "" - if scene { - scenePart = "-scene" - } - topic := fmt.Sprintf("homeassistant.zwave%s.%s", scenePart, deviceId) - l := logger.With("topic", topic, "deviceId", deviceId) - l.Debug("Subscribing to topic") - sub, ch, err := n.Subscribe(topic) - if err != nil { - l.Error("Error subscribing to topic", "error", err) - return - } - - defer sub.Unsubscribe() - - for msg := range ch { - go msg.Respond([]byte("got it")) - var data ha.EventData - err = json.Unmarshal(msg.Data, &data) - if err != nil { - l.Error("Error parsing message", "error", err) - continue - } - l.Debug("Zwave event fired") - err = handler(data) - if err != nil { - l.Error("Error handling Zwave event", "error", err) - continue - } - } + }) }