Files
kim.dev.6789 e50142a3b9 复制项目
2026-01-14 22:16:44 +08:00

308 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cron
import (
"context"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/config"
disetcd "git.imall.cloud/openim/open-im-server-deploy/pkg/common/discovery/etcd"
mcache "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache/mcache"
redisCache "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache/redis"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database/mgo"
"git.imall.cloud/openim/open-im-server-deploy/pkg/dbbuild"
"git.imall.cloud/openim/open-im-server-deploy/pkg/notification"
"git.imall.cloud/openim/open-im-server-deploy/pkg/rpcli"
pbconversation "git.imall.cloud/openim/protocol/conversation"
"git.imall.cloud/openim/protocol/msg"
"git.imall.cloud/openim/protocol/third"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/cont"
"github.com/openimsdk/tools/s3/disable"
"github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/robfig/cron/v3"
"google.golang.org/grpc"
)
type Config struct {
CronTask config.CronTask
Share config.Share
Discovery config.Discovery
Mongo config.Mongo
Redis config.Redis
Minio config.Minio
Third config.Third
Notification config.Notification
}
func Start(ctx context.Context, conf *Config, client discovery.SvcDiscoveryRegistry, service grpc.ServiceRegistrar) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
if conf.CronTask.RetainChatRecords < 1 {
log.ZInfo(ctx, "disable cron")
<-ctx.Done()
return nil
}
ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUser.UserIDs[0])
msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg)
if err != nil {
return err
}
thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation)
if err != nil {
return err
}
groupConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Group)
if err != nil {
return err
}
// 初始化数据库连接(用于会议群聊解散)
dbb := dbbuild.NewBuilder(&conf.Mongo, &conf.Redis)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
meetingDB, err := mgo.NewMeetingMongo(mgocli.GetDB())
if err != nil {
return err
}
systemConfigDB, err := mgo.NewSystemConfigMongo(mgocli.GetDB())
if err != nil {
return err
}
msgDocDB, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
objectDB, err := mgo.NewS3Mongo(mgocli.GetDB())
if err != nil {
return err
}
// 初始化S3客户端和缓存用于删除S3文件
rdb, err := dbb.Redis(ctx)
if err != nil {
log.ZWarn(ctx, "Redis连接失败S3文件删除功能可能受限", err)
rdb = nil
}
var s3Client s3.Interface
var s3Cache cont.S3Cache
switch enable := conf.Third.Object.Enable; enable {
case "minio":
var minioCache minio.Cache
if rdb == nil {
mc, err := mgo.NewCacheMgo(mgocli.GetDB())
if err != nil {
log.ZWarn(ctx, "Mongo缓存初始化失败S3文件删除功能可能受限", err)
s3Client = disable.NewDisable()
s3Cache = nil
} else {
minioCache = mcache.NewMinioCache(mc)
s3Client, err = minio.NewMinio(ctx, minioCache, *conf.Minio.Build())
if err != nil {
log.ZError(ctx, "Minio初始化失败", err)
return err
}
s3Cache = nil // MongoDB缓存模式下S3Cache为nil
}
} else {
minioCache = redisCache.NewMinioCache(rdb)
s3Client, err = minio.NewMinio(ctx, minioCache, *conf.Minio.Build())
if err != nil {
log.ZError(ctx, "Minio初始化失败", err)
return err
}
s3Cache = redisCache.NewS3Cache(rdb, s3Client)
}
case "":
s3Client = disable.NewDisable()
s3Cache = nil
default:
// 其他S3类型暂不支持使用disable模式
log.ZWarn(ctx, "S3类型不支持使用disable模式", nil, "enable", enable)
s3Client = disable.NewDisable()
s3Cache = nil
}
var locker Locker
if conf.Discovery.Enable == config.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
conf.CronTask.GetConfigFileName(),
conf.Share.GetConfigFileName(),
conf.Discovery.GetConfigFileName(),
})
cm.Watch(ctx)
locker, err = NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
if err != nil {
return err
}
}
if locker == nil {
locker = emptyLocker{}
}
// 初始化NotificationSender用于发送删除消息通知
notificationSender := notification.NewNotificationSender(&conf.Notification,
notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
return msg.NewMsgClient(msgConn).SendMsg(ctx, req)
}),
)
srv := &cronServer{
ctx: ctx,
config: conf,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
groupClient: rpcli.NewGroupClient(groupConn),
meetingDB: meetingDB,
systemConfigDB: systemConfigDB,
msgDocDB: msgDocDB,
objectDB: objectDB,
s3Client: s3Client,
s3Cache: s3Cache,
notificationSender: notificationSender,
locker: locker,
}
if err := srv.registerClearS3(); err != nil {
return err
}
if err := srv.registerDeleteMsg(); err != nil {
return err
}
if err := srv.registerClearUserMsg(); err != nil {
return err
}
if err := srv.registerDismissMeetingGroups(); err != nil {
return err
}
if err := srv.registerClearGroupMsgByConfig(); err != nil {
return err
}
if err := srv.registerClearUserMsgByConfig(); err != nil {
return err
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime)
srv.cron.Start()
log.ZDebug(ctx, "cron task server is running")
<-ctx.Done()
log.ZDebug(ctx, "cron task server is shutting down")
srv.cron.Stop()
return nil
}
type Locker interface {
ExecuteWithLock(ctx context.Context, taskName string, task func())
}
type emptyLocker struct{}
func (emptyLocker) ExecuteWithLock(ctx context.Context, taskName string, task func()) {
task()
}
type cronServer struct {
ctx context.Context
config *Config
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
groupClient *rpcli.GroupClient
meetingDB database.Meeting
systemConfigDB database.SystemConfig
msgDocDB database.Msg
objectDB database.ObjectInfo
s3Client s3.Interface
s3Cache cont.S3Cache
notificationSender *notification.NotificationSender
locker Locker
}
func (c *cronServer) registerClearS3() error {
if c.config.CronTask.FileExpireTime <= 0 || len(c.config.CronTask.DeleteObjectType) == 0 {
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
return nil
}
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3)
})
return errs.WrapMsg(err, "failed to register clear s3 cron task")
}
func (c *cronServer) registerDeleteMsg() error {
if c.config.CronTask.RetainChatRecords <= 0 {
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
return nil
}
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg)
})
return errs.WrapMsg(err, "failed to register delete msg cron task")
}
func (c *cronServer) registerClearUserMsg() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg)
})
return errs.WrapMsg(err, "failed to register clear user msg cron task")
}
func (c *cronServer) registerDismissMeetingGroups() error {
// 每分钟执行一次检查已结束超过10分钟的会议并解散群聊
_, err := c.cron.AddFunc("*/1 * * * *", func() {
c.locker.ExecuteWithLock(c.ctx, "dismissMeetingGroups", c.dismissMeetingGroups)
})
return errs.WrapMsg(err, "failed to register dismiss meeting groups cron task")
}
func (c *cronServer) registerClearGroupMsgByConfig() error {
// 使用配置文件中的执行时间,清理群聊消息(根据系统配置)
cronExpr := c.config.CronTask.CronExecuteTime
log.ZInfo(c.ctx, "[CLEAR_MSG] 注册清理群聊消息定时任务", "cron", cronExpr)
_, err := c.cron.AddFunc(cronExpr, func() {
c.locker.ExecuteWithLock(c.ctx, "clearGroupMsgByConfig", c.clearGroupMsg)
})
if err != nil {
log.ZError(c.ctx, "[CLEAR_MSG] 注册清理群聊消息定时任务失败", err)
} else {
log.ZInfo(c.ctx, "[CLEAR_MSG] 清理群聊消息定时任务注册成功", "cron", cronExpr)
}
return errs.WrapMsg(err, "failed to register clear group msg by config cron task")
}
func (c *cronServer) registerClearUserMsgByConfig() error {
// 使用配置文件中的执行时间,清理个人聊天消息(根据系统配置)
cronExpr := c.config.CronTask.CronExecuteTime
log.ZInfo(c.ctx, "[CLEAR_MSG] 注册清理个人聊天消息定时任务", "cron", cronExpr)
_, err := c.cron.AddFunc(cronExpr, func() {
c.locker.ExecuteWithLock(c.ctx, "clearUserMsgByConfig", c.clearUserMsg)
})
if err != nil {
log.ZError(c.ctx, "[CLEAR_MSG] 注册清理个人聊天消息定时任务失败", err)
} else {
log.ZInfo(c.ctx, "[CLEAR_MSG] 清理个人聊天消息定时任务注册成功", "cron", cronExpr)
}
return errs.WrapMsg(err, "failed to register clear user msg by config cron task")
}