1
0
Fork 0

More requests

main v0.16.1
Jordan Hotmann 2023-12-07 12:39:48 -07:00
parent fe87cd50c7
commit 653fb0e570
No known key found for this signature in database
GPG Key ID: 01B504170C2A2EA3
4 changed files with 37 additions and 34 deletions

View File

@ -27,6 +27,7 @@ const (
nfcEventId = 1003 nfcEventId = 1003
zwaveEventId = 1004 zwaveEventId = 1004
timerFinishedEventId = 1005 timerFinishedEventId = 1005
defaultTimeout = 250 * time.Millisecond
) )
func CloseSubscription() error { func CloseSubscription() error {
@ -132,6 +133,7 @@ func handleMessages() {
return return
} }
case ha.MessageType.Event: case ha.MessageType.Event:
go func() {
logger.Debug("Event received", "event", message.Event) logger.Debug("Event received", "event", message.Event)
switch message.Id { switch message.Id {
case stateChangeEventId: case stateChangeEventId:
@ -139,25 +141,26 @@ func handleMessages() {
if marshallErr != nil { if marshallErr != nil {
logger.Error("Error marshalling event data", "error", marshallErr) logger.Error("Error marshalling event data", "error", marshallErr)
} }
go nats.SetKeyValueString(fmt.Sprintf("homeassistant.states.%s", message.Event.Data.EntityId), message.Event.Data.NewState.State)
if message.Event.Data.NewState.State == message.Event.Data.OldState.State { if message.Event.Data.NewState.State == message.Event.Data.OldState.State {
logger.Debug("State unchanged, publishing to attributes topic") logger.Debug("State unchanged, publishing to attributes topic")
nats.Publish(fmt.Sprintf("homeassistant.attributues.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data) nats.PublishRequest(fmt.Sprintf("homeassistant.attributues.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data, defaultTimeout, 2)
} else { } else {
nats.Publish(fmt.Sprintf("homeassistant.states.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data) nats.Publish(fmt.Sprintf("homeassistant.states.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data)
} }
nats.SetKeyValueString(fmt.Sprintf("homeassistant.states.%s", message.Event.Data.EntityId), message.Event.Data.NewState.State)
case zhaEventId: case zhaEventId:
data, _ := json.Marshal(message.Event.Data) data, _ := json.Marshal(message.Event.Data)
nats.Publish(fmt.Sprintf("homeassistant.zha.%s", message.Event.Data.DeviceIeee), data) nats.Publish(fmt.Sprintf("homeassistant.zha.%s", message.Event.Data.DeviceIeee), data)
case nfcEventId: case nfcEventId:
data, _ := json.Marshal(message.Event.Data) data, _ := json.Marshal(message.Event.Data)
nats.Publish(fmt.Sprintf("homeassistant.nfc.%s", message.Event.Data.TagId), data) nats.PublishRequest(fmt.Sprintf("homeassistant.nfc.%s", message.Event.Data.TagId), data, defaultTimeout, 3)
case zwaveEventId: case zwaveEventId:
data, _ := json.Marshal(message.Event.Data) data, _ := json.Marshal(message.Event.Data)
nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data) nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data)
case timerFinishedEventId: case timerFinishedEventId:
go nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), 500*time.Millisecond, 3) nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3)
} }
}()
} }
} }
}() }()

View File

@ -94,7 +94,7 @@ func PublishRequest(subject string, message []byte, timeout time.Duration, retri
resp, err := client.Conn.Request(subject, message, timeout) resp, err := client.Conn.Request(subject, message, timeout)
if err == nil { if err == nil {
logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts) logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts, "subject", subject)
return return
} }
} }

View File

@ -14,7 +14,7 @@ var (
schedules = map[string]*gocron.Job{} schedules = map[string]*gocron.Job{}
fireSchedule = func(name string) { fireSchedule = func(name string) {
PublishString("schedules."+name, "fired") PublishRequest("schedules."+name, []byte("fired"), 500*time.Millisecond, 3)
} }
) )

View File

@ -25,7 +25,7 @@ func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId st
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
var data ha.EventData var data ha.EventData
err = json.Unmarshal(msg.Data, &data) err = json.Unmarshal(msg.Data, &data)
if err != nil { if err != nil {
@ -41,7 +41,7 @@ func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId st
} }
} }
func (n *NatsConnection) GenericTimerListener(logger *slog.Logger, timerName string, handler func() error) { func (n *NatsConnection) GenericTimerSubscriber(logger *slog.Logger, timerName string, handler func() error) {
if timerName == "" { if timerName == "" {
panic(errors.New("timer name cannot be empty")) panic(errors.New("timer name cannot be empty"))
} }
@ -83,7 +83,7 @@ func (n *NatsConnection) GenericScheduleSubscriber(logger *slog.Logger, schedule
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
l.Debug("Schedule fired", "name", scheduleName) l.Debug("Schedule fired", "name", scheduleName)
err = handler() err = handler()
if err != nil { if err != nil {
@ -109,7 +109,7 @@ func (n *NatsConnection) GenericCommandSubscriber(logger *slog.Logger, commandNa
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
l.Debug("Command fired") l.Debug("Command fired")
err = handler(msg.Data) err = handler(msg.Data)
if err != nil { if err != nil {
@ -135,7 +135,7 @@ func (n *NatsConnection) GenericNfcSubscriber(logger *slog.Logger, tagId string,
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
l.Debug("NFC tag scanned") l.Debug("NFC tag scanned")
err = handler(msg.Data) err = handler(msg.Data)
if err != nil { if err != nil {
@ -161,7 +161,7 @@ func (n *NatsConnection) GenericZhaSubscriber(logger *slog.Logger, deviceIeee st
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
var data ha.EventData var data ha.EventData
err = json.Unmarshal(msg.Data, &data) err = json.Unmarshal(msg.Data, &data)
if err != nil { if err != nil {
@ -197,7 +197,7 @@ func (n *NatsConnection) GenericZwaveLSubscriber(logger *slog.Logger, deviceId s
defer sub.Unsubscribe() defer sub.Unsubscribe()
for msg := range ch { for msg := range ch {
go msg.Ack() go msg.Respond([]byte("got it"))
var data ha.EventData var data ha.EventData
err = json.Unmarshal(msg.Data, &data) err = json.Unmarshal(msg.Data, &data)
if err != nil { if err != nil {