1
0
Fork 0
hats/pkg/nats/client.go

119 lines
2.8 KiB
Go

package nats
import (
"context"
"errors"
"fmt"
"strings"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// NatsConnection bundles the settings needed to reach a NATS server together
// with the live handles produced by Connect.
type NatsConnection struct {
	// HostName and Port are combined by Connect into the nats://host:port URL.
	HostName string
	Port     string

	// UseJetstream selects the JetStream code path in Connect, Publish,
	// Subscribe and Stream.
	UseJetstream bool

	// connOpts is forwarded to nats.Connect; jetOpts is forwarded to
	// jetstream.New. Both are filled through the With...Option methods.
	connOpts []nats.Option
	jetOpts  []jetstream.JetStreamOpt

	// Conn is assigned by Connect. JS is assigned by Connect only when
	// UseJetstream is true.
	Conn *nats.Conn
	JS   jetstream.JetStream
}
// DefaultNatsConnection returns a NatsConnection preset to 127.0.0.1:4222 with
// JetStream enabled and empty option lists. Nothing is dialled until Connect
// is called.
func DefaultNatsConnection() *NatsConnection {
	conn := new(NatsConnection)
	conn.HostName = "127.0.0.1"
	conn.Port = "4222"
	conn.UseJetstream = true
	conn.connOpts = []nats.Option{}
	conn.jetOpts = []jetstream.JetStreamOpt{}
	return conn
}
// WithHostName sets the host that Connect will dial and returns the receiver
// so calls can be chained.
func (n *NatsConnection) WithHostName(hostname string) *NatsConnection {
	n.HostName = hostname
	return n
}
// WithPort sets the port that Connect will dial and returns the receiver so
// calls can be chained.
func (n *NatsConnection) WithPort(port string) *NatsConnection {
	n.Port = port
	return n
}
// WithJetstream turns the JetStream code path on or off and returns the
// receiver so calls can be chained.
//
// The parameter is deliberately not called "jetstream": that name would shadow
// the imported jetstream package for the whole method body.
func (n *NatsConnection) WithJetstream(enabled bool) *NatsConnection {
	n.UseJetstream = enabled
	return n
}
// WithConnectionOption appends opt to the options that Connect passes to
// nats.Connect and returns the receiver so calls can be chained.
func (n *NatsConnection) WithConnectionOption(opt nats.Option) *NatsConnection {
	n.connOpts = append(n.connOpts, opt)
	return n
}
// WithJetstreamOption appends opt to the options that Connect passes to
// jetstream.New and returns the receiver so calls can be chained.
func (n *NatsConnection) WithJetstreamOption(opt jetstream.JetStreamOpt) *NatsConnection {
	n.jetOpts = append(n.jetOpts, opt)
	return n
}
// Connect dials the server at nats://HostName:Port with the collected
// connection options. When UseJetstream is set it also creates a JetStream
// context on that connection using the collected JetStream options.
//
// Conn and JS are assigned only after every step has succeeded, so a failed
// Connect leaves the receiver's handles untouched. If the JetStream context
// cannot be created, the connection opened by this call is closed before
// returning; otherwise it would stay open with nothing responsible for it.
//
// Returned errors wrap the underlying nats/jetstream error, so errors.Is and
// errors.As still match it.
func (n *NatsConnection) Connect() error {
	url := fmt.Sprintf("nats://%s:%s", n.HostName, n.Port)

	conn, err := nats.Connect(url, n.connOpts...)
	if err != nil {
		return fmt.Errorf("connecting to %s: %w", url, err)
	}

	if n.UseJetstream {
		js, err := jetstream.New(conn, n.jetOpts...)
		if err != nil {
			// Do not leak the connection we just opened.
			conn.Close()
			return fmt.Errorf("creating jetstream context: %w", err)
		}
		n.JS = js
	}

	n.Conn = conn
	return nil
}
// Close closes the underlying NATS connection. It does nothing when Conn is
// nil, e.g. when Connect was never called.
func (n *NatsConnection) Close() {
	if n.Conn == nil {
		return
	}
	n.Conn.Close()
}
// Publish sends payload on subject.
//
// With UseJetstream set, the message goes through JS.PublishAsync with the
// supplied opts; otherwise it goes through Conn.Publish and opts are not used.
// ctx is currently unused: it would only be needed by the synchronous
// JS.Publish(ctx, subject, payload, opts...) variant.
//
// NOTE(review): this method has no return value, so the error from either
// publish call (and the PubAckFuture from PublishAsync) is discarded. Callers
// cannot observe publish failures through this method.
func (n *NatsConnection) Publish(ctx context.Context, subject string, payload []byte, opts ...jetstream.PublishOpt) {
	if !n.UseJetstream {
		// Core NATS publish; the error is dropped because there is no way
		// to report it through this signature.
		_ = n.Conn.Publish(subject, payload)
		return
	}

	// Fire-and-forget JetStream publish; both the ack future and the error
	// are dropped for the same reason.
	_, _ = n.JS.PublishAsync(subject, payload, opts...)
}
// Subscribe creates a channel-based core NATS subscription on subject.
//
// It returns the subscription, a channel with a buffer of 64 messages that
// receives the deliveries, and the error from Conn.ChanSubscribe. The channel
// is returned even when ChanSubscribe reports an error.
//
// When UseJetstream is set, Subscribe refuses to run and returns an error
// directing the caller to Stream.
func (n *NatsConnection) Subscribe(subject string) (sub *nats.Subscription, ch chan *nats.Msg, err error) {
	if n.UseJetstream {
		return nil, nil, errors.New("jetstream in use, you should use Stream instead")
	}

	ch = make(chan *nats.Msg, 64)
	sub, err = n.Conn.ChanSubscribe(subject, ch)
	return sub, ch, err
}
// Stream creates a JetStream stream bound to subject and a consumer on it.
//
// The stream name is derived from subject by replacing "." with "_", "*" with
// "any" and ">" with "arrow". The consumer is created with an empty
// ConsumerConfig. Errors from either creation step are returned as-is with nil
// stream and consumer.
//
// When UseJetstream is not set, Stream refuses to run and returns an error
// directing the caller to Subscribe.
func (n *NatsConnection) Stream(ctx context.Context, subject string) (stream jetstream.Stream, consumer jetstream.Consumer, err error) {
	if !n.UseJetstream {
		return nil, nil, errors.New("jetstream not in use, you should use Subscribe instead")
	}

	// None of the replacement strings contains a later pattern, so a single
	// pass yields the same name as replacing the three tokens one by one.
	name := strings.NewReplacer(".", "_", "*", "any", ">", "arrow").Replace(subject)
	cfg := jetstream.StreamConfig{
		Name:     name,
		Subjects: []string{subject},
	}

	if stream, err = n.JS.CreateStream(ctx, cfg); err != nil {
		return nil, nil, err
	}
	if consumer, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{}); err != nil {
		return nil, nil, err
	}
	return stream, consumer, nil
}