// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mgo import ( "context" "time" "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database" "git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) // RedPacketMgo implements RedPacket using MongoDB as the storage backend. type RedPacketMgo struct { coll *mongo.Collection } // NewRedPacketMongo creates a new instance of RedPacketMgo with the provided MongoDB database. func NewRedPacketMongo(db *mongo.Database) (database.RedPacket, error) { coll := db.Collection(database.RedPacketName) _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ { Keys: bson.D{{Key: "red_packet_id", Value: 1}}, Options: options.Index().SetUnique(true), }, { Keys: bson.D{{Key: "send_user_id", Value: 1}, {Key: "create_time", Value: -1}}, }, { Keys: bson.D{{Key: "group_id", Value: 1}, {Key: "create_time", Value: -1}}, }, { Keys: bson.D{{Key: "expire_time", Value: 1}}, }, }) if err != nil { return nil, err } return &RedPacketMgo{coll: coll}, nil } // Create creates a new red packet record. 
func (r *RedPacketMgo) Create(ctx context.Context, redPacket *model.RedPacket) error { if redPacket.CreateTime.IsZero() { redPacket.CreateTime = time.Now() } return mongoutil.InsertOne(ctx, r.coll, redPacket) } // Take retrieves a red packet by ID. Returns an error if not found. func (r *RedPacketMgo) Take(ctx context.Context, redPacketID string) (*model.RedPacket, error) { return mongoutil.FindOne[*model.RedPacket](ctx, r.coll, bson.M{"red_packet_id": redPacketID}) } // UpdateStatus updates the status of a red packet. func (r *RedPacketMgo) UpdateStatus(ctx context.Context, redPacketID string, status int32) error { return mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, bson.M{"$set": bson.M{"status": status}}, false) } // UpdateRemain updates the remain amount and count of a red packet. func (r *RedPacketMgo) UpdateRemain(ctx context.Context, redPacketID string, remainAmount int64, remainCount int32) error { update := bson.M{ "$set": bson.M{ "remain_amount": remainAmount, "remain_count": remainCount, }, } // If remain count is 0, update status to finished if remainCount == 0 { update["$set"].(bson.M)["status"] = model.RedPacketStatusFinished } return mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, update, false) } // DecreaseRemainAtomic 原子性地减少红包剩余数量和金额(防止并发问题) // 只有在 remain_count > 0 且状态为 Active 时才会更新 func (r *RedPacketMgo) DecreaseRemainAtomic(ctx context.Context, redPacketID string, amount int64) (*model.RedPacket, error) { // 过滤条件:红包ID匹配、剩余数量>0、状态为Active filter := bson.M{ "red_packet_id": redPacketID, "remain_count": bson.M{"$gt": 0}, "status": model.RedPacketStatusActive, } // 使用 $inc 原子性地减少剩余数量和金额 update := bson.M{ "$inc": bson.M{ "remain_amount": -amount, "remain_count": -1, }, } // 使用 findOneAndUpdate 返回更新后的文档 opts := options.FindOneAndUpdate().SetReturnDocument(options.After) var updatedRedPacket model.RedPacket err := r.coll.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updatedRedPacket) if err != nil { 
if err == mongo.ErrNoDocuments { // 红包不存在、已领完或状态不正确 return nil, errs.ErrArgs.WrapMsg("red packet not available (already finished or expired)") } return nil, err } // 如果剩余数量为0,更新状态为已完成 if updatedRedPacket.RemainCount == 0 { statusUpdate := bson.M{"$set": bson.M{"status": model.RedPacketStatusFinished}} _ = mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, statusUpdate, false) updatedRedPacket.Status = model.RedPacketStatusFinished } return &updatedRedPacket, nil } // FindExpiredRedPackets finds red packets that have expired. func (r *RedPacketMgo) FindExpiredRedPackets(ctx context.Context, beforeTime time.Time) ([]*model.RedPacket, error) { filter := bson.M{ "expire_time": bson.M{"$lt": beforeTime}, "status": model.RedPacketStatusActive, } return mongoutil.Find[*model.RedPacket](ctx, r.coll, filter) } // FindRedPacketsByUser finds red packets sent by a user with pagination. func (r *RedPacketMgo) FindRedPacketsByUser(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) { filter := bson.M{"send_user_id": userID} return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, filter, pagination, &options.FindOptions{ Sort: bson.D{{Key: "create_time", Value: -1}}, }) } // FindRedPacketsByGroup finds red packets in a group with pagination. func (r *RedPacketMgo) FindRedPacketsByGroup(ctx context.Context, groupID string, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) { filter := bson.M{"group_id": groupID} return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, filter, pagination, &options.FindOptions{ Sort: bson.D{{Key: "create_time", Value: -1}}, }) } // FindAllRedPackets finds all red packets with pagination. 
func (r *RedPacketMgo) FindAllRedPackets(ctx context.Context, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) { return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, bson.M{}, pagination, &options.FindOptions{ Sort: bson.D{{Key: "create_time", Value: -1}}, }) } // RedPacketReceiveMgo implements RedPacketReceive using MongoDB as the storage backend. type RedPacketReceiveMgo struct { coll *mongo.Collection } // NewRedPacketReceiveMongo creates a new instance of RedPacketReceiveMgo with the provided MongoDB database. func NewRedPacketReceiveMongo(db *mongo.Database) (database.RedPacketReceive, error) { coll := db.Collection(database.RedPacketReceiveName) _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ { Keys: bson.D{{Key: "receive_id", Value: 1}}, Options: options.Index().SetUnique(true), }, { Keys: bson.D{{Key: "red_packet_id", Value: 1}, {Key: "receive_time", Value: -1}}, }, { Keys: bson.D{{Key: "receive_user_id", Value: 1}, {Key: "red_packet_id", Value: 1}}, Options: options.Index().SetUnique(true), }, { Keys: bson.D{{Key: "receive_user_id", Value: 1}, {Key: "receive_time", Value: -1}}, }, }) if err != nil { return nil, err } return &RedPacketReceiveMgo{coll: coll}, nil } // Create creates a new red packet receive record. func (r *RedPacketReceiveMgo) Create(ctx context.Context, receive *model.RedPacketReceive) error { if receive.ReceiveTime.IsZero() { receive.ReceiveTime = time.Now() } return mongoutil.InsertOne(ctx, r.coll, receive) } // Take retrieves a receive record by ID. Returns an error if not found. func (r *RedPacketReceiveMgo) Take(ctx context.Context, receiveID string) (*model.RedPacketReceive, error) { return mongoutil.FindOne[*model.RedPacketReceive](ctx, r.coll, bson.M{"receive_id": receiveID}) } // FindByRedPacketID finds all receive records for a red packet. 
func (r *RedPacketReceiveMgo) FindByRedPacketID(ctx context.Context, redPacketID string) ([]*model.RedPacketReceive, error) {
	// Results are sorted by receive_time ascending.
	byPacket := bson.M{"red_packet_id": redPacketID}
	oldestFirst := options.Find().SetSort(bson.D{{Key: "receive_time", Value: 1}})
	return mongoutil.Find[*model.RedPacketReceive](ctx, r.coll, byPacket, oldestFirst)
}

