Files
kim.dev.6789 e50142a3b9 复制项目
2026-01-14 22:16:44 +08:00

235 lines
9.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgo
import (
"context"
"time"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/database"
"git.imall.cloud/openim/open-im-server-deploy/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// RedPacketMgo implements RedPacket using MongoDB as the storage backend.
type RedPacketMgo struct {
coll *mongo.Collection
}
// NewRedPacketMongo creates a new instance of RedPacketMgo with the provided MongoDB database.
func NewRedPacketMongo(db *mongo.Database) (database.RedPacket, error) {
coll := db.Collection(database.RedPacketName)
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "red_packet_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "send_user_id", Value: 1}, {Key: "create_time", Value: -1}},
},
{
Keys: bson.D{{Key: "group_id", Value: 1}, {Key: "create_time", Value: -1}},
},
{
Keys: bson.D{{Key: "expire_time", Value: 1}},
},
})
if err != nil {
return nil, err
}
return &RedPacketMgo{coll: coll}, nil
}
// Create creates a new red packet record.
func (r *RedPacketMgo) Create(ctx context.Context, redPacket *model.RedPacket) error {
if redPacket.CreateTime.IsZero() {
redPacket.CreateTime = time.Now()
}
return mongoutil.InsertOne(ctx, r.coll, redPacket)
}
// Take retrieves a red packet by ID. Returns an error if not found.
func (r *RedPacketMgo) Take(ctx context.Context, redPacketID string) (*model.RedPacket, error) {
return mongoutil.FindOne[*model.RedPacket](ctx, r.coll, bson.M{"red_packet_id": redPacketID})
}
// UpdateStatus updates the status of a red packet.
func (r *RedPacketMgo) UpdateStatus(ctx context.Context, redPacketID string, status int32) error {
return mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, bson.M{"$set": bson.M{"status": status}}, false)
}
// UpdateRemain updates the remain amount and count of a red packet.
func (r *RedPacketMgo) UpdateRemain(ctx context.Context, redPacketID string, remainAmount int64, remainCount int32) error {
update := bson.M{
"$set": bson.M{
"remain_amount": remainAmount,
"remain_count": remainCount,
},
}
// If remain count is 0, update status to finished
if remainCount == 0 {
update["$set"].(bson.M)["status"] = model.RedPacketStatusFinished
}
return mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, update, false)
}
// DecreaseRemainAtomic 原子性地减少红包剩余数量和金额(防止并发问题)
// 只有在 remain_count > 0 且状态为 Active 时才会更新
func (r *RedPacketMgo) DecreaseRemainAtomic(ctx context.Context, redPacketID string, amount int64) (*model.RedPacket, error) {
// 过滤条件红包ID匹配、剩余数量>0、状态为Active
filter := bson.M{
"red_packet_id": redPacketID,
"remain_count": bson.M{"$gt": 0},
"status": model.RedPacketStatusActive,
}
// 使用 $inc 原子性地减少剩余数量和金额
update := bson.M{
"$inc": bson.M{
"remain_amount": -amount,
"remain_count": -1,
},
}
// 使用 findOneAndUpdate 返回更新后的文档
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var updatedRedPacket model.RedPacket
err := r.coll.FindOneAndUpdate(ctx, filter, update, opts).Decode(&updatedRedPacket)
if err != nil {
if err == mongo.ErrNoDocuments {
// 红包不存在、已领完或状态不正确
return nil, errs.ErrArgs.WrapMsg("red packet not available (already finished or expired)")
}
return nil, err
}
// 如果剩余数量为0更新状态为已完成
if updatedRedPacket.RemainCount == 0 {
statusUpdate := bson.M{"$set": bson.M{"status": model.RedPacketStatusFinished}}
_ = mongoutil.UpdateOne(ctx, r.coll, bson.M{"red_packet_id": redPacketID}, statusUpdate, false)
updatedRedPacket.Status = model.RedPacketStatusFinished
}
return &updatedRedPacket, nil
}
// FindExpiredRedPackets finds red packets that have expired.
func (r *RedPacketMgo) FindExpiredRedPackets(ctx context.Context, beforeTime time.Time) ([]*model.RedPacket, error) {
filter := bson.M{
"expire_time": bson.M{"$lt": beforeTime},
"status": model.RedPacketStatusActive,
}
return mongoutil.Find[*model.RedPacket](ctx, r.coll, filter)
}
// FindRedPacketsByUser finds red packets sent by a user with pagination.
func (r *RedPacketMgo) FindRedPacketsByUser(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) {
filter := bson.M{"send_user_id": userID}
return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, filter, pagination, &options.FindOptions{
Sort: bson.D{{Key: "create_time", Value: -1}},
})
}
// FindRedPacketsByGroup finds red packets in a group with pagination.
func (r *RedPacketMgo) FindRedPacketsByGroup(ctx context.Context, groupID string, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) {
filter := bson.M{"group_id": groupID}
return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, filter, pagination, &options.FindOptions{
Sort: bson.D{{Key: "create_time", Value: -1}},
})
}
// FindAllRedPackets finds all red packets with pagination.
func (r *RedPacketMgo) FindAllRedPackets(ctx context.Context, pagination pagination.Pagination) (total int64, redPackets []*model.RedPacket, err error) {
return mongoutil.FindPage[*model.RedPacket](ctx, r.coll, bson.M{}, pagination, &options.FindOptions{
Sort: bson.D{{Key: "create_time", Value: -1}},
})
}
// RedPacketReceiveMgo implements RedPacketReceive using MongoDB as the storage backend.
type RedPacketReceiveMgo struct {
coll *mongo.Collection
}
// NewRedPacketReceiveMongo creates a new instance of RedPacketReceiveMgo with the provided MongoDB database.
func NewRedPacketReceiveMongo(db *mongo.Database) (database.RedPacketReceive, error) {
coll := db.Collection(database.RedPacketReceiveName)
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{{Key: "receive_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "red_packet_id", Value: 1}, {Key: "receive_time", Value: -1}},
},
{
Keys: bson.D{{Key: "receive_user_id", Value: 1}, {Key: "red_packet_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "receive_user_id", Value: 1}, {Key: "receive_time", Value: -1}},
},
})
if err != nil {
return nil, err
}
return &RedPacketReceiveMgo{coll: coll}, nil
}
// Create creates a new red packet receive record.
func (r *RedPacketReceiveMgo) Create(ctx context.Context, receive *model.RedPacketReceive) error {
if receive.ReceiveTime.IsZero() {
receive.ReceiveTime = time.Now()
}
return mongoutil.InsertOne(ctx, r.coll, receive)
}
// Take retrieves a receive record by ID. Returns an error if not found.
func (r *RedPacketReceiveMgo) Take(ctx context.Context, receiveID string) (*model.RedPacketReceive, error) {
return mongoutil.FindOne[*model.RedPacketReceive](ctx, r.coll, bson.M{"receive_id": receiveID})
}
// FindByRedPacketID finds all receive records for a red packet.
func (r *RedPacketReceiveMgo) FindByRedPacketID(ctx context.Context, redPacketID string) ([]*model.RedPacketReceive, error) {
return mongoutil.Find[*model.RedPacketReceive](ctx, r.coll, bson.M{"red_packet_id": redPacketID}, &options.FindOptions{
Sort: bson.D{{Key: "receive_time", Value: 1}},
})
}
// FindByUserAndRedPacketID finds if a user has received a specific red packet.
func (r *RedPacketReceiveMgo) FindByUserAndRedPacketID(ctx context.Context, userID, redPacketID string) (*model.RedPacketReceive, error) {
return mongoutil.FindOne[*model.RedPacketReceive](ctx, r.coll, bson.M{
"receive_user_id": userID,
"red_packet_id": redPacketID,
})
}
// FindByUser finds all red packets received by a user with pagination.
func (r *RedPacketReceiveMgo) FindByUser(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, receives []*model.RedPacketReceive, err error) {
filter := bson.M{"receive_user_id": userID}
return mongoutil.FindPage[*model.RedPacketReceive](ctx, r.coll, filter, pagination, &options.FindOptions{
Sort: bson.D{{Key: "receive_time", Value: -1}},
})
}
// DeleteByReceiveID deletes a receive record by receive ID (for cleanup on failure).
func (r *RedPacketReceiveMgo) DeleteByReceiveID(ctx context.Context, receiveID string) error {
return mongoutil.DeleteOne(ctx, r.coll, bson.M{"receive_id": receiveID})
}