parent
24c1a46360
commit
cf0e4b0577
|
@ -161,7 +161,7 @@ func handleMessages() {
|
||||||
data, _ := json.Marshal(message.Event.Data)
|
data, _ := json.Marshal(message.Event.Data)
|
||||||
nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data)
|
nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data)
|
||||||
case timerFinishedEventId:
|
case timerFinishedEventId:
|
||||||
nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3)
|
nats.PublishRequest(fmt.Sprintf("homeassistant.timers.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
@ -18,6 +19,7 @@ type NatsConnection struct {
|
||||||
jetOpts []jetstream.JetStreamOpt
|
jetOpts []jetstream.JetStreamOpt
|
||||||
Conn *nats.Conn
|
Conn *nats.Conn
|
||||||
JS jetstream.JetStream
|
JS jetstream.JetStream
|
||||||
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultNatsConnection() *NatsConnection {
|
func DefaultNatsConnection() *NatsConnection {
|
||||||
|
@ -27,6 +29,7 @@ func DefaultNatsConnection() *NatsConnection {
|
||||||
UseJetstream: true,
|
UseJetstream: true,
|
||||||
connOpts: []nats.Option{},
|
connOpts: []nats.Option{},
|
||||||
jetOpts: []jetstream.JetStreamOpt{},
|
jetOpts: []jetstream.JetStreamOpt{},
|
||||||
|
logger: slog.Default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +77,12 @@ func WithJetstreamnOption(opt jetstream.JetStreamOpt) NatsConnectionOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithLogger(logger *slog.Logger) NatsConnectionOption {
|
||||||
|
return func(nc *NatsConnection) {
|
||||||
|
nc.logger = logger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func New(optFuncs ...NatsConnectionOption) *NatsConnection {
|
func New(optFuncs ...NatsConnectionOption) *NatsConnection {
|
||||||
n := DefaultNatsConnection()
|
n := DefaultNatsConnection()
|
||||||
for _, optFn := range optFuncs {
|
for _, optFn := range optFuncs {
|
||||||
|
|
|
@ -2,213 +2,94 @@ package nats
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
|
||||||
|
|
||||||
ha "code.jhot.me/jhot/hats/pkg/homeassistant"
|
ha "code.jhot.me/jhot/hats/pkg/homeassistant"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId string, handler func(ha.StateData) error) {
|
type TopicType string
|
||||||
if entityId == "" {
|
|
||||||
panic(errors.New("entity ID cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("homeassistant.states.%s.>", entityId)
|
|
||||||
l := logger.With("topic", topic, "entity_id", entityId)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
const StateTopicType TopicType = "states"
|
||||||
|
const TimerTopicType TopicType = "timers"
|
||||||
|
const ScheduleTopicType TopicType = "schedules"
|
||||||
|
const CommandTopicType TopicType = "command"
|
||||||
|
const NfcTopicType TopicType = "nfc"
|
||||||
|
const ZhaTopicType TopicType = "zha"
|
||||||
|
const ZwaveTopicType TopicType = "zwave"
|
||||||
|
const ZwaveSceneTopicType TopicType = "zwave-scene"
|
||||||
|
|
||||||
for msg := range ch {
|
func (t TopicType) GetTopic(id string) string {
|
||||||
|
switch t {
|
||||||
|
case StateTopicType:
|
||||||
|
return fmt.Sprintf("homeassistant.%s.%s.>", t, id)
|
||||||
|
case TimerTopicType:
|
||||||
|
return fmt.Sprintf("homeassistant.%s.%s.finished", t, id)
|
||||||
|
case ScheduleTopicType, CommandTopicType:
|
||||||
|
return fmt.Sprintf("%s.%s", t, id)
|
||||||
|
case NfcTopicType, ZhaTopicType, ZwaveTopicType, ZwaveSceneTopicType:
|
||||||
|
return fmt.Sprintf("homeassistant.%s.%s", t, id)
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NatsConnection) addSub(topic string, cb nats.MsgHandler) (*nats.Subscription, error) {
|
||||||
|
return n.Conn.Subscribe(topic, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseEvent(msg *nats.Msg, sendResponse bool) (ha.EventData, error) {
|
||||||
|
if sendResponse {
|
||||||
go msg.Respond([]byte("got it"))
|
go msg.Respond([]byte("got it"))
|
||||||
var data ha.EventData
|
}
|
||||||
err = json.Unmarshal(msg.Data, &data)
|
var data ha.EventData
|
||||||
|
err := json.Unmarshal(msg.Data, &data)
|
||||||
|
return data, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NatsConnection) AddEventSub(t TopicType, entityId string, handler func(ha.EventData) error) (*nats.Subscription, error) {
|
||||||
|
if t == "" || entityId == "" {
|
||||||
|
panic("TopicType and entityId cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := t.GetTopic(entityId)
|
||||||
|
l := n.logger.With("topic", topic, "entity_id", entityId)
|
||||||
|
l.Debug("Subscribing to topic")
|
||||||
|
|
||||||
|
return n.addSub(topic, func(msg *nats.Msg) {
|
||||||
|
data, err := parseEvent(msg, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error("Error parsing message", "error", err)
|
l.Error("Error parsing message", "error", err)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
l.Debug("Event state " + data.NewState.State)
|
l.Debug(fmt.Sprintf("%s data event fired", t))
|
||||||
|
err = handler(data)
|
||||||
|
if err != nil {
|
||||||
|
l.Error(fmt.Sprintf("Error handling %s event", t), "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NatsConnection) AddStateSub(t TopicType, entityId string, handler func(ha.StateData) error) (*nats.Subscription, error) {
|
||||||
|
if t == "" || entityId == "" {
|
||||||
|
panic("TopicType and entityId cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := t.GetTopic(entityId)
|
||||||
|
l := n.logger.With("topic", topic, "entity_id", entityId)
|
||||||
|
l.Debug("Subscribing to topic")
|
||||||
|
|
||||||
|
return n.addSub(topic, func(msg *nats.Msg) {
|
||||||
|
data, err := parseEvent(msg, true)
|
||||||
|
if err != nil {
|
||||||
|
l.Error("Error parsing message", "error", err, "message", string(msg.Data))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
l.Debug(fmt.Sprintf("%s state event fired", t), "newState", data.NewState.State)
|
||||||
err = handler(data.NewState)
|
err = handler(data.NewState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error("Error handling state event", "error", err)
|
l.Error(fmt.Sprintf("Error handling %s event", t), "error", err)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericTimerSubscriber(logger *slog.Logger, timerName string, handler func() error) {
|
|
||||||
if timerName == "" {
|
|
||||||
panic(errors.New("timer name cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("homeassistant.%s.finished", timerName)
|
|
||||||
l := logger.With("topic", topic, "timer", timerName)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
l.Debug("Timer ended", "name", timerName)
|
|
||||||
err = handler()
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling timer event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericScheduleSubscriber(logger *slog.Logger, scheduleName string, handler func() error) {
|
|
||||||
if scheduleName == "" {
|
|
||||||
panic(errors.New("schedule name cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("schedules.%s", scheduleName)
|
|
||||||
l := logger.With("topic", topic, "schedule", scheduleName)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
l.Debug("Schedule fired", "name", scheduleName)
|
|
||||||
err = handler()
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling schedule event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericCommandSubscriber(logger *slog.Logger, commandName string, handler func([]byte) error) {
|
|
||||||
if commandName == "" {
|
|
||||||
panic(errors.New("command name cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("command.%s", commandName)
|
|
||||||
l := logger.With("topic", topic, "command", commandName)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
l.Debug("Command fired")
|
|
||||||
err = handler(msg.Data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling command event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericNfcSubscriber(logger *slog.Logger, tagId string, handler func([]byte) error) {
|
|
||||||
if tagId == "" {
|
|
||||||
panic(errors.New("tag ID cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("homeassistant.nfc.%s", tagId)
|
|
||||||
l := logger.With("topic", topic, "tagId", tagId)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
l.Debug("NFC tag scanned")
|
|
||||||
err = handler(msg.Data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling NFC event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericZhaSubscriber(logger *slog.Logger, deviceIeee string, handler func(ha.EventData) error) {
|
|
||||||
if deviceIeee == "" {
|
|
||||||
panic(errors.New("device IEEE cannot be empty"))
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("homeassistant.zha.%s", deviceIeee)
|
|
||||||
l := logger.With("topic", topic, "ieee", deviceIeee)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
var data ha.EventData
|
|
||||||
err = json.Unmarshal(msg.Data, &data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error parsing message", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
l.Debug("ZHA event fired")
|
|
||||||
err = handler(data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling ZHA event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NatsConnection) GenericZwaveLSubscriber(logger *slog.Logger, deviceId string, scene bool, handler func(ha.EventData) error) {
|
|
||||||
if deviceId == "" {
|
|
||||||
panic(errors.New("device ID cannot be empty"))
|
|
||||||
}
|
|
||||||
scenePart := ""
|
|
||||||
if scene {
|
|
||||||
scenePart = "-scene"
|
|
||||||
}
|
|
||||||
topic := fmt.Sprintf("homeassistant.zwave%s.%s", scenePart, deviceId)
|
|
||||||
l := logger.With("topic", topic, "deviceId", deviceId)
|
|
||||||
l.Debug("Subscribing to topic")
|
|
||||||
sub, ch, err := n.Subscribe(topic)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error subscribing to topic", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
for msg := range ch {
|
|
||||||
go msg.Respond([]byte("got it"))
|
|
||||||
var data ha.EventData
|
|
||||||
err = json.Unmarshal(msg.Data, &data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error parsing message", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
l.Debug("Zwave event fired")
|
|
||||||
err = handler(data)
|
|
||||||
if err != nil {
|
|
||||||
l.Error("Error handling Zwave event", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue