2023-10-12 17:23:35 +00:00
|
|
|
package nats
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"log/slog"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"code.jhot.me/jhot/hats/pkg/config"
|
|
|
|
n "code.jhot.me/jhot/hats/pkg/nats"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
cfg *config.HatsConfig
|
|
|
|
client *n.NatsConnection
|
|
|
|
kv jetstream.KeyValue
|
|
|
|
ctx context.Context
|
|
|
|
logger *slog.Logger
|
|
|
|
)
|
|
|
|
|
|
|
|
func Close() {
|
|
|
|
client.Close()
|
|
|
|
}
|
|
|
|
|
2023-12-20 18:29:23 +00:00
|
|
|
func JetstreamConnect(parentContext context.Context, parentLogger *slog.Logger, parentConfig *config.HatsConfig) error {
|
2023-10-12 17:23:35 +00:00
|
|
|
ctx = parentContext
|
|
|
|
logger = parentLogger
|
2023-12-20 18:29:23 +00:00
|
|
|
cfg = parentConfig
|
2023-10-12 17:23:35 +00:00
|
|
|
var err error
|
|
|
|
|
|
|
|
client = n.DefaultNatsConnection().WithHostName(cfg.NatsHost).WithPort(cfg.NatsPort).WithConnectionOption(nats.Name(cfg.NatsClientName))
|
|
|
|
|
|
|
|
if cfg.NatsToken != "" {
|
|
|
|
client.WithConnectionOption(nats.Token(cfg.NatsToken))
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.Debug("Connecting to nats")
|
|
|
|
err = client.Connect()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("%w: error connecting to nats server", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func KvConnect() error {
|
|
|
|
if client.JS == nil {
|
|
|
|
return errors.New("jetstream must be connected first")
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.Debug("Looking for KV store")
|
|
|
|
listener := client.JS.KeyValueStoreNames(ctx)
|
|
|
|
found := false
|
|
|
|
for name := range listener.Name() {
|
|
|
|
if name == "KV_hats" {
|
|
|
|
found = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
if found {
|
2023-10-13 20:31:23 +00:00
|
|
|
logger.Debug("Connecting to KV store")
|
2023-10-12 17:23:35 +00:00
|
|
|
kv, err = client.JS.KeyValue(ctx, "hats")
|
|
|
|
} else {
|
2023-10-13 20:31:23 +00:00
|
|
|
logger.Debug("Creating KV store")
|
2023-10-12 17:23:35 +00:00
|
|
|
kv, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
|
|
|
|
Bucket: "hats",
|
|
|
|
TTL: 2 * time.Hour,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func Publish(subject string, message []byte) {
|
|
|
|
client.Publish(ctx, subject, message, jetstream.WithRetryAttempts(2), jetstream.WithRetryWait(500*time.Millisecond))
|
|
|
|
}
|
|
|
|
|
|
|
|
func PublishString(subject, message string) {
|
|
|
|
Publish(subject, []byte(message))
|
|
|
|
}
|
|
|
|
|
2023-12-07 05:19:52 +00:00
|
|
|
func PublishRequest(subject string, message []byte, timeout time.Duration, retries int) {
|
|
|
|
attempts := 0
|
|
|
|
for {
|
|
|
|
attempts += 1
|
|
|
|
if attempts > retries {
|
2023-12-08 23:37:54 +00:00
|
|
|
logger.Warn("Request retries exceeded", "subject", subject)
|
2023-12-07 05:25:13 +00:00
|
|
|
return
|
2023-12-07 05:19:52 +00:00
|
|
|
}
|
|
|
|
resp, err := client.Conn.Request(subject, message, timeout)
|
|
|
|
|
|
|
|
if err == nil {
|
2023-12-07 19:39:48 +00:00
|
|
|
logger.Debug("Request response received", "response", string(resp.Data), "attempts", attempts, "subject", subject)
|
2023-12-07 05:25:13 +00:00
|
|
|
return
|
2023-12-07 05:19:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-10-12 17:23:35 +00:00
|
|
|
func GetKeyValue(key string) ([]byte, error) {
|
|
|
|
value, err := kv.Get(ctx, key)
|
|
|
|
if err != nil {
|
|
|
|
return []byte{}, err
|
|
|
|
}
|
|
|
|
return value.Value(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetKeyValue(key string, value []byte) error {
|
|
|
|
_, err := kv.Put(ctx, key, value)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func SetKeyValueString(key, value string) error {
|
|
|
|
_, err := kv.PutString(ctx, key, value)
|
|
|
|
return err
|
|
|
|
}
|