复制项目

This commit is contained in:
kim.dev.6789
2026-01-14 22:16:44 +08:00
parent e2577b8cee
commit e50142a3b9
691 changed files with 97009 additions and 1 deletions

View File

@@ -0,0 +1,112 @@
package msgtransfer
import (
"context"
"encoding/base64"
"git.imall.cloud/openim/open-im-server-deploy/pkg/apistruct"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/config"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/webhook"
"git.imall.cloud/openim/protocol/constant"
"git.imall.cloud/openim/protocol/sdkws"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
cbapi "git.imall.cloud/openim/open-im-server-deploy/pkg/callbackstruct"
)
func toCommonCallback(ctx context.Context, msg *sdkws.MsgData, command string) cbapi.CommonCallbackReq {
return cbapi.CommonCallbackReq{
SendID: msg.SendID,
ServerMsgID: msg.ServerMsgID,
CallbackCommand: command,
ClientMsgID: msg.ClientMsgID,
OperationID: mcontext.GetOperationID(ctx),
SenderPlatformID: msg.SenderPlatformID,
SenderNickname: msg.SenderNickname,
SessionType: msg.SessionType,
MsgFrom: msg.MsgFrom,
ContentType: msg.ContentType,
Status: msg.Status,
SendTime: msg.SendTime,
CreateTime: msg.CreateTime,
AtUserIDList: msg.AtUserIDList,
SenderFaceURL: msg.SenderFaceURL,
Content: GetContent(msg),
Seq: uint32(msg.Seq),
Ex: msg.Ex,
}
}
func GetContent(msg *sdkws.MsgData) string {
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
var tips sdkws.TipsComm
_ = proto.Unmarshal(msg.Content, &tips)
content := tips.JsonDetail
return content
} else {
return string(msg.Content)
}
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if !filterAfterMsg(msg, after) {
return
}
cbReq := &cbapi.CallbackAfterMsgSaveDBReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand),
RecvID: msg.RecvID,
GroupID: msg.GroupID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg))
}
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
keyMsgData := apistruct.KeyMsgData{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
}
return map[string]string{
webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)),
}
}
func filterAfterMsg(msg *sdkws.MsgData, after *config.AfterConfig) bool {
return filterMsg(msg, after.AttentionIds, after.DeniedTypes)
}
func filterMsg(msg *sdkws.MsgData, attentionIds []string, deniedTypes []int32) bool {
// According to the attentionIds configuration, only some users are sent
if len(attentionIds) != 0 && msg.ContentType == constant.SingleChatType && !datautil.Contain(msg.RecvID, attentionIds...) {
return false
}
if len(attentionIds) != 0 && msg.ContentType == constant.ReadGroupChatType && !datautil.Contain(msg.GroupID, attentionIds...) {
return false
}
if defaultDeniedTypes(msg.ContentType) {
return false
}
if len(deniedTypes) != 0 && datautil.Contain(msg.ContentType, deniedTypes...) {
return false
}
return true
}
func defaultDeniedTypes(contentType int32) bool {
if contentType >= constant.NotificationBegin && contentType <= constant.NotificationEnd {
return true
}
if contentType == constant.Typing {
return true
}
return false
}

View File

