1
0
Fork 0
hats/internal/nats/client.go

103 lines
2.1 KiB
Go
Raw Permalink Normal View History

2023-10-12 17:23:35 +00:00
package nats
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"code.jhot.me/jhot/hats/pkg/config"
n "code.jhot.me/jhot/hats/pkg/nats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
cfg *config.HatsConfig
client *n.NatsConnection
kv jetstream.KeyValue
ctx context.Context
logger *slog.Logger
)
func Close() {
client.Close()
}
func JetstreamConnect(parentContext context.Context, parentLogger *slog.Logger) error {
ctx = parentContext
logger = parentLogger
cfg = config.FromEnvironment()
var err error
client = n.DefaultNatsConnection().WithHostName(cfg.NatsHost).WithPort(cfg.NatsPort).WithConnectionOption(nats.Name(cfg.NatsClientName))
if cfg.NatsToken != "" {
client.WithConnectionOption(nats.Token(cfg.NatsToken))
}
logger.Debug("Connecting to nats")
err = client.Connect()
if err != nil {
return fmt.Errorf("%w: error connecting to nats server", err)
}
return nil
}
func KvConnect() error {
if client.JS == nil {
return errors.New("jetstream must be connected first")
}
logger.Debug("Looking for KV store")
listener := client.JS.KeyValueStoreNames(ctx)
found := false
for name := range listener.Name() {
if name == "KV_hats" {
found = true
}
}
var err error
if found {
logger.Debug("Connecting for KV store")
kv, err = client.JS.KeyValue(ctx, "hats")
} else {
logger.Debug("Creating for KV store")
kv, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "hats",
TTL: 2 * time.Hour,
})
}
return err
}
func Publish(subject string, message []byte) {
client.Publish(ctx, subject, message, jetstream.WithRetryAttempts(2), jetstream.WithRetryWait(500*time.Millisecond))
}
func PublishString(subject, message string) {
Publish(subject, []byte(message))
}
func GetKeyValue(key string) ([]byte, error) {
value, err := kv.Get(ctx, key)
if err != nil {
return []byte{}, err
}
return value.Value(), nil
}
func SetKeyValue(key string, value []byte) error {
_, err := kv.Put(ctx, key, value)
return err
}
func SetKeyValueString(key, value string) error {
_, err := kv.PutString(ctx, key, value)
return err
}