diff --git a/internal/homeassistant/subscriber.go b/internal/homeassistant/subscriber.go index 4fa4a13..fed8e44 100644 --- a/internal/homeassistant/subscriber.go +++ b/internal/homeassistant/subscriber.go @@ -27,6 +27,7 @@ const ( nfcEventId = 1003 zwaveEventId = 1004 timerFinishedEventId = 1005 + defaultTimeout = 250 * time.Millisecond ) func CloseSubscription() error { @@ -132,32 +133,34 @@ func handleMessages() { return } case ha.MessageType.Event: - logger.Debug("Event received", "event", message.Event) - switch message.Id { - case stateChangeEventId: - data, marshallErr := json.Marshal(message.Event.Data) - if marshallErr != nil { - logger.Error("Error marshalling event data", "error", marshallErr) + go func() { + logger.Debug("Event received", "event", message.Event) + switch message.Id { + case stateChangeEventId: + data, marshallErr := json.Marshal(message.Event.Data) + if marshallErr != nil { + logger.Error("Error marshalling event data", "error", marshallErr) + } + go nats.SetKeyValueString(fmt.Sprintf("homeassistant.states.%s", message.Event.Data.EntityId), message.Event.Data.NewState.State) + if message.Event.Data.NewState.State == message.Event.Data.OldState.State { + logger.Debug("State unchanged, publishing to attributes topic") + nats.PublishRequest(fmt.Sprintf("homeassistant.attributues.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data, defaultTimeout, 2) + } else { + nats.Publish(fmt.Sprintf("homeassistant.states.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data) + } + case zhaEventId: + data, _ := json.Marshal(message.Event.Data) + nats.Publish(fmt.Sprintf("homeassistant.zha.%s", message.Event.Data.DeviceIeee), data) + case nfcEventId: + data, _ := json.Marshal(message.Event.Data) + nats.PublishRequest(fmt.Sprintf("homeassistant.nfc.%s", message.Event.Data.TagId), data, defaultTimeout, 3) + case zwaveEventId: + data, _ := json.Marshal(message.Event.Data) + nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data) + case timerFinishedEventId: + nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), defaultTimeout, 3) } - if message.Event.Data.NewState.State == message.Event.Data.OldState.State { - logger.Debug("State unchanged, publishing to attributes topic") - nats.Publish(fmt.Sprintf("homeassistant.attributues.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data) - } else { - nats.Publish(fmt.Sprintf("homeassistant.states.%s.%s", message.Event.Data.EntityId, message.Event.Data.NewState.State), data) - } - nats.SetKeyValueString(fmt.Sprintf("homeassistant.states.%s", message.Event.Data.EntityId), message.Event.Data.NewState.State) - case zhaEventId: - data, _ := json.Marshal(message.Event.Data) - nats.Publish(fmt.Sprintf("homeassistant.zha.%s", message.Event.Data.DeviceIeee), data) - case nfcEventId: - data, _ := json.Marshal(message.Event.Data) - nats.Publish(fmt.Sprintf("homeassistant.nfc.%s", message.Event.Data.TagId), data) - case zwaveEventId: - data, _ := json.Marshal(message.Event.Data) - nats.Publish(fmt.Sprintf("homeassistant.zwave-scene.%s", message.Event.Data.DeviceId), data) - case timerFinishedEventId: - go nats.PublishRequest(fmt.Sprintf("homeassistant.%s.finished", message.Event.Data.EntityId), []byte("finished"), 500*time.Millisecond, 3) - } + }() } } }() diff --git a/internal/nats/client.go b/internal/nats/client.go index db341f8..5e61c7e 100644 --- a/internal/nats/client.go +++ b/internal/nats/client.go @@ -94,7 +94,7 @@ func PublishRequest(subject string, message []byte, timeout time.Duration, retri resp, err := client.Conn.Request(subject, message, timeout) if err == nil { - logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts) + logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts, "subject", subject) return } } diff --git a/internal/nats/schedule.go b/internal/nats/schedule.go index d54786f..9ce64f5 100644 --- a/internal/nats/schedule.go +++ b/internal/nats/schedule.go @@ -14,7 +14,7 @@ var ( schedules = map[string]*gocron.Job{} fireSchedule = func(name string) { - PublishString("schedules."+name, "fired") + PublishRequest("schedules."+name, []byte("fired"), 500*time.Millisecond, 3) } ) diff --git a/pkg/nats/subscribers.go b/pkg/nats/subscribers.go index b5a685e..9b2c5f0 100644 --- a/pkg/nats/subscribers.go +++ b/pkg/nats/subscribers.go @@ -25,7 +25,7 @@ func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId st defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) var data ha.EventData err = json.Unmarshal(msg.Data, &data) if err != nil { @@ -41,7 +41,7 @@ func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId st } } -func (n *NatsConnection) GenericTimerListener(logger *slog.Logger, timerName string, handler func() error) { +func (n *NatsConnection) GenericTimerSubscriber(logger *slog.Logger, timerName string, handler func() error) { if timerName == "" { panic(errors.New("timer name cannot be empty")) } @@ -83,7 +83,7 @@ func (n *NatsConnection) GenericScheduleSubscriber(logger *slog.Logger, schedule defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) l.Debug("Schedule fired", "name", scheduleName) err = handler() if err != nil { @@ -109,7 +109,7 @@ func (n *NatsConnection) GenericCommandSubscriber(logger *slog.Logger, commandNa defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) l.Debug("Command fired") err = handler(msg.Data) if err != nil { @@ -135,7 +135,7 @@ func (n *NatsConnection) GenericNfcSubscriber(logger *slog.Logger, tagId string, defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) l.Debug("NFC tag scanned") err = handler(msg.Data) if err != nil { @@ -161,7 +161,7 @@ func (n *NatsConnection) GenericZhaSubscriber(logger *slog.Logger, deviceIeee st defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) var data ha.EventData err = json.Unmarshal(msg.Data, &data) if err != nil { @@ -197,7 +197,7 @@ func (n *NatsConnection) GenericZwaveLSubscriber(logger *slog.Logger, deviceId s defer sub.Unsubscribe() for msg := range ch { - go msg.Ack() + go msg.Respond([]byte("got it")) var data ha.EventData err = json.Unmarshal(msg.Data, &data) if err != nil {