package api import ( "context" "errors" "time" "git.imall.cloud/openim/open-im-server-deploy/pkg/apistruct" "git.imall.cloud/openim/open-im-server-deploy/pkg/authverify" rediscache "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache/redis" "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database" "git.imall.cloud/openim/open-im-server-deploy/pkg/rpcli" "git.imall.cloud/openim/protocol/constant" "git.imall.cloud/openim/protocol/sdkws" "github.com/gin-gonic/gin" "github.com/openimsdk/tools/a2r" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" ) type StatisticsApi struct { rdb redis.UniversalClient msgDatabase database.Msg userClient *rpcli.UserClient groupClient *rpcli.GroupClient } func NewStatisticsApi(rdb redis.UniversalClient, msgDatabase database.Msg, userClient *rpcli.UserClient, groupClient *rpcli.GroupClient) *StatisticsApi { return &StatisticsApi{ rdb: rdb, msgDatabase: msgDatabase, userClient: userClient, groupClient: groupClient, } } const ( trendIntervalMinutes15 = 15 trendIntervalMinutes30 = 30 trendIntervalMinutes60 = 60 trendChatTypeSingle = 1 trendChatTypeGroup = 2 defaultTrendDuration = 24 * time.Hour ) // refreshOnlineUserCountAndHistory 刷新在线人数并写入历史采样 func refreshOnlineUserCountAndHistory(ctx context.Context, rdb redis.UniversalClient) { count, err := rediscache.RefreshOnlineUserCount(ctx, rdb) if err != nil { log.ZWarn(ctx, "refresh online user count failed", err) return } if err := rediscache.AppendOnlineUserCountHistory(ctx, rdb, time.Now().UnixMilli(), count); err != nil { log.ZWarn(ctx, "append online user count history failed", err) } } // startOnlineCountRefresher 定时刷新在线人数缓存 func startOnlineCountRefresher(ctx context.Context, cfg *Config, rdb redis.UniversalClient) { if cfg == nil || rdb == nil { return } refreshCfg := cfg.API.OnlineCountRefresh if !refreshCfg.Enable || refreshCfg.Interval <= 0 { return } log.ZInfo(ctx, "online user count refresh enabled", "interval", refreshCfg.Interval) go func() { refreshOnlineUserCountAndHistory(ctx, rdb) ticker := time.NewTicker(refreshCfg.Interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: refreshOnlineUserCountAndHistory(ctx, rdb) } } }() } // OnlineUserCount 在线人数统计接口 func (s *StatisticsApi) OnlineUserCount(c *gin.Context) { if err := authverify.CheckAdmin(c); err != nil { apiresp.GinError(c, err) return } if s.rdb == nil { apiresp.GinError(c, errs.ErrInternalServer.WrapMsg("redis client is nil")) return } count, err := rediscache.GetOnlineUserCount(c, s.rdb) if err != nil { if errors.Is(err, redis.Nil) { count, err = rediscache.RefreshOnlineUserCount(c, s.rdb) if err == nil { if appendErr := rediscache.AppendOnlineUserCountHistory(c, s.rdb, time.Now().UnixMilli(), count); appendErr != nil { log.ZWarn(c, "append online user count history failed", appendErr) } } } } if err != nil { apiresp.GinError(c, err) return } apiresp.GinSuccess(c, &apistruct.OnlineUserCountResp{OnlineCount: count}) } // OnlineUserCountTrend 在线人数走势统计接口 func (s *StatisticsApi) OnlineUserCountTrend(c *gin.Context) { if err := authverify.CheckAdmin(c); err != nil { apiresp.GinError(c, err) return } req, err := a2r.ParseRequest[apistruct.OnlineUserCountTrendReq](c) if err != nil { apiresp.GinError(c, err) return } if s.rdb == nil { apiresp.GinError(c, errs.ErrInternalServer.WrapMsg("redis client is nil")) return } intervalMillis, err := parseTrendIntervalMillis(req.IntervalMinutes) if err != nil { apiresp.GinError(c, err) return } startTime, endTime, err := normalizeTrendTimeRange(req.StartTime, req.EndTime) if err != nil { apiresp.GinError(c, err) return } bucketStart, bucketEnd := alignTrendRange(startTime, endTime, intervalMillis) // 使用对齐后的时间范围获取历史数据,确保数据范围与构建数据点的范围一致 samples, err := rediscache.GetOnlineUserCountHistory(c, s.rdb, bucketStart, bucketEnd) if err != nil { apiresp.GinError(c, err) return } // 将当前在线人数作为最新采样,确保最后一个时间段展示该段内的最大在线人数 now := time.Now().UnixMilli() currentBucket := now - (now % intervalMillis) if now < 0 && now%intervalMillis != 0 { currentBucket = now - ((now % intervalMillis) + intervalMillis) } if currentBucket >= bucketStart && currentBucket <= bucketEnd { if currentCount, err := rediscache.GetOnlineUserCount(c, s.rdb); err == nil { samples = append(samples, rediscache.OnlineUserCountSample{ Timestamp: now, Count: currentCount, }) } } points := buildOnlineUserCountTrendPoints(samples, bucketStart, bucketEnd, intervalMillis) apiresp.GinSuccess(c, &apistruct.OnlineUserCountTrendResp{ IntervalMinutes: req.IntervalMinutes, Points: points, }) } // UserSendMsgCount 用户发送消息总数统计 func (s *StatisticsApi) UserSendMsgCount(c *gin.Context) { if err := authverify.CheckAdmin(c); err != nil { apiresp.GinError(c, err) return } _, err := a2r.ParseRequest[apistruct.UserSendMsgCountReq](c) if err != nil { apiresp.GinError(c, err) return } if s.msgDatabase == nil { apiresp.GinError(c, errs.ErrInternalServer.WrapMsg("msg database is nil")) return } now := time.Now() endTime := now.UnixMilli() start24h := now.Add(-24 * time.Hour).UnixMilli() start7d := now.Add(-7 * 24 * time.Hour).UnixMilli() start30d := now.Add(-30 * 24 * time.Hour).UnixMilli() count24h, err := s.msgDatabase.CountUserSendMessages(c, "", start24h, endTime, "") if err != nil { apiresp.GinError(c, err) return } count7d, err := s.msgDatabase.CountUserSendMessages(c, "", start7d, endTime, "") if err != nil { apiresp.GinError(c, err) return } count30d, err := s.msgDatabase.CountUserSendMessages(c, "", start30d, endTime, "") if err != nil { apiresp.GinError(c, err) return } apiresp.GinSuccess(c, &apistruct.UserSendMsgCountResp{ Count24h: count24h, Count7d: count7d, Count30d: count30d, }) } // UserSendMsgCountTrend 用户发送消息走势统计 func (s *StatisticsApi) UserSendMsgCountTrend(c *gin.Context) { if err := authverify.CheckAdmin(c); err != nil { apiresp.GinError(c, err) return } req, err := a2r.ParseRequest[apistruct.UserSendMsgCountTrendReq](c) if err != nil { apiresp.GinError(c, err) return } if s.msgDatabase == nil { apiresp.GinError(c, errs.ErrInternalServer.WrapMsg("msg database is nil")) return } intervalMillis, err := parseTrendIntervalMillis(req.IntervalMinutes) if err != nil { apiresp.GinError(c, err) return } startTime, endTime, err := normalizeTrendTimeRange(req.StartTime, req.EndTime) if err != nil { apiresp.GinError(c, err) return } sessionTypes, err := mapTrendChatType(req.ChatType) if err != nil { apiresp.GinError(c, err) return } bucketStart, bucketEnd := alignTrendRange(startTime, endTime, intervalMillis) countMap, err := s.msgDatabase.CountUserSendMessagesTrend(c, req.UserID, sessionTypes, startTime, endTime, intervalMillis) if err != nil { apiresp.GinError(c, err) return } points := buildUserSendMsgCountTrendPoints(countMap, bucketStart, bucketEnd, intervalMillis) apiresp.GinSuccess(c, &apistruct.UserSendMsgCountTrendResp{ UserID: req.UserID, ChatType: req.ChatType, IntervalMinutes: req.IntervalMinutes, Points: points, }) } // UserSendMsgQuery 用户发送消息查询 func (s *StatisticsApi) UserSendMsgQuery(c *gin.Context) { if err := authverify.CheckAdmin(c); err != nil { apiresp.GinError(c, err) return } req, err := a2r.ParseRequest[apistruct.UserSendMsgQueryReq](c) if err != nil { apiresp.GinError(c, err) return } if req.StartTime > 0 && req.EndTime > 0 && req.EndTime <= req.StartTime { apiresp.GinError(c, errs.ErrArgs.WrapMsg("invalid time range")) return } if s.msgDatabase == nil { apiresp.GinError(c, errs.ErrInternalServer.WrapMsg("msg database is nil")) return } pageNumber := req.PageNumber if pageNumber <= 0 { pageNumber = 1 } showNumber := req.ShowNumber if showNumber <= 0 { showNumber = 50 } const maxShowNumber int32 = 200 if showNumber > maxShowNumber { showNumber = maxShowNumber } total, msgs, err := s.msgDatabase.SearchUserMessages(c, req.UserID, req.StartTime, req.EndTime, req.Content, pageNumber, showNumber) if err != nil { apiresp.GinError(c, err) return } sendIDs := make([]string, 0, len(msgs)) recvIDs := make([]string, 0, len(msgs)) groupIDs := make([]string, 0, len(msgs)) for _, item := range msgs { if item == nil || item.Msg == nil { continue } msg := item.Msg if msg.SendID != "" { sendIDs = append(sendIDs, msg.SendID) } switch msg.SessionType { case constant.ReadGroupChatType, constant.WriteGroupChatType: if msg.GroupID != "" { groupIDs = append(groupIDs, msg.GroupID) } default: if msg.RecvID != "" { recvIDs = append(recvIDs, msg.RecvID) } } } sendIDs = datautil.Distinct(sendIDs) recvIDs = datautil.Distinct(recvIDs) groupIDs = datautil.Distinct(groupIDs) sendMap, recvMap, groupMap := map[string]*sdkws.UserInfo{}, map[string]*sdkws.UserInfo{}, map[string]*sdkws.GroupInfo{} if s.userClient != nil { if len(sendIDs) > 0 { if users, err := s.userClient.GetUsersInfo(c, sendIDs); err == nil { sendMap = datautil.SliceToMap(users, (*sdkws.UserInfo).GetUserID) } } if len(recvIDs) > 0 { if users, err := s.userClient.GetUsersInfo(c, recvIDs); err == nil { recvMap = datautil.SliceToMap(users, (*sdkws.UserInfo).GetUserID) } } } if s.groupClient != nil && len(groupIDs) > 0 { if groups, err := s.groupClient.GetGroupsInfo(c, groupIDs); err == nil { groupMap = datautil.SliceToMap(groups, (*sdkws.GroupInfo).GetGroupID) } } records := make([]*apistruct.UserSendMsgQueryRecord, 0, len(msgs)) for _, item := range msgs { if item == nil || item.Msg == nil { continue } msg := item.Msg msgID := msg.ServerMsgID if msgID == "" { msgID = msg.ClientMsgID } senderName := msg.SenderNickname if senderName == "" { if u := sendMap[msg.SendID]; u != nil { senderName = u.Nickname } else { senderName = msg.SendID } } recvID := msg.RecvID recvName := "" if msg.SessionType == constant.ReadGroupChatType || msg.SessionType == constant.WriteGroupChatType { if msg.GroupID != "" { recvID = msg.GroupID } if g := groupMap[recvID]; g != nil { recvName = g.GroupName } else if recvID != "" { recvName = recvID } } else { if u := recvMap[msg.RecvID]; u != nil { recvName = u.Nickname } else if msg.RecvID != "" { recvName = msg.RecvID } } records = append(records, &apistruct.UserSendMsgQueryRecord{ MsgID: msgID, SendID: msg.SendID, SenderName: senderName, RecvID: recvID, RecvName: recvName, ContentType: msg.ContentType, ContentTypeName: contentTypeName(msg.ContentType), SessionType: msg.SessionType, ChatTypeName: chatTypeName(msg.SessionType), Content: msg.Content, SendTime: msg.SendTime, }) } apiresp.GinSuccess(c, &apistruct.UserSendMsgQueryResp{ Count: total, PageNumber: pageNumber, ShowNumber: showNumber, Records: records, }) } // parseTrendIntervalMillis 解析走势统计间隔并转换为毫秒 func parseTrendIntervalMillis(intervalMinutes int32) (int64, error) { switch intervalMinutes { case trendIntervalMinutes15, trendIntervalMinutes30, trendIntervalMinutes60: return int64(intervalMinutes) * int64(time.Minute/time.Millisecond), nil default: return 0, errs.ErrArgs.WrapMsg("invalid intervalMinutes") } } // normalizeTrendTimeRange 标准化走势统计时间区间 func normalizeTrendTimeRange(startTime int64, endTime int64) (int64, int64, error) { now := time.Now().UnixMilli() if endTime <= 0 { endTime = now } if startTime <= 0 { startTime = endTime - int64(defaultTrendDuration/time.Millisecond) } if startTime < 0 { startTime = 0 } if endTime <= startTime { return 0, 0, errs.ErrArgs.WrapMsg("invalid time range") } return startTime, endTime, nil } // alignTrendRange 对齐走势统计区间到间隔边界 func alignTrendRange(startTime int64, endTime int64, intervalMillis int64) (int64, int64) { if intervalMillis <= 0 { return startTime, endTime } // 开始时间向下对齐到间隔边界 bucketStart := startTime - (startTime % intervalMillis) if startTime < 0 { bucketStart = startTime - ((startTime % intervalMillis) + intervalMillis) } // 结束时间向下对齐到所在间隔的起始(只包含已发生的间隔) bucketEnd := endTime - (endTime % intervalMillis) if endTime < 0 && endTime%intervalMillis != 0 { bucketEnd = endTime - ((endTime % intervalMillis) + intervalMillis) } // 确保至少覆盖一个间隔 if bucketEnd < bucketStart { bucketEnd = bucketStart } return bucketStart, bucketEnd } // buildOnlineUserCountTrendPoints 构建在线人数走势数据点 func buildOnlineUserCountTrendPoints(samples []rediscache.OnlineUserCountSample, startTime int64, endTime int64, intervalMillis int64) []*apistruct.OnlineUserCountTrendItem { points := make([]*apistruct.OnlineUserCountTrendItem, 0) if intervalMillis <= 0 || endTime <= startTime { return points } maxMap := make(map[int64]int64) for _, sample := range samples { // 将采样时间戳对齐到间隔边界 bucket := sample.Timestamp - (sample.Timestamp % intervalMillis) // 处理负数时间戳的情况(虽然通常不会发生) if sample.Timestamp < 0 && sample.Timestamp%intervalMillis != 0 { bucket = sample.Timestamp - ((sample.Timestamp % intervalMillis) + intervalMillis) } if sample.Count > maxMap[bucket] { maxMap[bucket] = sample.Count } } // 计算需要生成的数据点数量 // endTime是对齐后的最后一个bucket的起始时间,所以需要包含它 estimated := int((endTime-startTime)/intervalMillis) + 1 if estimated > 0 { points = make([]*apistruct.OnlineUserCountTrendItem, 0, estimated) } // 生成从startTime到endTime(包含endTime)的所有时间点 // endTime已经是对齐后的最后一个bucket的起始时间 for ts := startTime; ts <= endTime; ts += intervalMillis { maxVal := maxMap[ts] points = append(points, &apistruct.OnlineUserCountTrendItem{ Timestamp: ts, OnlineCount: maxVal, }) } return points } // buildUserSendMsgCountTrendPoints 构建用户发送消息走势数据点 func buildUserSendMsgCountTrendPoints(countMap map[int64]int64, startTime int64, endTime int64, intervalMillis int64) []*apistruct.UserSendMsgCountTrendItem { points := make([]*apistruct.UserSendMsgCountTrendItem, 0) if intervalMillis <= 0 || endTime <= startTime { return points } estimated := int((endTime - startTime) / intervalMillis) if estimated > 0 { points = make([]*apistruct.UserSendMsgCountTrendItem, 0, estimated) } for ts := startTime; ts < endTime; ts += intervalMillis { points = append(points, &apistruct.UserSendMsgCountTrendItem{ Timestamp: ts, Count: countMap[ts], }) } return points } // mapTrendChatType 走势统计聊天类型转为 sessionType 列表 func mapTrendChatType(chatType int32) ([]int32, error) { switch chatType { case trendChatTypeSingle: return []int32{constant.SingleChatType}, nil case trendChatTypeGroup: return []int32{constant.ReadGroupChatType, constant.WriteGroupChatType}, nil default: return nil, errs.ErrArgs.WrapMsg("invalid chatType") } } // contentTypeName 消息类型名称转换 func contentTypeName(contentType int32) string { switch contentType { case constant.Text: return "文本消息" case constant.Picture: return "图片消息" case constant.Voice: return "语音消息" case constant.Video: return "视频消息" case constant.File: return "文件消息" case constant.AtText: return "艾特消息" case constant.Merger: return "合并消息" case constant.Card: return "名片消息" case constant.Location: return "位置消息" case constant.Custom: return "自定义消息" case constant.Revoke: return "撤回消息" case constant.MarkdownText: return "Markdown消息" default: return "未知消息" } } // chatTypeName 聊天类型名称转换 func chatTypeName(sessionType int32) string { switch sessionType { case constant.SingleChatType: return "单聊" case constant.ReadGroupChatType, constant.WriteGroupChatType: return "群聊" case constant.NotificationChatType: return "通知" default: return "未知" } }