1
0
Fork 0
hats/internal/nats/client.go

115 lines
2.4 KiB
Go

package nats
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"code.jhot.me/jhot/hats/pkg/config"
n "code.jhot.me/jhot/hats/pkg/nats"
"github.com/nats-io/nats.go/jetstream"
)
var (
cfg *config.HatsConfig
client *n.NatsConnection
kv jetstream.KeyValue
ctx context.Context
logger *slog.Logger
)
func Close() {
client.Close()
}
func JetstreamConnect(parentContext context.Context, parentLogger *slog.Logger, parentConfig *config.HatsConfig) error {
ctx = parentContext
logger = parentLogger
cfg = parentConfig
var err error
client = n.New(n.WithHostName(cfg.NatsHost), n.WithPort(cfg.NatsPort), n.WithClientName(cfg.NatsClientName), n.WithToken(cfg.NatsToken))
logger.Debug("Connecting to nats")
err = client.Connect()
if err != nil {
return fmt.Errorf("%w: error connecting to nats server", err)
}
return nil
}
func KvConnect() error {
if client.JS == nil {
return errors.New("jetstream must be connected first")
}
logger.Debug("Looking for KV store")
listener := client.JS.KeyValueStoreNames(ctx)
found := false
for name := range listener.Name() {
if name == "KV_hats" {
found = true
}
}
var err error
if found {
logger.Debug("Connecting to KV store")
kv, err = client.JS.KeyValue(ctx, "hats")
} else {
logger.Debug("Creating KV store")
kv, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "hats",
TTL: 2 * time.Hour,
})
}
return err
}
func Publish(subject string, message []byte) {
client.Publish(ctx, subject, message, jetstream.WithRetryAttempts(2), jetstream.WithRetryWait(500*time.Millisecond))
}
func PublishString(subject, message string) {
Publish(subject, []byte(message))
}
func PublishRequest(subject string, message []byte, timeout time.Duration, retries int) {
attempts := 0
for {
attempts += 1
if attempts > retries {
logger.Warn("Request retries exceeded", "subject", subject)
return
}
resp, err := client.Conn.Request(subject, message, timeout)
if err == nil {
logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts, "subject", subject)
return
}
}
}
func GetKeyValue(key string) ([]byte, error) {
value, err := kv.Get(ctx, key)
if err != nil {
return []byte{}, err
}
return value.Value(), nil
}
func SetKeyValue(key string, value []byte) error {
_, err := kv.Put(ctx, key, value)
return err
}
func SetKeyValueString(key, value string) error {
_, err := kv.PutString(ctx, key, value)
return err
}