1
0
Fork 0
hats/internal/nats/schedule.go

114 lines
2.3 KiB
Go

package nats
import (
"errors"
"time"
"github.com/go-co-op/gocron"
"github.com/nats-io/nats.go/jetstream"
)
var (
scheduleStore jetstream.KeyValue
scheduler = gocron.NewScheduler(time.Local)
schedules = map[string]*gocron.Job{}
fireSchedule = func(name string) {
PublishRequest("schedules."+name, []byte("fired"), 500*time.Millisecond, 3)
}
)
func ScheduleStoreConnect() error {
if client.JS == nil {
return errors.New("jetstream must be connected first")
}
logger.Debug("Looking for schedule KV store")
listener := client.JS.KeyValueStoreNames(ctx)
found := false
for name := range listener.Name() {
if name == "KV_hats_schedules" {
found = true
}
}
var err error
if found {
logger.Debug("Connecting to Schedules KV store")
scheduleStore, err = client.JS.KeyValue(ctx, "hats_schedules")
} else {
logger.Debug("Creating Schedules KV store")
scheduleStore, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "hats_schedules",
})
}
return err
}
func GetExistingSchedules() {
scheduler.StartAsync()
existing, _ := scheduleStore.Keys(ctx, jetstream.IgnoreDeletes())
for _, name := range existing {
sched, err := GetSchedule(name)
if err != nil {
continue
}
sched.Activate()
}
}
type HatsSchedule struct {
Name string
Cron string
}
func NewSchedule(name, cron string) *HatsSchedule {
return &HatsSchedule{
Name: name,
Cron: cron,
}
}
func GetSchedule(name string) (*HatsSchedule, error) {
value, err := scheduleStore.Get(ctx, name)
if err != nil {
return nil, err
}
return NewSchedule(name, string(value.Value())), nil
}
func (t *HatsSchedule) GetNext() string {
if job, exists := schedules[t.Name]; exists {
return job.NextRun().String()
}
return ""
}
func (t *HatsSchedule) Activate() error {
job, err := scheduler.CronWithSeconds(t.Cron).Do(fireSchedule, t.Name)
if err != nil {
return err
}
if existing, found := schedules[t.Name]; found {
scheduler.RemoveByID(existing)
}
schedules[t.Name] = job
scheduleStore.PutString(ctx, t.Name, t.Cron)
return nil
}
func (t *HatsSchedule) Cancel() {
if job, exists := schedules[t.Name]; exists {
scheduler.RemoveByID(job)
}
scheduleStore.Purge(ctx, t.Name)
}
func StopSchedules() {
if scheduler != nil {
scheduler.Stop()
}
}