1
0
Fork 0
hats/pkg/nats/subscribers.go

121 lines
3.6 KiB
Go
Raw Normal View History

2023-11-28 04:40:32 +00:00
package nats
import (
"encoding/json"
"fmt"
ha "code.jhot.me/jhot/hats/pkg/homeassistant"
"github.com/nats-io/nats.go"
2023-11-28 04:40:32 +00:00
)
// TopicType names a category of NATS subject. Its string value is embedded
// verbatim in the subject produced by GetTopic.
type TopicType string

// Topic types understood by GetTopic.
const (
	StateTopicType      TopicType = "states"
	AttributeTopicType  TopicType = "attributes"
	TimerTopicType      TopicType = "timers"
	ScheduleTopicType   TopicType = "schedules"
	CommandTopicType    TopicType = "command"
	NfcTopicType        TopicType = "nfc"
	ZhaTopicType        TopicType = "zha"
	ZwaveTopicType      TopicType = "zwave"
	ZwaveSceneTopicType TopicType = "zwave-scene"
)

// GetTopic builds the NATS subject for id under topic type t.
//
// The subject layout depends on t:
//   - states, attributes:          "homeassistant.<t>.<id>.>"
//   - timers:                      "homeassistant.<t>.<id>.finished"
//   - schedules, command:          "<t>.<id>"
//   - nfc, zha, zwave, zwave-scene: "homeassistant.<t>.<id>"
//
// For any other TopicType value the empty string is returned. id is inserted
// as-is; it is not validated or escaped.
func (t TopicType) GetTopic(id string) string {
	switch t {
	case StateTopicType, AttributeTopicType:
		// Trailing ".>" is the NATS wildcard matching every deeper token.
		return fmt.Sprintf("homeassistant.%s.%s.>", t, id)
	case TimerTopicType:
		return fmt.Sprintf("homeassistant.%s.%s.finished", t, id)
	case ScheduleTopicType, CommandTopicType:
		return fmt.Sprintf("%s.%s", t, id)
	case NfcTopicType, ZhaTopicType, ZwaveTopicType, ZwaveSceneTopicType:
		return fmt.Sprintf("homeassistant.%s.%s", t, id)
	default:
		return ""
	}
}
// addSub registers cb as the handler for topic on the underlying NATS
// connection and hands back whatever Conn.Subscribe returns.
func (n *NatsConnection) addSub(topic string, cb nats.MsgHandler) (*nats.Subscription, error) {
	subscription, err := n.Conn.Subscribe(topic, cb)
	return subscription, err
}
func parseEvent(msg *nats.Msg, sendResponse bool) (ha.EventData, error) {
if sendResponse {
2023-12-07 19:39:48 +00:00
go msg.Respond([]byte("got it"))
2023-11-28 04:40:32 +00:00
}
var data ha.EventData
err := json.Unmarshal(msg.Data, &data)
return data, err
2023-11-28 04:40:32 +00:00
}
2024-09-19 18:11:08 +00:00
// AddEventSub: Add a subscriber to the NATS client that receives the full event payload
func (n *NatsConnection) AddEventSub(t TopicType, entityId string, handler func(ha.EventData) error) (*nats.Subscription, error) {
if t == "" || entityId == "" {
panic("TopicType and entityId cannot be empty")
2023-11-28 04:40:32 +00:00
}
topic := t.GetTopic(entityId)
l := n.logger.With("topic", topic, "entity_id", entityId)
2023-11-28 04:40:32 +00:00
l.Debug("Subscribing to topic")
return n.addSub(topic, func(msg *nats.Msg) {
data, err := parseEvent(msg, true)
2023-11-28 04:40:32 +00:00
if err != nil {
l.Error("Error parsing message", "error", err)
return
2023-11-28 04:40:32 +00:00
}
l.Debug(fmt.Sprintf("%s data event fired", t))
2023-11-28 04:40:32 +00:00
err = handler(data)
if err != nil {
l.Error(fmt.Sprintf("Error handling %s event", t), "error", err)
return
2023-11-28 04:40:32 +00:00
}
})
2023-11-28 04:40:32 +00:00
}
2024-09-19 18:11:08 +00:00
// AddStateSub: Add a subscriber to the NATS client that receives the new state payload from an event
func (n *NatsConnection) AddStateSub(t TopicType, entityId string, handler func(ha.StateData) error) (*nats.Subscription, error) {
if t == "" || entityId == "" {
panic("TopicType and entityId cannot be empty")
2023-11-28 04:40:32 +00:00
}
topic := t.GetTopic(entityId)
l := n.logger.With("topic", topic, "entity_id", entityId)
l.Debug("Subscribing to topic")
2023-11-28 04:40:32 +00:00
return n.addSub(topic, func(msg *nats.Msg) {
data, err := parseEvent(msg, true)
2023-11-28 04:40:32 +00:00
if err != nil {
l.Error("Error parsing message", "error", err, "message", string(msg.Data))
return
2023-11-28 04:40:32 +00:00
}
l.Debug(fmt.Sprintf("%s state event fired", t), "newState", data.NewState.State)
err = handler(data.NewState)
2023-11-28 04:40:32 +00:00
if err != nil {
l.Error(fmt.Sprintf("Error handling %s event", t), "error", err)
return
2023-11-28 04:40:32 +00:00
}
})
2023-11-28 04:40:32 +00:00
}
2024-09-19 18:11:08 +00:00
// AddSub adds a subscriber to the NATS client whose handler is invoked
// without any data from the event.
//
// The subject comes from t.GetTopic(entityId). For every message the handler
// is called with no arguments and a returned error is logged. The message
// payload is never read here. It panics when t or entityId is empty.
func (n *NatsConnection) AddSub(t TopicType, entityId string, handler func() error) (*nats.Subscription, error) {
	if entityId == "" || t == "" {
		panic("TopicType and entityId cannot be empty")
	}

	subject := t.GetTopic(entityId)
	log := n.logger.With("topic", subject, "entity_id", entityId)
	log.Debug("Subscribing to topic")

	// Both messages depend only on t, so they are formatted once up front.
	firedMsg := fmt.Sprintf("%s event fired", t)
	failedMsg := fmt.Sprintf("Error handling %s event", t)

	onMessage := func(_ *nats.Msg) {
		log.Debug(firedMsg)
		if handlerErr := handler(); handlerErr != nil {
			log.Error(failedMsg, "error", handlerErr)
		}
	}
	return n.addSub(subject, onMessage)
}