// Package scheduler registers enabled job configurations with a gocron
// scheduler and records the outcome of each scheduled execution.
package scheduler

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
	"go.mongodb.org/mongo-driver/bson/primitive"

	"scheduler-backend/internal/executor"
	"scheduler-backend/internal/jobdef"
	"scheduler-backend/internal/store/model"
)

// JobConfigStore is the persistence surface the scheduler needs for job
// configurations: listing enabled jobs and writing back run state and the
// next scheduled run time.
type JobConfigStore interface {
	ListEnabled(ctx context.Context) ([]model.JobConfig, error)
	UpdateRunState(ctx context.Context, id primitive.ObjectID, status string, lastRunAt *time.Time) error
	UpdateNextRunAt(ctx context.Context, id primitive.ObjectID, nextRunAt *time.Time) error
}

// JobExecutionStore is the persistence surface for per-run execution records.
type JobExecutionStore interface {
	Create(ctx context.Context, item *model.JobExecution) error
	UpdateResult(ctx context.Context, item *model.JobExecution) error
}

// Service wires a gocron scheduler to the job stores and the executor.
type Service struct {
	scheduler      gocron.Scheduler
	jobConfigStore JobConfigStore
	executionStore JobExecutionStore
	executor       *executor.Service
	runtime        jobdef.Runtime
}

// NewService returns a Service that uses the given scheduler, stores,
// executor and runtime. It does not register or start anything.
func NewService(
	scheduler gocron.Scheduler,
	jobConfigStore JobConfigStore,
	executionStore JobExecutionStore,
	executor *executor.Service,
	runtime jobdef.Runtime,
) *Service {
	return &Service{
		scheduler:      scheduler,
		jobConfigStore: jobConfigStore,
		executionStore: executionStore,
		executor:       executor,
		runtime:        runtime,
	}
}

// RegisterEnabledJobs loads every enabled job configuration, registers each
// one with the scheduler, and then starts the scheduler.
//
// The first listing or registration error is returned immediately; in that
// case the scheduler is not started.
func (s *Service) RegisterEnabledJobs(ctx context.Context) error {
	items, err := s.jobConfigStore.ListEnabled(ctx)
	if err != nil {
		return err
	}
	for _, item := range items {
		if err := s.registerJob(ctx, item); err != nil {
			return err
		}
	}
	s.scheduler.Start()
	return nil
}

// Shutdown shuts down the underlying scheduler and returns its error, if any.
func (s *Service) Shutdown() error {
	return s.scheduler.Shutdown()
}

// registerJob adds a single job configuration to the scheduler according to
// its ScheduleType and persists the resulting next run time.
//
//   - "manual":   nothing is scheduled; the stored next run time is cleared.
//   - "cron":     ScheduleValue is used as a crontab expression (without a
//     seconds field).
//   - "interval": ScheduleValue is parsed with time.ParseDuration.
//
// Any other schedule type is rejected with an error.
func (s *Service) registerJob(ctx context.Context, item model.JobConfig) error {
	// The switch only selects the job definition; scheduling and next-run
	// bookkeeping below are identical for every scheduled type.
	var definition gocron.JobDefinition
	switch item.ScheduleType {
	case "manual":
		return s.jobConfigStore.UpdateNextRunAt(ctx, item.ID, nil)
	case "cron":
		definition = gocron.CronJob(item.ScheduleValue, false)
	case "interval":
		interval, err := time.ParseDuration(item.ScheduleValue)
		if err != nil {
			return fmt.Errorf("parse interval for %s: %w", item.Name, err)
		}
		definition = gocron.DurationJob(interval)
	default:
		return fmt.Errorf("unsupported schedule type %s", item.ScheduleType)
	}

	job, err := s.scheduler.NewJob(
		definition,
		// The task runs detached from the registration ctx, which may be
		// long gone by the time the job fires.
		gocron.NewTask(func() {
			s.runScheduledJob(context.Background(), item)
		}),
	)
	if err != nil {
		return fmt.Errorf("register %s job %s: %w", item.ScheduleType, item.Name, err)
	}

	// NOTE(review): RegisterEnabledJobs calls this before scheduler.Start();
	// NextRun may report a zero time for a scheduler that has not started
	// yet, which would then be persisted as-is. Confirm against gocron.
	next, err := job.NextRun()
	if err != nil {
		return fmt.Errorf("next run for job %s: %w", item.Name, err)
	}
	return s.jobConfigStore.UpdateNextRunAt(ctx, item.ID, &next)
}

// runScheduledJob executes one scheduled run of job and records the result.
//
// It creates a "running" execution record, invokes the executor with the
// job's default params, then fills in the finish time, duration, collected
// logs and final status. The job's run state is updated before the execution
// record. Store failures are logged rather than returned because the function
// is invoked from a scheduler task with no caller to report to; if the
// initial record cannot be created the handler is not run at all.
func (s *Service) runScheduledJob(ctx context.Context, job model.JobConfig) {
	startedAt := time.Now()
	params := json.RawMessage(job.DefaultParams)

	record := model.JobExecution{
		JobConfigID:    job.ID,
		TriggerType:    "schedule",
		Status:         "running",
		ParamsSnapshot: string(params),
		StartedAt:      &startedAt,
		CreatedAt:      startedAt,
	}
	if err := s.executionStore.Create(ctx, &record); err != nil {
		s.runtime.Logger.Error("create scheduled execution failed", "jobID", job.ID.Hex(), "error", err)
		return
	}

	execReq := jobdef.ExecuteRequest{
		ExecutionID:  record.ID.Hex(),
		JobID:        job.ID.Hex(),
		TriggerType:  "schedule",
		Params:       params,
		LogCollector: jobdef.NewLogCollector(),
	}
	runErr := s.executor.Execute(ctx, job.HandlerKey, execReq)

	finishedAt := time.Now()
	record.FinishedAt = &finishedAt
	record.DurationMs = finishedAt.Sub(startedAt).Milliseconds()

	// Collected logs are attached regardless of outcome; a fallback text is
	// substituted below only when the collector produced none.
	flushed := execReq.LogCollector.Flush(record.ID.Hex())
	record.LogFile = flushed.LogFile
	record.LogText = flushed.LogText

	runState := "success"
	if runErr != nil {
		runState = "failed"
		record.Status = "failed"
		record.ErrorMessage = runErr.Error()
		record.ResultSummary = "scheduled execution failed"
		if record.LogText == "" {
			record.LogText = runErr.Error()
		}
	} else {
		record.Status = "success"
		record.ResultSummary = "scheduled execution succeeded"
		if record.LogText == "" {
			record.LogText = "handler executed successfully"
		}
	}

	// Best effort: a failed run-state update must not prevent the execution
	// record from being finalized, but it should not vanish silently either.
	if err := s.jobConfigStore.UpdateRunState(ctx, job.ID, runState, &finishedAt); err != nil {
		s.runtime.Logger.Error("update job run state failed", "jobID", job.ID.Hex(), "error", err)
	}
	if err := s.executionStore.UpdateResult(ctx, &record); err != nil {
		s.runtime.Logger.Error("update scheduled execution failed", "executionID", record.ID.Hex(), "error", err)
	}
}