// FindByUserAndRedPacketID looks up the receive record that matches both
// userID and redPacketID, i.e. whether that user has received that red packet.
func (r *RedPacketReceiveMgo) FindByUserAndRedPacketID(ctx context.Context, userID, redPacketID string) (*model.RedPacketReceive, error) {
	byUserAndPacket := bson.M{
		"receive_user_id": userID,
		"red_packet_id":   redPacketID,
	}
	return mongoutil.FindOne[*model.RedPacketReceive](ctx, r.coll, byUserAndPacket)
}

// FindByUser returns one page of the receive records of userID, sorted by
// receive_time descending, together with the total reported by
// mongoutil.FindPage.
func (r *RedPacketReceiveMgo) FindByUser(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, receives []*model.RedPacketReceive, err error) {
	byUser := bson.M{"receive_user_id": userID}
	newestFirst := options.Find().SetSort(bson.D{{Key: "receive_time", Value: -1}})
	return mongoutil.FindPage[*model.RedPacketReceive](ctx, r.coll, byUser, pagination, newestFirst)
}

// DeleteByReceiveID deletes a receive record by receive ID (for cleanup on failure).
func (r *RedPacketReceiveMgo) DeleteByReceiveID(ctx context.Context, receiveID string) error {
	byReceiveID := bson.M{"receive_id": receiveID}
	return mongoutil.DeleteOne(ctx, r.coll, byReceiveID)
}