@@ -0,0 +1,188 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msgtransfer
import (
"context"
"time"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache/mcache"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/cache/redis"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database/mgo"
"git.imall.cloud/openim/open-im-server-deploy/pkg/dbbuild"
"git.imall.cloud/openim/open-im-server-deploy/pkg/mqbuild"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/tools/utils/runtimeenv"
conf "git.imall.cloud/openim/open-im-server-deploy/pkg/common/config"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/controller"
"github.com/openimsdk/tools/log"
"google.golang.org/grpc"
)
type MsgTransfer struct {
historyConsumer mq.Consumer
historyMongoConsumer mq.Consumer
// This consumer aggregated messages, subscribed to the topic:toRedis,
// the message is stored in redis, Incr Redis, and then the message is sent to toPush topic for push,
// and the message is sent to toMongo topic for persistence
historyHandler *OnlineHistoryRedisConsumerHandler
//This consumer handle message to mongo
historyMongoHandler *OnlineHistoryMongoConsumerHandler
ctx context.Context
//cancel context.CancelFunc
}
type Config struct {
MsgTransfer conf.MsgTransfer
RedisConfig conf.Redis
MongodbConfig conf.Mongo
KafkaConfig conf.Kafka
Share conf.Share
WebhooksConfig conf.Webhooks
Discovery conf.Discovery
Index conf.Index
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server grpc.ServiceRegistrar) error {
builder := mqbuild.NewBuilder(&config.KafkaConfig)
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runtimeenv.RuntimeEnvironment(), "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", config.Index)
dbb := dbbuild.NewBuilder(&config.MongodbConfig, &config.RedisConfig)
mgocli, err := dbb.Mongo(ctx)
if err != nil {
return err
}
rdb, err := dbb.Redis(ctx)
if err != nil {
return err
}
//if config.Discovery.Enable == conf.ETCD {
// cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
// config.MsgTransfer.GetConfigFileName(),
// config.RedisConfig.GetConfigFileName(),
// config.MongodbConfig.GetConfigFileName(),
// config.KafkaConfig.GetConfigFileName(),
// config.Share.GetConfigFileName(),
// config.WebhooksConfig.GetConfigFileName(),
// config.Discovery.GetConfigFileName(),
// conf.LogConfigFileName,
// })
// cm.Watch(ctx)
//}
mongoProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToMongoTopic)
if err != nil {
return err
}
pushProducer, err := builder.GetTopicProducer(ctx, config.KafkaConfig.ToPushTopic)
if err != nil {
return err
}
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
var msgModel cache.MsgCache
if rdb == nil {
cm, err := mgo.NewCacheMgo(mgocli.GetDB())
if err != nil {
return err
}
msgModel = mcache.NewMsgCache(cm, msgDocModel)
} else {
msgModel = redis.NewMsgCache(rdb, msgDocModel)
}
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, mongoProducer, pushProducer)
if err != nil {
return err
}
historyConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToRedisTopic)
if err != nil {
return err
}
historyMongoConsumer, err := builder.GetTopicConsumer(ctx, config.KafkaConfig.ToMongoTopic)
if err != nil {
return err
}
historyHandler, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase)
if err != nil {
return err
}
historyMongoHandler := NewOnlineHistoryMongoConsumerHandler(msgTransferDatabase, config)
msgTransfer := &MsgTransfer{
historyConsumer: historyConsumer,
historyMongoConsumer: historyMongoConsumer,
historyHandler: historyHandler,
historyMongoHandler: historyMongoHandler,
}
return msgTransfer.Start(ctx)
}
func (m *MsgTransfer) Start(ctx context.Context) error {
m.ctx = ctx
go func() {
for {
if err := m.historyConsumer.Subscribe(m.ctx, m.historyHandler.HandlerRedisMessage); err != nil {
log.ZError(m.ctx, "historyConsumer err, will retry in 5 seconds", err)
time.Sleep(5 * time.Second)
continue
}
// Subscribe returned normally (possibly due to context cancellation), retry immediately
log.ZWarn(m.ctx, "historyConsumer Subscribe returned normally, will retry immediately", nil)
}
}()
go func() {
fn := func(msg mq.Message) error {
m.historyMongoHandler.HandleChatWs2Mongo(msg)
return nil
}
for {
if err := m.historyMongoConsumer.Subscribe(m.ctx, fn); err != nil {
log.ZError(m.ctx, "historyMongoConsumer err, will retry in 5 seconds", err)
time.Sleep(5 * time.Second)
continue
}
// Subscribe returned normally (possibly due to context cancellation), retry immediately
log.ZWarn(m.ctx, "historyMongoConsumer Subscribe returned normally, will retry immediately", nil)
}
}()
go m.historyHandler.HandleUserHasReadSeqMessages(m.ctx)
err := m.historyHandler.redisMessageBatches.Start()
if err != nil {
return err
}
<-m.ctx.Done()
return m.ctx.Err()
}

View File

