From 6a4c1fb88fbdc4553a40bb3aac4ed3fe212cf04e Mon Sep 17 00:00:00 2001 From: Jordan Hotmann Date: Tue, 28 Nov 2023 09:47:11 -0700 Subject: [PATCH] Call multiple services and collect any errors --- pkg/homeassistant/rest.go | 34 ++++++++++++++++++++++++++++++++++ pkg/nats/subscribers.go | 26 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/pkg/homeassistant/rest.go b/pkg/homeassistant/rest.go index de81e6a..cee83ff 100644 --- a/pkg/homeassistant/rest.go +++ b/pkg/homeassistant/rest.go @@ -1,8 +1,10 @@ package homeassistant import ( + "errors" "fmt" "strings" + "sync" "github.com/go-resty/resty/v2" ) @@ -51,3 +53,35 @@ func (c *RestClient) CallServiceManual(domain string, entityId string, service s } return err } + +type CallServiceInput struct { + EntityID string + Service string + Extras map[string]any +} + +func (c *RestClient) CallServices(inputs ...*CallServiceInput) error { + errorChannel := make(chan error) + var wg sync.WaitGroup + + wg.Add(len(inputs)) + + go func() { + wg.Wait() + close(errorChannel) + }() + + for _, input := range inputs { + go func(input *CallServiceInput) { + defer wg.Done() + err := c.CallService(input.EntityID, input.Service, input.Extras) + errorChannel <- err + }(input) + } + + var returnErrors []error + for err := range errorChannel { + returnErrors = append(returnErrors, err) + } + return errors.Join(returnErrors...) +} diff --git a/pkg/nats/subscribers.go b/pkg/nats/subscribers.go index 37c3c4b..0d5a0c5 100644 --- a/pkg/nats/subscribers.go +++ b/pkg/nats/subscribers.go @@ -41,6 +41,32 @@ func (n *NatsConnection) GenericStateSubscriber(logger *slog.Logger, entityId st } } +func (n *NatsConnection) GenericTimerListener(logger *slog.Logger, timerName string, handler func() error) { + if timerName == "" { + panic(errors.New("timer name cannot be empty")) + } + topic := fmt.Sprintf("homeassistant.%s.finished", timerName) + l := logger.With("topic", topic, "timer", timerName) + l.Debug("Subscribing to topic") + sub, ch, err := n.Subscribe(topic) + if err != nil { + l.Error("Error subscribing to topic", "error", err) + return + } + + defer sub.Unsubscribe() + + for msg := range ch { + go msg.Ack() + l.Debug("Timer ended", "name", timerName) + err = handler() + if err != nil { + l.Error("Error handling timer event", "error", err) + continue + } + } +} + func (n *NatsConnection) GenericScheduleSubscriber(logger *slog.Logger, scheduleName string, handler func() error) { if scheduleName == "" { panic(errors.New("schedule name cannot be empty"))