852 lines
34 KiB
Go
852 lines
34 KiB
Go
package cron
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/url"
|
||
"os"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"git.imall.cloud/openim/open-im-server-deploy/pkg/apistruct"
|
||
mgo "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database/mgo"
|
||
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/model"
|
||
"git.imall.cloud/openim/protocol/constant"
|
||
"git.imall.cloud/openim/protocol/msg"
|
||
"git.imall.cloud/openim/protocol/sdkws"
|
||
"github.com/openimsdk/tools/log"
|
||
"github.com/openimsdk/tools/mcontext"
|
||
)
|
||
|
||
// clearGroupMsg 清理群聊消息
|
||
// 注意:每次执行时都会重新从数据库读取配置,确保配置的实时性
|
||
func (c *cronServer) clearGroupMsg() {
|
||
now := time.Now()
|
||
operationID := fmt.Sprintf("cron_clear_group_msg_%d_%d", os.Getpid(), now.UnixMilli())
|
||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 定时任务触发:检查清理群聊消息配置", "operationID", operationID, "time", now.Format("2006-01-02 15:04:05"))
|
||
|
||
// 每次执行时都重新读取配置,确保获取最新的配置值
|
||
config, err := c.systemConfigDB.FindByKey(ctx, "clear_group_msg")
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 读取配置失败", err, "key", "clear_group_msg")
|
||
return
|
||
}
|
||
|
||
// 如果配置不存在,跳过执行
|
||
if config == nil {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 配置不存在,跳过执行", "key", "clear_group_msg")
|
||
return
|
||
}
|
||
|
||
// 记录从数据库读取到的配置详细信息(用于排查问题)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 从数据库读取到群聊清理配置",
|
||
"key", config.Key,
|
||
"value", config.Value,
|
||
"enabled", config.Enabled,
|
||
"title", config.Title,
|
||
"valueType", config.ValueType,
|
||
"createTime", config.CreateTime.Format("2006-01-02 15:04:05"),
|
||
"updateTime", config.UpdateTime.Format("2006-01-02 15:04:05"))
|
||
|
||
// 如果配置未启用,跳过执行
|
||
if !config.Enabled {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 配置未启用,跳过执行", "key", config.Key, "enabled", config.Enabled)
|
||
return
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] ====== 开始执行清理群聊消息任务 ======", "key", config.Key, "value", config.Value, "enabled", config.Enabled)
|
||
|
||
// 值为空也跳过,避免解析错误
|
||
if strings.TrimSpace(config.Value) == "" {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置值为空,跳过执行", "key", config.Key)
|
||
return
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置读取成功,配置已启用", "key", config.Key, "value", config.Value, "enabled", config.Enabled)
|
||
|
||
// 解析配置值(单位:分钟)
|
||
minutes, err := strconv.ParseInt(config.Value, 10, 64)
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 解析配置值失败", err, "value", config.Value)
|
||
return
|
||
}
|
||
|
||
if minutes <= 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置分钟数无效,跳过执行", "minutes", minutes)
|
||
return
|
||
}
|
||
|
||
// 计算删除时间点:查询当前时间减去配置分钟数之前的消息
|
||
// 例如:配置30分钟,当前时间09:35:00,则查询09:05:00之前的所有消息
|
||
deltime := now.Add(-time.Duration(minutes) * time.Minute)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置检查通过,开始查询消息",
|
||
"配置分钟数", minutes,
|
||
"当前时间", now.Format("2006-01-02 15:04:05"),
|
||
"查询时间点", deltime.Format("2006-01-02 15:04:05"),
|
||
"查询时间戳", deltime.UnixMilli(),
|
||
"说明", fmt.Sprintf("将查询send_time <= %d (即%s之前)的所有消息", deltime.UnixMilli(), deltime.Format("2006-01-02 15:04:05")))
|
||
|
||
const (
|
||
deleteCount = 10000
|
||
deleteLimit = 50
|
||
)
|
||
|
||
var totalCount int
|
||
var fileDeleteCount int
|
||
for i := 1; i <= deleteCount; i++ {
|
||
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
|
||
|
||
// 先查询消息,提取文件信息并删除S3文件
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始查询消息", "iteration", i, "timestamp", deltime.UnixMilli(), "limit", deleteLimit, "deltime", deltime.Format("2006-01-02 15:04:05"))
|
||
docs, err := c.msgDocDB.GetRandBeforeMsg(ctx, deltime.UnixMilli(), deleteLimit)
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 查询消息失败", err, "iteration", i, "timestamp", deltime.UnixMilli())
|
||
break
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 查询消息结果", "iteration", i, "docCount", len(docs), "timestamp", deltime.UnixMilli())
|
||
|
||
if len(docs) == 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 没有更多消息需要删除", "iteration", i)
|
||
break
|
||
}
|
||
|
||
// 处理每个文档中的消息,提取文件信息并删除
|
||
// 同时收集要删除的消息信息(conversationID -> seqs),用于发送通知
|
||
var processedDocs int
|
||
var deletedDocCount int
|
||
conversationSeqsMap := make(map[string][]int64) // conversationID -> []seq
|
||
conversationDocsMap := make(map[string]*model.MsgDocModel) // conversationID -> doc
|
||
docIDsToDelete := make([]string, 0, len(docs)) // 收集需要删除的文档ID
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始处理文档", "iteration", i, "totalDocs", len(docs))
|
||
for docIdx, doc := range docs {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 处理文档", "iteration", i, "docIndex", docIdx+1, "totalDocs", len(docs), "docID", doc.DocID)
|
||
|
||
// 判断是否为群聊消息
|
||
conversationID := extractConversationID(doc.DocID)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 提取会话ID", "docID", doc.DocID, "conversationID", conversationID, "isGroup", isGroupConversationID(conversationID))
|
||
if !isGroupConversationID(conversationID) {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 跳过非群聊消息", "docID", doc.DocID, "conversationID", conversationID)
|
||
continue
|
||
}
|
||
|
||
// 获取完整的消息内容
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 获取完整消息文档", "docID", doc.DocID)
|
||
fullDoc, err := c.msgDocDB.FindOneByDocID(ctx, doc.DocID)
|
||
if err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 获取完整消息文档失败", err, "docID", doc.DocID)
|
||
continue
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 获取完整消息文档成功", "docID", doc.DocID, "msgCount", len(fullDoc.Msg))
|
||
|
||
// 收集要删除的消息seq(只收集send_time <= deltime的消息)
|
||
var seqs []int64
|
||
var beforeTimeCount int
|
||
var afterTimeCount int
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始收集消息seq", "docID", doc.DocID, "msgCount", len(fullDoc.Msg), "查询时间戳", deltime.UnixMilli())
|
||
for msgIdx, msgInfo := range fullDoc.Msg {
|
||
if msgInfo.Msg != nil {
|
||
isBeforeTime := msgInfo.Msg.SendTime <= deltime.UnixMilli()
|
||
if isBeforeTime {
|
||
beforeTimeCount++
|
||
} else {
|
||
afterTimeCount++
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 处理消息",
|
||
"docID", doc.DocID,
|
||
"msgIndex", msgIdx+1,
|
||
"totalMsgs", len(fullDoc.Msg),
|
||
"seq", msgInfo.Msg.Seq,
|
||
"sendID", msgInfo.Msg.SendID,
|
||
"contentType", msgInfo.Msg.ContentType,
|
||
"sendTime", msgInfo.Msg.SendTime,
|
||
"sendTimeFormatted", time.Unix(msgInfo.Msg.SendTime/1000, 0).Format("2006-01-02 15:04:05"),
|
||
"查询时间戳", deltime.UnixMilli(),
|
||
"是否在查询时间点之前", isBeforeTime)
|
||
if msgInfo.Msg.Seq > 0 && isBeforeTime {
|
||
seqs = append(seqs, msgInfo.Msg.Seq)
|
||
}
|
||
} else {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 消息数据为空", nil, "docID", doc.DocID, "msgIndex", msgIdx+1)
|
||
}
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 收集消息seq完成",
|
||
"docID", doc.DocID,
|
||
"seqCount", len(seqs),
|
||
"seqs", seqs,
|
||
"在查询时间点之前的消息数", beforeTimeCount,
|
||
"在查询时间点之后的消息数", afterTimeCount,
|
||
"说明", fmt.Sprintf("文档中有%d条消息在查询时间点之前,%d条消息在查询时间点之后", beforeTimeCount, afterTimeCount))
|
||
if len(seqs) > 0 {
|
||
conversationSeqsMap[conversationID] = append(conversationSeqsMap[conversationID], seqs...)
|
||
conversationDocsMap[conversationID] = fullDoc
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 已添加到通知列表", "conversationID", conversationID, "totalSeqs", len(conversationSeqsMap[conversationID]))
|
||
}
|
||
|
||
// 提取文件信息并删除S3文件
|
||
deletedFiles := c.extractAndDeleteFiles(ctx, fullDoc, true) // true表示只处理群聊消息
|
||
fileDeleteCount += deletedFiles
|
||
|
||
// 如果文档中所有消息都在查询时间点之前,则删除整个文档
|
||
// 如果文档中只有部分消息在查询时间点之前,则只删除那些消息(通过DeleteMsgsPhysicalBySeqs)
|
||
if afterTimeCount == 0 {
|
||
// 文档中所有消息都需要删除,删除整个文档
|
||
docIDsToDelete = append(docIDsToDelete, doc.DocID)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文档标记为删除(所有消息都在查询时间点之前)", "docID", doc.DocID, "beforeTimeCount", beforeTimeCount)
|
||
} else {
|
||
// 文档中只有部分消息需要删除,使用RPC调用DeleteMsgPhysicalBySeq删除指定消息
|
||
if len(seqs) > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始删除文档中的部分消息", "docID", doc.DocID, "conversationID", conversationID, "seqs", seqs)
|
||
_, err := c.msgClient.DeleteMsgPhysicalBySeq(ctx, &msg.DeleteMsgPhysicalBySeqReq{
|
||
ConversationID: conversationID,
|
||
Seqs: seqs,
|
||
})
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 删除文档中的部分消息失败", err, "docID", doc.DocID, "conversationID", conversationID, "seqs", seqs)
|
||
} else {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 删除文档中的部分消息成功", "docID", doc.DocID, "conversationID", conversationID, "seqCount", len(seqs))
|
||
totalCount += len(seqs)
|
||
}
|
||
}
|
||
}
|
||
processedDocs++
|
||
}
|
||
if processedDocs > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文档处理完成(群聊)", "processedDocs", processedDocs, "totalDocs", len(docs), "deletedFiles", fileDeleteCount, "docIDsToDelete", len(docIDsToDelete), "iteration", i)
|
||
}
|
||
|
||
// 删除整个文档(如果文档中所有消息都在查询时间点之前)
|
||
if len(docIDsToDelete) > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始删除整个文档", "iteration", i, "docCount", len(docIDsToDelete))
|
||
for _, docID := range docIDsToDelete {
|
||
if err := c.msgDocDB.DeleteDoc(ctx, docID); err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 删除文档失败", err, "docID", docID)
|
||
} else {
|
||
deletedDocCount++
|
||
totalCount++ // 每个文档算作一条删除记录
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 删除文档成功", "docID", docID)
|
||
}
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 批次删除文档完成", "deletedDocCount", deletedDocCount, "totalDocCount", len(docIDsToDelete), "totalCount", totalCount, "iteration", i)
|
||
}
|
||
|
||
// 发送删除通知
|
||
if len(conversationSeqsMap) > 0 {
|
||
c.sendDeleteNotifications(ctx, conversationSeqsMap, conversationDocsMap, true)
|
||
}
|
||
|
||
if deletedDocCount < deleteLimit && len(docIDsToDelete) == 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 已处理完所有消息", "lastBatchCount", deletedDocCount)
|
||
break
|
||
}
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] ====== 清理群聊消息任务完成 ======", "deltime", deltime.Format("2006-01-02 15:04:05"), "duration", time.Since(now), "totalCount", totalCount, "fileDeleteCount", fileDeleteCount, "operationID", operationID)
|
||
}
|
||
|
||
// clearUserMsg 清理个人聊天消息
|
||
// 注意:每次执行时都会重新从数据库读取配置,确保配置的实时性
|
||
func (c *cronServer) clearUserMsg() {
|
||
now := time.Now()
|
||
operationID := fmt.Sprintf("cron_clear_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
|
||
ctx := mcontext.SetOperationID(c.ctx, operationID)
|
||
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 定时任务触发:检查清理个人聊天消息配置", "operationID", operationID, "time", now.Format("2006-01-02 15:04:05"))
|
||
|
||
// 每次执行时都重新读取配置,确保获取最新的配置值
|
||
config, err := c.systemConfigDB.FindByKey(ctx, "clear_user_msg")
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 读取配置失败", err, "key", "clear_user_msg")
|
||
return
|
||
}
|
||
|
||
// 如果配置不存在,跳过执行
|
||
if config == nil {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 配置不存在,跳过执行", "key", "clear_user_msg")
|
||
return
|
||
}
|
||
|
||
// 记录从数据库读取到的配置详细信息(用于排查问题)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 从数据库读取到个人消息清理配置",
|
||
"key", config.Key,
|
||
"value", config.Value,
|
||
"enabled", config.Enabled,
|
||
"title", config.Title,
|
||
"valueType", config.ValueType,
|
||
"createTime", config.CreateTime.Format("2006-01-02 15:04:05"),
|
||
"updateTime", config.UpdateTime.Format("2006-01-02 15:04:05"))
|
||
|
||
// 如果配置未启用,跳过执行
|
||
if !config.Enabled {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 配置未启用,跳过执行", "key", config.Key, "enabled", config.Enabled)
|
||
return
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] ====== 开始执行清理个人聊天消息任务 ======", "key", config.Key, "value", config.Value, "enabled", config.Enabled)
|
||
|
||
// 值为空也跳过,避免解析错误
|
||
if strings.TrimSpace(config.Value) == "" {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置值为空,跳过执行", "key", config.Key)
|
||
return
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置读取成功,配置已启用", "key", config.Key, "value", config.Value, "enabled", config.Enabled)
|
||
|
||
// 解析配置值(单位:分钟)
|
||
minutes, err := strconv.ParseInt(config.Value, 10, 64)
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 解析配置值失败", err, "value", config.Value)
|
||
return
|
||
}
|
||
|
||
if minutes <= 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置分钟数无效,跳过执行", "minutes", minutes)
|
||
return
|
||
}
|
||
|
||
// 计算删除时间点:查询当前时间减去配置分钟数之前的消息
|
||
// 例如:配置30分钟,当前时间09:35:00,则查询09:05:00之前的所有消息
|
||
deltime := now.Add(-time.Duration(minutes) * time.Minute)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 配置检查通过,开始查询消息(个人聊天)",
|
||
"配置分钟数", minutes,
|
||
"当前时间", now.Format("2006-01-02 15:04:05"),
|
||
"查询时间点", deltime.Format("2006-01-02 15:04:05"),
|
||
"查询时间戳", deltime.UnixMilli(),
|
||
"说明", fmt.Sprintf("将查询send_time <= %d (即%s之前)的所有消息", deltime.UnixMilli(), deltime.Format("2006-01-02 15:04:05")))
|
||
|
||
const (
|
||
deleteCount = 10000
|
||
deleteLimit = 50
|
||
)
|
||
|
||
var totalCount int
|
||
var fileDeleteCount int
|
||
for i := 1; i <= deleteCount; i++ {
|
||
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
|
||
|
||
// 先查询消息,提取文件信息并删除S3文件
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始查询消息(个人聊天)", "iteration", i, "timestamp", deltime.UnixMilli(), "limit", deleteLimit, "deltime", deltime.Format("2006-01-02 15:04:05"))
|
||
docs, err := c.msgDocDB.GetRandBeforeMsg(ctx, deltime.UnixMilli(), deleteLimit)
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 查询消息失败(个人聊天)", err, "iteration", i, "timestamp", deltime.UnixMilli())
|
||
break
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 查询消息结果(个人聊天)", "iteration", i, "docCount", len(docs), "timestamp", deltime.UnixMilli())
|
||
|
||
if len(docs) == 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 没有更多消息需要删除(个人聊天)", "iteration", i)
|
||
break
|
||
}
|
||
|
||
// 处理每个文档中的消息,提取文件信息并删除
|
||
// 同时收集要删除的消息信息(conversationID -> seqs),用于发送通知
|
||
var processedDocs int
|
||
var deletedDocCount int
|
||
conversationSeqsMap := make(map[string][]int64) // conversationID -> []seq
|
||
conversationDocsMap := make(map[string]*model.MsgDocModel) // conversationID -> doc
|
||
docIDsToDelete := make([]string, 0, len(docs)) // 收集需要删除的文档ID
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始处理文档(个人聊天)", "iteration", i, "totalDocs", len(docs))
|
||
for docIdx, doc := range docs {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 处理文档(个人聊天)", "iteration", i, "docIndex", docIdx+1, "totalDocs", len(docs), "docID", doc.DocID)
|
||
|
||
// 判断是否为个人聊天消息
|
||
conversationID := extractConversationID(doc.DocID)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 提取会话ID(个人聊天)", "docID", doc.DocID, "conversationID", conversationID, "isSingle", isSingleConversationID(conversationID))
|
||
if !isSingleConversationID(conversationID) {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 跳过非个人聊天消息", "docID", doc.DocID, "conversationID", conversationID)
|
||
continue
|
||
}
|
||
|
||
// 获取完整的消息内容
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 获取完整消息文档(个人聊天)", "docID", doc.DocID)
|
||
fullDoc, err := c.msgDocDB.FindOneByDocID(ctx, doc.DocID)
|
||
if err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 获取完整消息文档失败(个人聊天)", err, "docID", doc.DocID)
|
||
continue
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 获取完整消息文档成功(个人聊天)", "docID", doc.DocID, "msgCount", len(fullDoc.Msg))
|
||
|
||
// 收集要删除的消息seq(只收集send_time <= deltime的消息)
|
||
var seqs []int64
|
||
var beforeTimeCount int
|
||
var afterTimeCount int
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始收集消息seq(个人聊天)", "docID", doc.DocID, "msgCount", len(fullDoc.Msg), "查询时间戳", deltime.UnixMilli())
|
||
for msgIdx, msgInfo := range fullDoc.Msg {
|
||
if msgInfo.Msg != nil {
|
||
isBeforeTime := msgInfo.Msg.SendTime <= deltime.UnixMilli()
|
||
if isBeforeTime {
|
||
beforeTimeCount++
|
||
} else {
|
||
afterTimeCount++
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 处理消息(个人聊天)",
|
||
"docID", doc.DocID,
|
||
"msgIndex", msgIdx+1,
|
||
"totalMsgs", len(fullDoc.Msg),
|
||
"seq", msgInfo.Msg.Seq,
|
||
"sendID", msgInfo.Msg.SendID,
|
||
"contentType", msgInfo.Msg.ContentType,
|
||
"sendTime", msgInfo.Msg.SendTime,
|
||
"sendTimeFormatted", time.Unix(msgInfo.Msg.SendTime/1000, 0).Format("2006-01-02 15:04:05"),
|
||
"查询时间戳", deltime.UnixMilli(),
|
||
"是否在查询时间点之前", isBeforeTime)
|
||
if msgInfo.Msg.Seq > 0 && isBeforeTime {
|
||
seqs = append(seqs, msgInfo.Msg.Seq)
|
||
}
|
||
} else {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 消息数据为空(个人聊天)", nil, "docID", doc.DocID, "msgIndex", msgIdx+1)
|
||
}
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 收集消息seq完成(个人聊天)",
|
||
"docID", doc.DocID,
|
||
"seqCount", len(seqs),
|
||
"seqs", seqs,
|
||
"在查询时间点之前的消息数", beforeTimeCount,
|
||
"在查询时间点之后的消息数", afterTimeCount,
|
||
"说明", fmt.Sprintf("文档中有%d条消息在查询时间点之前,%d条消息在查询时间点之后", beforeTimeCount, afterTimeCount))
|
||
if len(seqs) > 0 {
|
||
conversationSeqsMap[conversationID] = append(conversationSeqsMap[conversationID], seqs...)
|
||
conversationDocsMap[conversationID] = fullDoc
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 已添加到通知列表(个人聊天)", "conversationID", conversationID, "totalSeqs", len(conversationSeqsMap[conversationID]))
|
||
}
|
||
|
||
// 提取文件信息并删除S3文件
|
||
deletedFiles := c.extractAndDeleteFiles(ctx, fullDoc, false) // false表示只处理个人聊天消息
|
||
fileDeleteCount += deletedFiles
|
||
|
||
// 如果文档中所有消息都在查询时间点之前,则删除整个文档
|
||
// 如果文档中只有部分消息在查询时间点之前,则只删除那些消息(通过DeleteMsgPhysicalBySeq)
|
||
if afterTimeCount == 0 {
|
||
// 文档中所有消息都需要删除,删除整个文档
|
||
docIDsToDelete = append(docIDsToDelete, doc.DocID)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文档标记为删除(所有消息都在查询时间点之前)(个人聊天)", "docID", doc.DocID, "beforeTimeCount", beforeTimeCount)
|
||
} else {
|
||
// 文档中只有部分消息需要删除,使用RPC调用DeleteMsgPhysicalBySeq删除指定消息
|
||
if len(seqs) > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始删除文档中的部分消息(个人聊天)", "docID", doc.DocID, "conversationID", conversationID, "seqs", seqs)
|
||
_, err := c.msgClient.DeleteMsgPhysicalBySeq(ctx, &msg.DeleteMsgPhysicalBySeqReq{
|
||
ConversationID: conversationID,
|
||
Seqs: seqs,
|
||
})
|
||
if err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 删除文档中的部分消息失败(个人聊天)", err, "docID", doc.DocID, "conversationID", conversationID, "seqs", seqs)
|
||
} else {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 删除文档中的部分消息成功(个人聊天)", "docID", doc.DocID, "conversationID", conversationID, "seqCount", len(seqs))
|
||
totalCount += len(seqs)
|
||
}
|
||
}
|
||
}
|
||
processedDocs++
|
||
}
|
||
if processedDocs > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文档处理完成(个人)", "processedDocs", processedDocs, "totalDocs", len(docs), "deletedFiles", fileDeleteCount, "docIDsToDelete", len(docIDsToDelete), "iteration", i)
|
||
}
|
||
|
||
// 删除整个文档(如果文档中所有消息都在查询时间点之前)
|
||
if len(docIDsToDelete) > 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始删除整个文档(个人聊天)", "iteration", i, "docCount", len(docIDsToDelete))
|
||
for _, docID := range docIDsToDelete {
|
||
if err := c.msgDocDB.DeleteDoc(ctx, docID); err != nil {
|
||
log.ZError(ctx, "[CLEAR_MSG] 删除文档失败(个人聊天)", err, "docID", docID)
|
||
} else {
|
||
deletedDocCount++
|
||
totalCount++ // 每个文档算作一条删除记录
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 删除文档成功(个人聊天)", "docID", docID)
|
||
}
|
||
}
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 批次删除文档完成(个人聊天)", "deletedDocCount", deletedDocCount, "totalDocCount", len(docIDsToDelete), "totalCount", totalCount, "iteration", i)
|
||
}
|
||
|
||
// 发送删除通知
|
||
if len(conversationSeqsMap) > 0 {
|
||
c.sendDeleteNotifications(ctx, conversationSeqsMap, conversationDocsMap, false)
|
||
}
|
||
|
||
if deletedDocCount < deleteLimit && len(docIDsToDelete) == 0 {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 已处理完所有消息(个人聊天)", "lastBatchCount", deletedDocCount)
|
||
break
|
||
}
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] ====== 清理个人聊天消息任务完成 ======", "deltime", deltime.Format("2006-01-02 15:04:05"), "duration", time.Since(now), "totalCount", totalCount, "fileDeleteCount", fileDeleteCount, "operationID", operationID)
|
||
}
|
||
|
||
// isGroupConversationID 判断是否为群聊会话ID
|
||
func isGroupConversationID(conversationID string) bool {
|
||
return strings.HasPrefix(conversationID, "g_") || strings.HasPrefix(conversationID, "sg_")
|
||
}
|
||
|
||
// isSingleConversationID 判断是否为个人聊天会话ID
|
||
func isSingleConversationID(conversationID string) bool {
|
||
return strings.HasPrefix(conversationID, "si_")
|
||
}
|
||
|
||
// extractConversationID 从docID中提取conversationID
|
||
func extractConversationID(docID string) string {
|
||
index := strings.LastIndex(docID, ":")
|
||
if index < 0 {
|
||
return ""
|
||
}
|
||
return docID[:index]
|
||
}
|
||
|
||
// extractAndDeleteFiles 从消息中提取文件信息并删除S3文件
|
||
// isGroupMsg: true表示只处理群聊消息,false表示只处理个人聊天消息
|
||
func (c *cronServer) extractAndDeleteFiles(ctx context.Context, doc *model.MsgDocModel, isGroupMsg bool) int {
|
||
if doc == nil || len(doc.Msg) == 0 {
|
||
return 0
|
||
}
|
||
|
||
// 判断conversationID类型
|
||
conversationID := extractConversationID(doc.DocID)
|
||
if isGroupMsg && !isGroupConversationID(conversationID) {
|
||
return 0
|
||
}
|
||
if !isGroupMsg && !isSingleConversationID(conversationID) {
|
||
return 0
|
||
}
|
||
|
||
var fileNames []string
|
||
fileNamesMap := make(map[string]bool) // 用于去重
|
||
var fileTypeStats = map[string]int{
|
||
"picture": 0,
|
||
"video": 0,
|
||
"file": 0,
|
||
"voice": 0,
|
||
}
|
||
|
||
// 遍历消息,提取文件信息
|
||
totalMsgs := len(doc.Msg)
|
||
var processedMsgs int
|
||
for _, msgInfo := range doc.Msg {
|
||
if msgInfo.Msg == nil {
|
||
continue
|
||
}
|
||
|
||
contentType := msgInfo.Msg.ContentType
|
||
content := msgInfo.Msg.Content
|
||
processedMsgs++
|
||
|
||
// 根据消息类型提取文件URL
|
||
switch contentType {
|
||
case constant.Picture:
|
||
// 图片消息
|
||
var pictureElem apistruct.PictureElem
|
||
if err := json.Unmarshal([]byte(content), &pictureElem); err == nil {
|
||
var extractedCount int
|
||
if pictureElem.SourcePicture.Url != "" {
|
||
if name := extractFileNameFromURL(pictureElem.SourcePicture.Url); name != "" {
|
||
fileNamesMap[name] = true
|
||
extractedCount++
|
||
}
|
||
}
|
||
if pictureElem.BigPicture.Url != "" {
|
||
if name := extractFileNameFromURL(pictureElem.BigPicture.Url); name != "" {
|
||
fileNamesMap[name] = true
|
||
extractedCount++
|
||
}
|
||
}
|
||
if pictureElem.SnapshotPicture.Url != "" {
|
||
if name := extractFileNameFromURL(pictureElem.SnapshotPicture.Url); name != "" {
|
||
fileNamesMap[name] = true
|
||
extractedCount++
|
||
}
|
||
}
|
||
if extractedCount > 0 {
|
||
fileTypeStats["picture"]++
|
||
}
|
||
} else {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 解析图片消息失败", "err", err, "seq", msgInfo.Msg.Seq)
|
||
}
|
||
case constant.Video:
|
||
// 视频消息
|
||
var videoElem apistruct.VideoElem
|
||
if err := json.Unmarshal([]byte(content), &videoElem); err == nil {
|
||
var extractedCount int
|
||
if videoElem.VideoURL != "" {
|
||
if name := extractFileNameFromURL(videoElem.VideoURL); name != "" {
|
||
fileNamesMap[name] = true
|
||
extractedCount++
|
||
}
|
||
}
|
||
if videoElem.SnapshotURL != "" {
|
||
if name := extractFileNameFromURL(videoElem.SnapshotURL); name != "" {
|
||
fileNamesMap[name] = true
|
||
extractedCount++
|
||
}
|
||
}
|
||
if extractedCount > 0 {
|
||
fileTypeStats["video"]++
|
||
}
|
||
} else {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 解析视频消息失败", "err", err, "seq", msgInfo.Msg.Seq)
|
||
}
|
||
case constant.File:
|
||
// 文件消息
|
||
var fileElem apistruct.FileElem
|
||
if err := json.Unmarshal([]byte(content), &fileElem); err == nil {
|
||
if fileElem.SourceURL != "" {
|
||
if name := extractFileNameFromURL(fileElem.SourceURL); name != "" {
|
||
fileNamesMap[name] = true
|
||
fileTypeStats["file"]++
|
||
}
|
||
}
|
||
} else {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 解析文件消息失败", "err", err, "seq", msgInfo.Msg.Seq)
|
||
}
|
||
case constant.Voice:
|
||
// 音频消息
|
||
var soundElem apistruct.SoundElem
|
||
if err := json.Unmarshal([]byte(content), &soundElem); err == nil {
|
||
if soundElem.SourceURL != "" {
|
||
if name := extractFileNameFromURL(soundElem.SourceURL); name != "" {
|
||
fileNamesMap[name] = true
|
||
fileTypeStats["voice"]++
|
||
}
|
||
}
|
||
} else {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 解析音频消息失败", "err", err, "seq", msgInfo.Msg.Seq)
|
||
}
|
||
}
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 消息处理统计", "docID", doc.DocID, "totalMsgs", totalMsgs, "processedMsgs", processedMsgs, "fileTypeStats", fileTypeStats)
|
||
|
||
// 将map转换为slice
|
||
for name := range fileNamesMap {
|
||
fileNames = append(fileNames, name)
|
||
}
|
||
|
||
if len(fileNames) == 0 {
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 消息中未找到文件", "docID", doc.DocID)
|
||
return 0
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 提取到文件列表", "docID", doc.DocID, "conversationID", conversationID, "fileCount", len(fileNames), "fileNames", fileNames[:min(10, len(fileNames))])
|
||
|
||
// 删除S3文件
|
||
// 通过objectDB查询文件信息,然后删除数据库记录
|
||
// 直接按文件名查询(不指定engine),再使用记录中的engine/key处理
|
||
deletedCount := 0
|
||
notFoundCount := 0
|
||
failedCount := 0
|
||
var deletedFiles []string
|
||
var notFoundFiles []string
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始删除文件记录", "docID", doc.DocID, "totalFiles", len(fileNames))
|
||
|
||
for i, fileName := range fileNames {
|
||
obj, err := c.objectDB.Take(ctx, "", fileName)
|
||
if err != nil || obj == nil {
|
||
// 检查是否是"未找到"错误(正常情况)还是真正的错误
|
||
if err != nil && !mgo.IsNotFound(err) {
|
||
// 真正的错误,记录为警告
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 查询文件记录出错", err, "fileName", fileName, "index", i+1, "total", len(fileNames))
|
||
} else {
|
||
// 文件不存在是正常情况,只记录debug日志
|
||
log.ZDebug(ctx, "[CLEAR_MSG] 文件记录不存在(正常)", "fileName", fileName, "index", i+1, "total", len(fileNames))
|
||
}
|
||
notFoundCount++
|
||
notFoundFiles = append(notFoundFiles, fileName)
|
||
continue
|
||
}
|
||
|
||
engine := obj.Engine
|
||
// 在删除前获取key引用计数,用于判断是否需要删除S3文件
|
||
keyCountBeforeDelete, err := c.objectDB.GetKeyCount(ctx, engine, obj.Key)
|
||
if err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 获取key引用计数失败", err, "engine", engine, "key", obj.Key, "fileName", fileName)
|
||
keyCountBeforeDelete = 0 // 如果获取失败,假设为0,后续会尝试删除
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 准备删除文件记录", "engine", engine, "fileName", fileName, "key", obj.Key, "index", i+1, "total", len(fileNames), "size", obj.Size, "contentType", obj.ContentType, "keyCountBeforeDelete", keyCountBeforeDelete)
|
||
|
||
// 删除数据库记录
|
||
if err := c.objectDB.Delete(ctx, engine, []string{fileName}); err != nil {
|
||
failedCount++
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 删除文件记录失败", err, "engine", engine, "fileName", fileName, "key", obj.Key, "index", i+1, "total", len(fileNames))
|
||
continue
|
||
}
|
||
|
||
deletedCount++
|
||
deletedFiles = append(deletedFiles, fileName)
|
||
|
||
// 删除数据库记录后,再次检查key引用计数
|
||
keyCountAfterDelete, err := c.objectDB.GetKeyCount(ctx, engine, obj.Key)
|
||
if err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 删除后获取key引用计数失败", err, "engine", engine, "key", obj.Key, "fileName", fileName)
|
||
}
|
||
|
||
// 删除缓存
|
||
if c.s3Cache != nil {
|
||
if err := c.s3Cache.DelS3Key(ctx, engine, fileName); err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 删除S3缓存失败", err, "engine", engine, "fileName", fileName)
|
||
} else {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] S3缓存删除成功", "engine", engine, "fileName", fileName)
|
||
}
|
||
}
|
||
|
||
// 如果删除前引用计数<=1,说明删除后应该为0,S3文件应该被删除
|
||
if keyCountBeforeDelete <= 1 {
|
||
// 删除S3文件
|
||
if c.s3Client != nil {
|
||
if err := c.s3Client.DeleteObject(ctx, obj.Key); err != nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 删除S3文件失败", err, "engine", engine, "key", obj.Key, "fileName", fileName)
|
||
} else {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] S3文件删除成功",
|
||
"engine", engine,
|
||
"key", obj.Key,
|
||
"fileName", fileName,
|
||
"keyCountBeforeDelete", keyCountBeforeDelete,
|
||
"keyCountAfterDelete", keyCountAfterDelete)
|
||
}
|
||
} else {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] S3客户端未初始化,无法删除S3文件", nil,
|
||
"engine", engine,
|
||
"key", obj.Key,
|
||
"fileName", fileName,
|
||
"keyCountBeforeDelete", keyCountBeforeDelete,
|
||
"keyCountAfterDelete", keyCountAfterDelete)
|
||
}
|
||
} else {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文件key仍有其他引用,S3文件保留",
|
||
"engine", engine,
|
||
"key", obj.Key,
|
||
"fileName", fileName,
|
||
"keyCountBeforeDelete", keyCountBeforeDelete,
|
||
"keyCountAfterDelete", keyCountAfterDelete)
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文件记录删除成功", "engine", engine, "fileName", fileName, "key", obj.Key, "index", i+1, "total", len(fileNames), "size", obj.Size, "contentType", obj.ContentType)
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 文件删除汇总", "docID", doc.DocID, "conversationID", conversationID,
|
||
"totalFiles", len(fileNames),
|
||
"deletedCount", deletedCount,
|
||
"notFoundCount", notFoundCount,
|
||
"failedCount", failedCount,
|
||
"deletedFiles", deletedFiles[:min(5, len(deletedFiles))],
|
||
"notFoundFiles", notFoundFiles[:min(5, len(notFoundFiles))])
|
||
|
||
return deletedCount
|
||
}
|
||
|
||
// extractFileNameFromURL 从URL中提取文件名
|
||
func extractFileNameFromURL(fileURL string) string {
|
||
if fileURL == "" {
|
||
return ""
|
||
}
|
||
|
||
// 解析URL
|
||
parsedURL, err := url.Parse(fileURL)
|
||
if err != nil {
|
||
// 如果解析失败,尝试从URL路径中提取
|
||
parts := strings.Split(fileURL, "/")
|
||
if len(parts) > 0 {
|
||
lastPart := parts[len(parts)-1]
|
||
// 移除查询参数
|
||
if idx := strings.Index(lastPart, "?"); idx >= 0 {
|
||
lastPart = lastPart[:idx]
|
||
}
|
||
return lastPart
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// 从URL路径中提取文件名
|
||
path := parsedURL.Path
|
||
parts := strings.Split(path, "/")
|
||
if len(parts) > 0 {
|
||
fileName := parts[len(parts)-1]
|
||
// 移除查询参数
|
||
if idx := strings.Index(fileName, "?"); idx >= 0 {
|
||
fileName = fileName[:idx]
|
||
}
|
||
return fileName
|
||
}
|
||
|
||
return ""
|
||
}
|
||
|
||
func min(a, b int) int {
|
||
if a < b {
|
||
return a
|
||
}
|
||
return b
|
||
}
|
||
|
||
// sendDeleteNotifications 发送消息删除通知
|
||
// conversationSeqsMap: conversationID -> []seq
|
||
// conversationDocsMap: conversationID -> doc
|
||
// isGroupMsg: true表示群聊消息,false表示个人聊天消息
|
||
func (c *cronServer) sendDeleteNotifications(ctx context.Context, conversationSeqsMap map[string][]int64, conversationDocsMap map[string]*model.MsgDocModel, isGroupMsg bool) {
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 开始发送删除通知", "conversationCount", len(conversationSeqsMap), "isGroupMsg", isGroupMsg)
|
||
|
||
adminUserID := c.config.Share.IMAdminUser.UserIDs[0]
|
||
|
||
for conversationID, seqs := range conversationSeqsMap {
|
||
if len(seqs) == 0 {
|
||
continue
|
||
}
|
||
|
||
// 从conversationDocsMap获取原始消息文档(参考撤销消息的实现)
|
||
doc, ok := conversationDocsMap[conversationID]
|
||
if !ok || doc == nil || len(doc.Msg) == 0 {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 无法获取原始消息", nil, "conversationID", conversationID)
|
||
continue
|
||
}
|
||
|
||
// 获取第一条消息的信息(参考撤销消息的实现:使用原始消息的SessionType、GroupID、RecvID)
|
||
var firstMsg *model.MsgDataModel
|
||
for _, msgInfo := range doc.Msg {
|
||
if msgInfo.Msg != nil {
|
||
firstMsg = msgInfo.Msg
|
||
break
|
||
}
|
||
}
|
||
if firstMsg == nil {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 无法获取原始消息数据", nil, "conversationID", conversationID)
|
||
continue
|
||
}
|
||
|
||
// 构建删除通知
|
||
tips := &sdkws.DeleteMsgsTips{
|
||
UserID: adminUserID,
|
||
ConversationID: conversationID,
|
||
Seqs: seqs,
|
||
}
|
||
|
||
// 参考撤销消息的实现:根据原始消息的SessionType确定recvID
|
||
var recvID string
|
||
var sessionType int32
|
||
if firstMsg.SessionType == constant.ReadGroupChatType {
|
||
recvID = firstMsg.GroupID
|
||
sessionType = firstMsg.SessionType
|
||
} else {
|
||
recvID = firstMsg.RecvID
|
||
sessionType = firstMsg.SessionType
|
||
}
|
||
|
||
if recvID == "" {
|
||
log.ZWarn(ctx, "[CLEAR_MSG] 无法确定通知接收者", nil, "conversationID", conversationID, "sessionType", sessionType, "groupID", firstMsg.GroupID, "recvID", firstMsg.RecvID)
|
||
continue
|
||
}
|
||
|
||
// 使用NotificationSender发送通知(参考撤销消息的实现)
|
||
c.notificationSender.NotificationWithSessionType(ctx, adminUserID, recvID,
|
||
constant.DeleteMsgsNotification, sessionType, tips)
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 发送删除通知", "conversationID", conversationID, "recvID", recvID, "sessionType", sessionType, "seqCount", len(seqs))
|
||
}
|
||
|
||
log.ZInfo(ctx, "[CLEAR_MSG] 删除通知发送完成", "conversationCount", len(conversationSeqsMap), "isGroupMsg", isGroupMsg)
|
||
}
|