@@ -0,0 +1,410 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msgtransfer
import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/mq"
"sync"
"time"
"git.imall.cloud/openim/open-im-server-deploy/pkg/rpcli"
"github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/prommetrics"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/controller"
"git.imall.cloud/openim/open-im-server-deploy/pkg/msgprocessor"
"git.imall.cloud/openim/open-im-server-deploy/pkg/tools/batcher"
"git.imall.cloud/openim/protocol/constant"
pbconv "git.imall.cloud/openim/protocol/conversation"
"git.imall.cloud/openim/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
)
const (
size = 1000
mainDataBuffer = 2000
subChanBuffer = 200
worker = 100
interval = 100 * time.Millisecond
hasReadChanBuffer = 2000
)
type ContextMsg struct {
message *sdkws.MsgData
ctx context.Context
}
// This structure is used for asynchronously writing the senders read sequence (seq) regarding a message into MongoDB.
// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10.
type userHasReadSeq struct {
conversationID string
userHasReadMap map[string]int64
}
type OnlineHistoryRedisConsumerHandler struct {
redisMessageBatches *batcher.Batcher[ConsumerMessage]
msgTransferDatabase controller.MsgTransferDatabase
conversationUserHasReadChan chan *userHasReadSeq
wg sync.WaitGroup
groupClient *rpcli.GroupClient
conversationClient *rpcli.ConversationClient
}
type ConsumerMessage struct {
Ctx context.Context
Key string
Value []byte
Raw mq.Message
}
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.Conn, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return nil, err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
if err != nil {
return nil, err
}
var och OnlineHistoryRedisConsumerHandler
och.msgTransferDatabase = database
och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
och.groupClient = rpcli.NewGroupClient(groupConn)
och.conversationClient = rpcli.NewConversationClient(conversationConn)
och.wg.Add(1)
b := batcher.New[ConsumerMessage](
batcher.WithSize(size),
batcher.WithWorker(worker),
batcher.WithInterval(interval),
batcher.WithDataBuffer(mainDataBuffer),
batcher.WithSyncWait(true),
batcher.WithBuffer(subChanBuffer),
)
b.Sharding = func(key string) int {
hashCode := stringutil.GetHashCode(key)
return int(hashCode) % och.redisMessageBatches.Worker()
}
b.Key = func(consumerMessage *ConsumerMessage) string {
return consumerMessage.Key
}
b.Do = och.do
och.redisMessageBatches = b
och.redisMessageBatches.OnComplete = func(lastMessage *ConsumerMessage, totalCount int) {
lastMessage.Raw.Mark()
lastMessage.Raw.Commit()
}
return &och, nil
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) {
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
log.ZDebug(ctx, "number of categorized messages", "storageMsgList", len(storageMsgList), "notStorageMsgList",
len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList))
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMessages[0].message)
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMessages[0].message)
och.handleMsg(ctx, val.Key(), conversationIDMsg, storageMsgList, notStorageMsgList)
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
// Outer map: conversationID -> (userID -> maxHasReadSeq)
conversationUserSeq := make(map[string]map[string]int64)
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
continue
}
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
continue
}
// Calculate the max seq from tips.Seqs
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
}
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
}
}
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
// persist to db
for convID, userSeqMap := range conversationUserSeq {
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {
ctxMsg := &ContextMsg{}
msgFromMQ := &sdkws.MsgData{}
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
if err != nil {
log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
continue
}
ctxMsg.ctx = consumerMessages[i].Ctx
ctxMsg.message = msgFromMQ
log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key", consumerMessages[i].Key)
ctxMessages = append(ctxMessages, ctxMsg)
}
return ctxMessages
}
// Get messages/notifications stored message list, not stored and pushed message list.
func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs []*ContextMsg) (storageMsgList,
notStorageMsgList, storageNotificationList, notStorageNotificationList []*ContextMsg) {
for _, v := range totalMsgs {
options := msgprocessor.Options(v.message.Options)
if !options.IsNotNotification() {
// clone msg from notificationMsg
if options.IsSendMsg() {
msg := proto.Clone(v.message).(*sdkws.MsgData)
// message
if v.message.Options != nil {
msg.Options = msgprocessor.NewMsgOptions()
}
msg.Options = msgprocessor.WithOptions(msg.Options,
msgprocessor.WithOfflinePush(options.IsOfflinePush()),
msgprocessor.WithUnreadCount(options.IsUnreadCount()),
)
v.message.Options = msgprocessor.WithOptions(
v.message.Options,
msgprocessor.WithOfflinePush(false),
msgprocessor.WithUnreadCount(false),
)
ctxMsg := &ContextMsg{
message: msg,
ctx: v.ctx,
}
storageMsgList = append(storageMsgList, ctxMsg)
}
if options.IsHistory() {
storageNotificationList = append(storageNotificationList, v)
} else {
notStorageNotificationList = append(notStorageNotificationList, v)
}
} else {
if options.IsHistory() {
storageMsgList = append(storageMsgList, v)
} else {
notStorageMsgList = append(notStorageMsgList, v)
}
}
}
return
}
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
log.ZInfo(ctx, "handle storage msg")
for _, storageMsg := range storageList {
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
}
och.toPushTopic(ctx, key, conversationID, notStorageList)
var storageMessageList []*sdkws.MsgData
for _, msg := range storageList {
storageMessageList = append(storageMessageList, msg.message)
}
if len(storageMessageList) > 0 {
msg := storageMessageList[0]
lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
return
}
log.ZInfo(ctx, "BatchInsertChat2Cache end")
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
}
select {
case och.conversationUserHasReadChan <- &userHasReadSeq{
conversationID: conversationID,
userHasReadMap: userSeqMap,
}:
default:
log.ZWarn(ctx, "conversationUserHasReadChan full, drop userHasReadSeq update", nil,
"conversationID", conversationID, "userSeqMapSize", len(userSeqMap))
}
if isNewConversation {
switch msg.SessionType {
case constant.ReadGroupChatType:
log.ZDebug(ctx, "group chat first create conversation", "conversationID",
conversationID)
userIDs, err := och.groupClient.GetGroupMemberUserIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
conversationID)
} else {
log.ZInfo(ctx, "GetGroupMemberIDs end")
if err := och.conversationClient.CreateGroupChatConversations(ctx, msg.GroupID, userIDs); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err,
"conversationID", conversationID)
}
}
case constant.SingleChatType, constant.NotificationChatType:
req := &pbconv.CreateSingleChatConversationsReq{
RecvID: msg.RecvID,
SendID: msg.SendID,
ConversationID: conversationID,
ConversationType: msg.SessionType,
}
if err := och.conversationClient.CreateSingleChatConversations(ctx, req); err != nil {
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
"conversationID", conversationID, "sessionType", msg.SessionType)
}
default:
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
msg.SessionType)
}
}
log.ZInfo(ctx, "success incr to next topic")
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
}
log.ZInfo(ctx, "MsgToMongoMQ end")
och.toPushTopic(ctx, key, conversationID, storageList)
log.ZInfo(ctx, "toPushTopic end")
}
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, key, conversationID string,
storageList, notStorageList []*ContextMsg) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
var storageMessageList []*sdkws.MsgData
for _, msg := range storageList {
storageMessageList = append(storageMessageList, msg.message)
}
if len(storageMessageList) > 0 {
lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
"storageList", storageMessageList)
return
}
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
}
och.toPushTopic(ctx, key, conversationID, storageList)
}
}
func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
log.ZPanic(ctx, "HandleUserHasReadSeqMessages Panic", errs.ErrPanic(r))
}
}()
defer och.wg.Done()
for msg := range och.conversationUserHasReadChan {
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil {
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap)
}
}
log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages")
}
func (och *OnlineHistoryRedisConsumerHandler) Close() {
close(och.conversationUserHasReadChan)
och.wg.Wait()
}
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs {
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
if err := och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message); err != nil {
log.ZError(ctx, "msg to push topic error", err, "msg", v.message.String())
}
}
}
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
var allMessageOperationID string
for i, v := range values {
if opid := mcontext.GetOperationID(v.ctx); opid != "" {
if i == 0 {
allMessageOperationID += opid
} else {
allMessageOperationID += "$" + opid
}
}
}
return mcontext.SetOperationID(ctx, allMessageOperationID)
}
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(msg mq.Message) error { // a instance in the consumer group
err := och.redisMessageBatches.Put(msg.Context(), &ConsumerMessage{Ctx: msg.Context(), Key: msg.Key(), Value: msg.Value(), Raw: msg})
if err != nil {
log.ZWarn(msg.Context(), "put msg to error", err, "key", msg.Key(), "value", msg.Value())
}
return nil
}

