parent
1128814e2a
commit
148156ce8a
|
@ -49,6 +49,27 @@ func parseEvent(msg *nats.Msg, sendResponse bool) (ha.EventData, error) {
|
||||||
return data, err
|
return data, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddEventSub: Add a subscriber to the NATS client that receives the raw event payload in bytes
|
||||||
|
func (n *NatsConnection) AddRawSub(t TopicType, entityId string, handler func([]byte) error) (*nats.Subscription, error) {
|
||||||
|
if t == "" || entityId == "" {
|
||||||
|
panic("TopicType and entityId cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := t.GetTopic(entityId)
|
||||||
|
l := n.logger.With("topic", topic, "entity_id", entityId)
|
||||||
|
l.Debug("Subscribing to topic")
|
||||||
|
|
||||||
|
return n.addSub(topic, func(msg *nats.Msg) {
|
||||||
|
go msg.Respond([]byte("got it"))
|
||||||
|
l.Debug(fmt.Sprintf("%s event fired", t))
|
||||||
|
err := handler(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
l.Error(fmt.Sprintf("Error handling %s event", t), "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// AddEventSub: Add a subscriber to the NATS client that receives the full event payload
|
// AddEventSub: Add a subscriber to the NATS client that receives the full event payload
|
||||||
func (n *NatsConnection) AddEventSub(t TopicType, entityId string, handler func(ha.EventData) error) (*nats.Subscription, error) {
|
func (n *NatsConnection) AddEventSub(t TopicType, entityId string, handler func(ha.EventData) error) (*nats.Subscription, error) {
|
||||||
if t == "" || entityId == "" {
|
if t == "" || entityId == "" {
|
||||||
|
@ -110,6 +131,7 @@ func (n *NatsConnection) AddSub(t TopicType, entityId string, handler func() err
|
||||||
l.Debug("Subscribing to topic")
|
l.Debug("Subscribing to topic")
|
||||||
|
|
||||||
return n.addSub(topic, func(msg *nats.Msg) {
|
return n.addSub(topic, func(msg *nats.Msg) {
|
||||||
|
go msg.Respond([]byte("got it"))
|
||||||
l.Debug(fmt.Sprintf("%s event fired", t))
|
l.Debug(fmt.Sprintf("%s event fired", t))
|
||||||
err := handler()
|
err := handler()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue