package nats

import (
	"errors"
	"fmt"
	"time"

	"github.com/go-co-op/gocron"
	"github.com/nats-io/nats.go/jetstream"
)

var (
	// scheduleStore is the JetStream key-value bucket that persists schedules:
	// the key is the schedule name and the value is its cron expression.
	// It stays nil until ScheduleStoreConnect succeeds.
	scheduleStore jetstream.KeyValue

	// scheduler runs every active schedule, evaluated in the local time zone.
	scheduler = gocron.NewScheduler(time.Local)

	// schedules maps a schedule name to the job currently registered for it.
	// NOTE(review): this map is not synchronized; confirm that callers
	// serialize Activate/Cancel/GetNext.
	schedules = map[string]*gocron.Job{}

	// fireSchedule is the job body run by the scheduler. It publishes the
	// payload "fired" on the subject "schedules.<name>".
	// NOTE(review): 500ms and 3 are presumably the request timeout and the
	// retry count of PublishRequest — confirm against its definition.
	fireSchedule = func(name string) {
		PublishRequest("schedules."+name, []byte("fired"), 500*time.Millisecond, 3)
	}
)

// ScheduleStoreConnect binds scheduleStore to the "hats_schedules" key-value
// bucket, creating the bucket when the store listing does not contain it.
//
// It returns an error when JetStream is not connected yet, when listing the
// existing stores fails, or when opening/creating the bucket fails.
func ScheduleStoreConnect() error {
	if client.JS == nil {
		return errors.New("jetstream must be connected first")
	}

	logger.Debug("Looking for schedule KV store")
	listener := client.JS.KeyValueStoreNames(ctx)
	found := false
	// Every name is read (no early break), as in the original code, so the
	// lister's channel is always fully consumed.
	// NOTE(review): the comparison uses the "KV_"-prefixed name while the
	// bucket itself is opened as "hats_schedules"; confirm which form the
	// lister yields. If names arrive without the prefix, the create branch
	// below is always taken.
	for name := range listener.Name() {
		if name == "KV_hats_schedules" {
			found = true
		}
	}
	// A failed listing must not be mistaken for "bucket does not exist".
	if err := listener.Error(); err != nil {
		return fmt.Errorf("listing KV stores: %w", err)
	}

	var err error
	if found {
		logger.Debug("Connecting to Schedules KV store")
		scheduleStore, err = client.JS.KeyValue(ctx, "hats_schedules")
	} else {
		logger.Debug("Creating Schedules KV store")
		scheduleStore, err = client.JS.CreateKeyValue(ctx, jetstream.KeyValueConfig{
			Bucket: "hats_schedules",
		})
	}
	if err != nil {
		return fmt.Errorf("opening schedule KV store: %w", err)
	}
	return nil
}

// GetExistingSchedules starts the scheduler asynchronously and re-activates
// every schedule found in the key-value store.
//
// The function has no error return, so failures are logged at debug level and
// the affected schedule (or the whole restore, if the keys cannot be listed)
// is skipped.
func GetExistingSchedules() {
	scheduler.StartAsync()

	if scheduleStore == nil {
		logger.Debug("Schedule KV store is not connected; no schedules restored")
		return
	}

	names, err := scheduleStore.Keys(ctx, jetstream.IgnoreDeletes())
	if err != nil {
		// An empty bucket may also surface here as an error; either way
		// there is nothing to restore.
		logger.Debug("Listing stored schedules: " + err.Error())
		return
	}

	for _, name := range names {
		sched, err := GetSchedule(name)
		if err != nil {
			logger.Debug("Loading schedule " + name + ": " + err.Error())
			continue
		}
		if err := sched.Activate(); err != nil {
			logger.Debug("Activating schedule " + name + ": " + err.Error())
		}
	}
}

// HatsSchedule is a named cron schedule.
type HatsSchedule struct {
	// Name identifies the schedule; it is the key in the store and the suffix
	// of the subject published when the schedule fires.
	Name string
	// Cron is the cron expression handed to the scheduler's CronWithSeconds.
	Cron string
}

// NewSchedule returns a HatsSchedule with the given name and cron expression.
// It neither validates the expression nor activates or stores the schedule.
func NewSchedule(name, cron string) *HatsSchedule {
	return &HatsSchedule{
		Name: name,
		Cron: cron,
	}
}

// GetSchedule loads the schedule stored under name. The stored value is used
// verbatim as the cron expression.
//
// It returns an error when the store is not connected or the key cannot be
// read.
func GetSchedule(name string) (*HatsSchedule, error) {
	if scheduleStore == nil {
		return nil, errors.New("schedule store must be connected first")
	}
	value, err := scheduleStore.Get(ctx, name)
	if err != nil {
		return nil, fmt.Errorf("reading schedule %q: %w", name, err)
	}
	return NewSchedule(name, string(value.Value())), nil
}

// GetNext returns the next run time of the schedule's active job formatted
// with time.Time.String, or "" when the schedule has no registered job.
func (t *HatsSchedule) GetNext() string {
	if job, exists := schedules[t.Name]; exists {
		return job.NextRun().String()
	}
	return ""
}

// Activate registers the schedule with the scheduler, replacing any job that
// is already registered under the same name, and persists the cron expression
// in the key-value store.
//
// The new job is created before the old one is removed, so an invalid cron
// expression leaves the previous job untouched. If persisting fails the job
// stays scheduled in this process and the error is returned, telling the
// caller that the schedule is not stored.
func (t *HatsSchedule) Activate() error {
	if scheduleStore == nil {
		return errors.New("schedule store must be connected first")
	}

	job, err := scheduler.CronWithSeconds(t.Cron).Do(fireSchedule, t.Name)
	if err != nil {
		return fmt.Errorf("scheduling %q with cron %q: %w", t.Name, t.Cron, err)
	}

	if existing, found := schedules[t.Name]; found {
		scheduler.RemoveByID(existing)
	}
	schedules[t.Name] = job

	if _, err := scheduleStore.PutString(ctx, t.Name, t.Cron); err != nil {
		return fmt.Errorf("persisting schedule %q: %w", t.Name, err)
	}
	return nil
}

// Cancel removes the schedule's job from the scheduler, forgets it locally and
// purges its entry from the key-value store.
//
// Cancel has no error return; a failed purge is logged at debug level.
func (t *HatsSchedule) Cancel() {
	if job, exists := schedules[t.Name]; exists {
		scheduler.RemoveByID(job)
		// Drop the stale entry so GetNext stops reporting the cancelled job
		// and a later Activate does not try to remove it a second time.
		delete(schedules, t.Name)
	}

	if scheduleStore == nil {
		return
	}
	if err := scheduleStore.Purge(ctx, t.Name); err != nil {
		logger.Debug("Purging schedule " + t.Name + ": " + err.Error())
	}
}

// StopSchedules stops the scheduler if one exists.
func StopSchedules() {
	if scheduler != nil {
		scheduler.Stop()
	}
}