View File

@@ -0,0 +1,78 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msgtransfer
import (
"github.com/openimsdk/tools/mq"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/prommetrics"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/controller"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/webhook"
pbmsg "git.imall.cloud/openim/protocol/msg"
"github.com/openimsdk/tools/log"
"google.golang.org/protobuf/proto"
)
type OnlineHistoryMongoConsumerHandler struct {
msgTransferDatabase controller.MsgTransferDatabase
config *Config
webhookClient *webhook.Client
}
func NewOnlineHistoryMongoConsumerHandler(database controller.MsgTransferDatabase, config *Config) *OnlineHistoryMongoConsumerHandler {
return &OnlineHistoryMongoConsumerHandler{
msgTransferDatabase: database,
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
}
}
func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message) {
ctx := val.Context()
key := val.Key()
msg := val.Value()
msgFromMQ := pbmsg.MsgDataToMongoByMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.ZError(ctx, "unmarshall failed", err, "key", key, "len", len(msg))
return
}
if len(msgFromMQ.MsgData) == 0 {
log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "key", key, "msg", msg)
return
}
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
prommetrics.MsgInsertMongoFailedCounter.Inc()
} else {
prommetrics.MsgInsertMongoSuccessCounter.Inc()
val.Mark()
}
for _, msgData := range msgFromMQ.MsgData {
mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData)
}
//var seqs []int64
//for _, msg := range msgFromMQ.MsgData {
// seqs = append(seqs, msg.Seq)
//}
//if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil {
// log.ZError(ctx, "remove cache msg from redis err", err, "msg",
// msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
//}
}