103 lines
2.1 KiB
Go
103 lines
2.1 KiB
Go
|
package nats
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log/slog"
|
||
|
"time"
|
||
|
|
||
|
"code.jhot.me/jhot/hats/pkg/config"
|
||
|
n "code.jhot.me/jhot/hats/pkg/nats"
|
||
|
"github.com/nats-io/nats.go"
|
||
|
"github.com/nats-io/nats.go/jetstream"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
cfg *config.HatsConfig
|
||
|
client *n.NatsConnection
|
||
|
kv jetstream.KeyValue
|
||
|
ctx context.Context
|
||
|
logger *slog.Logger
|
||
|
)
|
||
|
|
||
|
func Close() {
|
||
|
client.Close()
|
||
|
}
|
||
|
|
||
|
func JetstreamConnect(parentContext context.Context, parentLogger *slog.Logger) error {
|
||
|
ctx = parentContext
|
||
|
logger = parentLogger
|
||
|
cfg = config.FromEnvironment()
|
||
|
var err error
|
||
|
|
||
|
client = n.DefaultNatsConnection().WithHostName(cfg.NatsHost).WithPort(cfg.NatsPort).WithConnectionOption(nats.Name(cfg.NatsClientName))
|
||
|
|
||
|
if cfg.NatsToken != "" {
|
||
|
client.WithConnectionOption(nats.Token(cfg.NatsToken))
|
||
|
}
|
||
|
|
||
|
logger.Debug("Connecting to nats")
|
||
|
err = client.Connect()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("%w: error connecting to nats server", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func KvConnect() error {
|
||
|
if client.JS == nil {
|
||
|
return errors.New("jetstream must be connected first")
|
||
|
}
|
||
|
|
||
|
logger.Debug("Looking for KV store")
|
||
|
listener := client.JS.KeyValueStoreNames(ctx)
|
||
|
found := false
|
||
|
for name := range listener.Name() {
|
||
|
if name == "KV_hats" {
|
||
|
found = true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var err error
|
||
|
if found {
|
||
|
logger.Debug("Connecting for KV store")
|
||
|
kv, err = client.JS.KeyValue(ctx, "hats")
|
||
|
} else {
|
||
|
logger.Debug("Creating for KV store")
|
||
|
kv, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
|
||
|
Bucket: "hats",
|
||
|
TTL: 2 * time.Hour,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func Publish(subject string, message []byte) {
|
||
|
client.Publish(ctx, subject, message, jetstream.WithRetryAttempts(2), jetstream.WithRetryWait(500*time.Millisecond))
|
||
|
}
|
||
|
|
||
|
func PublishString(subject, message string) {
|
||
|
Publish(subject, []byte(message))
|
||
|
}
|
||
|
|
||
|
func GetKeyValue(key string) ([]byte, error) {
|
||
|
value, err := kv.Get(ctx, key)
|
||
|
if err != nil {
|
||
|
return []byte{}, err
|
||
|
}
|
||
|
return value.Value(), nil
|
||
|
}
|
||
|
|
||
|
func SetKeyValue(key string, value []byte) error {
|
||
|
_, err := kv.Put(ctx, key, value)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func SetKeyValueString(key, value string) error {
|
||
|
_, err := kv.PutString(ctx, key, value)
|
||
|
return err
|
||
|
}
|