diff --git a/.env b/.env index ca5f7c6..25a5f20 100644 --- a/.env +++ b/.env @@ -8,4 +8,5 @@ MONGO_AUTHSOURCE=admin MONGO_DATABASE=openim_v3 SCHEDULER_MONGO_DATABASE=scheduler_center REDIS_ADDR=127.0.0.1:6379 +REDIS_USERNAME= REDIS_PASSWORD= diff --git a/.env.example b/.env.example index 392e99b..e60a878 100644 --- a/.env.example +++ b/.env.example @@ -8,4 +8,5 @@ MONGO_AUTHSOURCE=admin MONGO_DATABASE=openim_v3 SCHEDULER_MONGO_DATABASE=scheduler_center REDIS_ADDR=127.0.0.1:6379 +REDIS_USERNAME= REDIS_PASSWORD= diff --git a/.env.local b/.env.local index 44dea80..6830b8a 100644 --- a/.env.local +++ b/.env.local @@ -7,3 +7,6 @@ MONGO_PASSWORD=rI57PJsJhnz_qlRkfnTa0RPT MONGO_AUTHSOURCE=admin MONGO_DATABASE=openim_v3 SCHEDULER_MONGO_DATABASE=scheduler_center_local +REDIS_ADDR=127.0.0.1:6379 +REDIS_USERNAME= +REDIS_PASSWORD= diff --git a/.env.prod b/.env.prod index 95b4b77..25a5f20 100644 --- a/.env.prod +++ b/.env.prod @@ -7,3 +7,6 @@ MONGO_PASSWORD=rI57PJsJhnz_qlRkfnTa0RPT MONGO_AUTHSOURCE=admin MONGO_DATABASE=openim_v3 SCHEDULER_MONGO_DATABASE=scheduler_center +REDIS_ADDR=127.0.0.1:6379 +REDIS_USERNAME= +REDIS_PASSWORD= diff --git a/README.md b/README.md index 3bb3997..5176373 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,9 @@ MONGO_PASSWORD= MONGO_AUTHSOURCE=admin MONGO_DATABASE=openim_v3 SCHEDULER_MONGO_DATABASE=scheduler_center +REDIS_ADDR=127.0.0.1:6379 +REDIS_USERNAME= +REDIS_PASSWORD= ``` ## 启动方式 diff --git a/api b/api index 8c7f5d9..9c36ca0 100755 Binary files a/api and b/api differ diff --git a/cmd/api/main.go b/cmd/api/main.go index a80f4b4..e0585fa 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -14,6 +14,7 @@ import ( storemongo "scheduler-backend/internal/store/mongo" "scheduler-backend/pkg/config" "scheduler-backend/pkg/log" + redisclient "scheduler-backend/pkg/redis" ) func main() { @@ -31,6 +32,11 @@ func main() { logger.Error("scheduler api mongo connect failed", "error", err) os.Exit(1) } + redisConn, err := redisclient.Connect(rootCtx, cfg) + if err != nil { + logger.Error("scheduler api redis connect failed", "error", err) + os.Exit(1) + } jobConfigStore := storemongo.NewJobConfigStore(databases.MetaDB) if err := jobdef.SyncJobConfigs(rootCtx, "job-config-list", jobConfigStore, logger); err != nil { logger.Error("sync job configs failed", "error", err) @@ -42,16 +48,18 @@ func main() { Logger: logger, MetaDB: databases.MetaDB, BusinessDB: databases.BusinessDB, + Redis: redisConn, }) router := api.NewRouter(api.RouterDeps{ - Registry: registry, - JobConfigStore: jobConfigStore, - ExecutionStore: storemongo.NewJobExecutionStore(databases.MetaDB), - ProfileStore: storemongo.NewAdminProfileStore(databases.MetaDB), - AdminUserStore: storemongo.NewAdminUserStore(databases.MetaDB), - ConfigStore: storemongo.NewSystemConfigStore(databases.MetaDB), - Executor: execSvc, + Registry: registry, + JobConfigStore: jobConfigStore, + ExecutionStore: storemongo.NewJobExecutionStore(databases.MetaDB), + ProfileStore: storemongo.NewAdminProfileStore(databases.MetaDB), + AdminUserStore: storemongo.NewAdminUserStore(databases.MetaDB), + ConfigStore: storemongo.NewSystemConfigStore(databases.MetaDB), + GlobalConfigStore: storemongo.NewGlobalConfigStore(databases.MetaDB), + Executor: execSvc, }) logger.Info("scheduler api starting", "addr", addr) @@ -62,6 +70,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + _ = redisConn.Close() _ = databases.Client.Disconnect(ctx) os.Exit(0) }() diff --git a/cmd/worker/main.go b/cmd/worker/main.go index c25ae69..8f49c4a 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -15,6 +15,7 @@ import ( storemongo "scheduler-backend/internal/store/mongo" "scheduler-backend/pkg/config" "scheduler-backend/pkg/log" + redisclient "scheduler-backend/pkg/redis" "github.com/go-co-op/gocron/v2" ) @@ -33,6 +34,11 @@ func main() { logger.Error("scheduler worker mongo connect failed", "error", err) os.Exit(1) } + redisConn, err := redisclient.Connect(rootCtx, cfg) + if err != nil { + logger.Error("scheduler worker redis connect failed", "error", err) + os.Exit(1) + } jobConfigStore := storemongo.NewJobConfigStore(databases.MetaDB) if err := jobdef.SyncJobConfigs(rootCtx, "job-config-list", jobConfigStore, logger); err != nil { @@ -45,6 +51,7 @@ func main() { Logger: logger, MetaDB: databases.MetaDB, BusinessDB: databases.BusinessDB, + Redis: redisConn, } execSvc := executor.NewService(registry, runtime) g, err := gocron.NewScheduler() @@ -99,6 +106,9 @@ func main() { if err := healthServer.Shutdown(shutdownCtx); err != nil { logger.Error("scheduler worker health server shutdown failed", "error", err) } + if err := redisConn.Close(); err != nil { + logger.Error("scheduler worker redis disconnect failed", "error", err) + } if err := databases.Client.Disconnect(context.Background()); err != nil { logger.Error("scheduler worker mongo disconnect failed", "error", err) } diff --git a/internal/api/router.go b/internal/api/router.go index f801fa3..1c9a8d5 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -51,6 +51,7 @@ type executionItem struct { ID string `json:"id"` JobName string `json:"jobName"` TriggerType string `json:"triggerType"` + ScheduleType string `json:"scheduleType,omitempty"` Status string `json:"status"` StartedAt string `json:"startedAt,omitempty"` FinishedAt string `json:"finishedAt,omitempty"` @@ -61,6 +62,7 @@ type executionDetailItem struct { ID string `json:"id"` JobConfigID string `json:"jobConfigId"` TriggerType string `json:"triggerType"` + ScheduleType string `json:"scheduleType,omitempty"` Status string `json:"status"` ParamsSnapshot string `json:"paramsSnapshot"` StartedAt string `json:"startedAt,omitempty"` @@ -133,6 +135,12 @@ type dashboardOverviewResponse struct { RecentExecutions []executionItem `json:"recentExecutions"` } +type globalConfigItem struct { + Config string `json:"config"` + UpdatedBy string `json:"updatedBy"` + UpdatedAt string `json:"updatedAt,omitempty"` +} + type JobConfigLister interface { List(ctx context.Context) ([]model.JobConfig, error) GetByID(ctx context.Context, id primitive.ObjectID) (*model.JobConfig, error) @@ -173,14 +181,20 @@ type SystemConfigStore interface { ToggleEnabled(ctx context.Context, id primitive.ObjectID, enabled bool) error } +type GlobalConfigStore interface { + Get(ctx context.Context, key string) (*model.GlobalConfig, error) + Upsert(ctx context.Context, item *model.GlobalConfig) error +} + type RouterDeps struct { - Registry *jobdef.Registry - JobConfigStore JobConfigLister - ExecutionStore JobExecutionLister - ProfileStore AdminProfileStore - AdminUserStore AdminUserStore - ConfigStore SystemConfigStore - Executor *executor.Service + Registry *jobdef.Registry + JobConfigStore JobConfigLister + ExecutionStore JobExecutionLister + ProfileStore AdminProfileStore + AdminUserStore AdminUserStore + ConfigStore SystemConfigStore + GlobalConfigStore GlobalConfigStore + Executor *executor.Service } type upsertJobRequest struct { @@ -243,6 +257,12 @@ type toggleSystemConfigRequest struct { Enabled bool `json:"enabled"` } +type updateGlobalConfigRequest struct { + Config json.RawMessage `json:"config" binding:"required"` +} + +const schedulerGlobalConfigKey = "scheduler" + func NewRouter(deps RouterDeps) *gin.Engine { router := gin.Default() router.Use(func(c *gin.Context) { @@ -579,6 +599,35 @@ func NewRouter(deps RouterDeps) *gin.Engine { "errorMessage": execRecord.ErrorMessage, }) }) + router.GET("/admin/scheduler/global-config", func(c *gin.Context) { + item, err := deps.GlobalConfigStore.Get(c.Request.Context(), schedulerGlobalConfigKey) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, toGlobalConfigItem(*item)) + }) + router.PUT("/admin/scheduler/global-config", func(c *gin.Context) { + var req updateGlobalConfigRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if !json.Valid(req.Config) { + c.JSON(http.StatusBadRequest, gin.H{"error": errInvalidJSON("config").Error()}) + return + } + item := &model.GlobalConfig{ + Key: schedulerGlobalConfigKey, + Config: string(req.Config), + UpdatedBy: "admin", + } + if err := deps.GlobalConfigStore.Upsert(c.Request.Context(), item); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, toGlobalConfigItem(*item)) + }) router.GET("/admin/scheduler/executions", func(c *gin.Context) { jobs, err := deps.JobConfigStore.List(c.Request.Context()) if err != nil { @@ -592,16 +641,20 @@ func NewRouter(deps RouterDeps) *gin.Engine { } jobNameMap := make(map[string]string, len(jobs)) + jobScheduleTypeMap := make(map[string]string, len(jobs)) for _, item := range jobs { jobNameMap[item.ID.Hex()] = item.Name + jobScheduleTypeMap[item.ID.Hex()] = item.ScheduleType } responseItems := make([]executionItem, 0, len(items)) for _, item := range items { + jobID := item.JobConfigID.Hex() responseItems = append(responseItems, executionItem{ ID: objectIDToString(item.ID), JobName: jobDisplayName(jobNameMap, item.JobConfigID), TriggerType: item.TriggerType, + ScheduleType: jobScheduleTypeMap[jobID], Status: item.Status, StartedAt: formatTime(item.StartedAt), FinishedAt: formatTime(item.FinishedAt), @@ -631,7 +684,17 @@ func NewRouter(deps RouterDeps) *gin.Engine { return } - c.JSON(http.StatusOK, toExecutionDetailItem(*item)) + detail := toExecutionDetailItem(*item) + job, err := deps.JobConfigStore.GetByID(c.Request.Context(), item.JobConfigID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + if job != nil { + detail.ScheduleType = job.ScheduleType + } + + c.JSON(http.StatusOK, detail) }) router.GET("/admin/scheduler/executions/:id/logfile", func(c *gin.Context) { id, err := primitive.ObjectIDFromHex(c.Param("id")) @@ -900,6 +963,14 @@ func toExecutionDetailItem(item model.JobExecution) executionDetailItem { } } +func toGlobalConfigItem(item model.GlobalConfig) globalConfigItem { + return globalConfigItem{ + Config: item.Config, + UpdatedBy: item.UpdatedBy, + UpdatedAt: formatTime(&item.UpdatedAt), + } +} + func toProfileItem(item model.AdminProfile) profileItem { return profileItem{ ID: objectIDToString(item.ID), diff --git a/internal/jobdef/handler.go b/internal/jobdef/handler.go index bb47afa..01ec87f 100644 --- a/internal/jobdef/handler.go +++ b/internal/jobdef/handler.go @@ -9,6 +9,7 @@ import ( "strings" "sync" + "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" "scheduler-backend/pkg/config" @@ -93,6 +94,7 @@ type Runtime struct { Logger log.Logger MetaDB *mongo.Database BusinessDB *mongo.Database + Redis *redis.Client } type Handler interface { diff --git a/internal/jobdef/url_rewrite_handler.go b/internal/jobdef/url_rewrite_handler.go index 483e666..a081b82 100644 --- a/internal/jobdef/url_rewrite_handler.go +++ b/internal/jobdef/url_rewrite_handler.go @@ -36,7 +36,9 @@ func (URLRewriteHandler) Run(ctx context.Context, runtime Runtime, req ExecuteRe redisCfg := urlrewrite.RedisConfig{ Addr: runtime.Config.RedisAddr, + Username: runtime.Config.RedisUsername, Password: runtime.Config.RedisPassword, + Client: runtime.Redis, } batchID, err := urlrewrite.Run(ctx, runtime.BusinessDB, params, redisCfg, logf) if err != nil { diff --git a/internal/store/model/global_config.go b/internal/store/model/global_config.go new file mode 100644 index 0000000..da21927 --- /dev/null +++ b/internal/store/model/global_config.go @@ -0,0 +1,15 @@ +package model + +import ( + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type GlobalConfig struct { + ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` + Key string `bson:"key" json:"key"` + Config string `bson:"config" json:"config"` + UpdatedBy string `bson:"updatedBy" json:"updatedBy"` + UpdatedAt time.Time `bson:"updatedAt" json:"updatedAt"` +} diff --git a/internal/store/mongo/global_config_store.go b/internal/store/mongo/global_config_store.go new file mode 100644 index 0000000..7e86c21 --- /dev/null +++ b/internal/store/mongo/global_config_store.go @@ -0,0 +1,86 @@ +package mongo + +import ( + "context" + "errors" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + mongodriver "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "scheduler-backend/internal/store/model" +) + +const schedulerGlobalConfigKey = "scheduler" +const defaultSchedulerGlobalConfigJSON = `{ + "maxConcurrentJobs": 5, + "defaultTimeoutSeconds": 300 +}` + +type GlobalConfigStore struct { + collection *mongodriver.Collection +} + +func NewGlobalConfigStore(db *mongodriver.Database) *GlobalConfigStore { + return &GlobalConfigStore{ + collection: db.Collection("global_config"), + } +} + +func (s *GlobalConfigStore) Get(ctx context.Context, key string) (*model.GlobalConfig, error) { + findCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + var item model.GlobalConfig + err := s.collection.FindOne(findCtx, bson.M{"key": key}).Decode(&item) + if err == nil { + return &item, nil + } + if !errors.Is(err, mongodriver.ErrNoDocuments) { + return nil, err + } + + item = model.GlobalConfig{ + ID: primitive.NewObjectID(), + Key: key, + Config: defaultConfigForKey(key), + UpdatedBy: "system", + UpdatedAt: time.Now(), + } + if _, err := s.collection.InsertOne(findCtx, item); err != nil { + return nil, err + } + return &item, nil +} + +func (s *GlobalConfigStore) Upsert(ctx context.Context, item *model.GlobalConfig) error { + updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + item.UpdatedAt = time.Now() + if item.UpdatedBy == "" { + item.UpdatedBy = "system" + } + if item.Key == "" { + item.Key = schedulerGlobalConfigKey + } + + _, err := s.collection.UpdateOne(updateCtx, bson.M{"key": item.Key}, bson.M{ + "$set": bson.M{ + "key": item.Key, + "config": item.Config, + "updatedBy": item.UpdatedBy, + "updatedAt": item.UpdatedAt, + }, + }, options.Update().SetUpsert(true)) + return err +} + +func defaultConfigForKey(key string) string { + if key == schedulerGlobalConfigKey { + return defaultSchedulerGlobalConfigJSON + } + return "{}" +} diff --git a/internal/urlrewrite/redis.go b/internal/urlrewrite/redis.go index 344ff1c..eb7ff81 100644 --- a/internal/urlrewrite/redis.go +++ b/internal/urlrewrite/redis.go @@ -6,9 +6,10 @@ import ( "github.com/redis/go-redis/v9" ) -func newRedisClient(ctx context.Context, addr, password string) (*redis.Client, error) { +func newRedisClient(ctx context.Context, addr, username, password string) (*redis.Client, error) { rdb := redis.NewClient(&redis.Options{ Addr: addr, + Username: username, Password: password, }) if err := rdb.Ping(ctx).Err(); err != nil { diff --git a/internal/urlrewrite/runner.go b/internal/urlrewrite/runner.go index b9f164e..5219fa9 100644 --- a/internal/urlrewrite/runner.go +++ b/internal/urlrewrite/runner.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -35,7 +36,9 @@ type Params struct { type RedisConfig struct { Addr string + Username string Password string + Client *redis.Client } type Summary struct { @@ -216,11 +219,19 @@ func invalidateBatchCache(ctx context.Context, db *mongo.Database, redisCfg Redi return nil } - rdb, err := newRedisClient(ctx, redisCfg.Addr, redisCfg.Password) - if err != nil { - return fmt.Errorf("connect redis: %w", err) + rdb := redisCfg.Client + shouldClose := false + if rdb == nil { + var err error + rdb, err = newRedisClient(ctx, redisCfg.Addr, redisCfg.Username, redisCfg.Password) + if err != nil { + return fmt.Errorf("connect redis: %w", err) + } + shouldClose = true + } + if shouldClose { + defer rdb.Close() } - defer rdb.Close() if err := rdb.Del(ctx, keys...).Err(); err != nil { return fmt.Errorf("redis del: %w", err) diff --git a/pkg/config/config.go b/pkg/config/config.go index 828e96d..02cfe96 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -20,6 +20,7 @@ type Config struct { BusinessMongoDatabase string SchedulerMongoDatabase string RedisAddr string + RedisUsername string RedisPassword string } @@ -37,6 +38,7 @@ func Load() Config { BusinessMongoDatabase: getenv("MONGO_DATABASE", "openim_v3"), SchedulerMongoDatabase: getenv("SCHEDULER_MONGO_DATABASE", "scheduler_center"), RedisAddr: getenv("REDIS_ADDR", ""), + RedisUsername: getenv("REDIS_USERNAME", ""), RedisPassword: getenv("REDIS_PASSWORD", ""), } } diff --git a/pkg/redis/client.go b/pkg/redis/client.go new file mode 100644 index 0000000..451a94b --- /dev/null +++ b/pkg/redis/client.go @@ -0,0 +1,22 @@ +package redis + +import ( + "context" + + goredis "github.com/redis/go-redis/v9" + + "scheduler-backend/pkg/config" +) + +func Connect(ctx context.Context, cfg config.Config) (*goredis.Client, error) { + client := goredis.NewClient(&goredis.Options{ + Addr: cfg.RedisAddr, + Username: cfg.RedisUsername, + Password: cfg.RedisPassword, + }) + if err := client.Ping(ctx).Err(); err != nil { + _ = client.Close() + return nil, err + } + return client, nil +}