// Copyright © 2023 OpenIM open source community. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package chat import ( "context" "fmt" "time" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" chatdb "git.imall.cloud/openim/chat/pkg/common/db/table/chat" ) func NewWallet(db *mongo.Database) (chatdb.WalletInterface, error) { coll := db.Collection("wallets") _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ { Keys: bson.D{{Key: "user_id", Value: 1}}, Options: options.Index().SetUnique(true), }, }) if err != nil { return nil, errs.Wrap(err) } return &Wallet{coll: coll}, nil } type Wallet struct { coll *mongo.Collection } func (o *Wallet) Create(ctx context.Context, wallets ...*chatdb.Wallet) error { return mongoutil.InsertMany(ctx, o.coll, wallets) } func (o *Wallet) Take(ctx context.Context, userID string) (*chatdb.Wallet, error) { return mongoutil.FindOne[*chatdb.Wallet](ctx, o.coll, bson.M{"user_id": userID}) } func (o *Wallet) Find(ctx context.Context, userIDs []string) ([]*chatdb.Wallet, error) { return mongoutil.Find[*chatdb.Wallet](ctx, o.coll, bson.M{"user_id": bson.M{"$in": userIDs}}) } func (o *Wallet) Update(ctx context.Context, userID string, data map[string]any) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": data}, true) } func (o *Wallet) UpdateBalance(ctx context.Context, userID string, balance int64) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": bson.M{"balance": balance}}, true) } // IncrementBalance 原子更新余额,使用 $inc 操作符防止并发问题 // 返回更新前后的余额 // 如果 amount 是负数(扣款),会检查余额是否足够,余额不足时返回错误 func (o *Wallet) IncrementBalance(ctx context.Context, userID string, amount int64) (beforeBalance int64, afterBalance int64, err error) { // 如果 amount 是负数(扣款),需要确保余额不会变为负数 filter := bson.M{"user_id": userID} if amount < 0 { // 扣款时,确保余额足够(balance >= -amount,即 balance + amount >= 0) // 例如:余额 100,扣款 -150,则 balance >= 150 不满足,更新失败 filter["balance"] = bson.M{"$gte": -amount} } update := bson.M{ "$inc": bson.M{"balance": amount}, "$set": bson.M{"update_time": time.Now()}, } opts := options.FindOneAndUpdate(). SetReturnDocument(options.After). // 返回更新后的文档 SetUpsert(true) // 如果不存在则创建 var wallet chatdb.Wallet err = o.coll.FindOneAndUpdate(ctx, filter, update, opts).Decode(&wallet) if err != nil { if err == mongo.ErrNoDocuments { // 如果是因为余额不足导致更新失败(filter 条件不满足) if amount < 0 { // 获取当前余额用于错误提示 currentWallet, takeErr := o.Take(ctx, userID) if takeErr == nil && currentWallet != nil { return currentWallet.Balance, currentWallet.Balance, errs.NewCodeError(errs.ErrArgs.Code(), fmt.Sprintf("余额不足:当前余额为 %d 分,需要 %d 分", currentWallet.Balance, -amount)) } // 如果钱包不存在,说明余额为0,无法扣款 return 0, 0, errs.NewCodeError(errs.ErrArgs.Code(), fmt.Sprintf("余额不足:钱包不存在或余额为0,需要 %d 分", -amount)) } // 如果是增加余额但钱包不存在,应该由 upsert 创建,不应该到这里 // 如果到这里说明有其他问题 return 0, 0, errs.NewCodeError(errs.ErrArgs.Code(), "更新钱包余额失败") } return 0, 0, errs.Wrap(err) } // 计算更新前的余额 beforeBalance = wallet.Balance - amount afterBalance = wallet.Balance // 双重检查:确保余额不为负数(虽然 filter 已经保证,但为了安全再加一次检查) if afterBalance < 0 { // 如果余额为负数,回滚操作 rollbackUpdate := bson.M{ "$inc": bson.M{"balance": -amount}, // 回滚 "$set": bson.M{"update_time": time.Now()}, } _ = o.coll.FindOneAndUpdate(ctx, bson.M{"user_id": userID}, rollbackUpdate, options.FindOneAndUpdate().SetReturnDocument(options.After)) return beforeBalance, beforeBalance, errs.NewCodeError(errs.ErrArgs.Code(), "余额更新后不能为负数") } return beforeBalance, afterBalance, nil } func (o *Wallet) UpdatePaymentPassword(ctx context.Context, userID string, paymentPassword string) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": bson.M{"payment_password": paymentPassword, "update_time": time.Now()}}, true) } func (o *Wallet) UpdateWithdrawAccount(ctx context.Context, userID string, withdrawAccount string) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": bson.M{"withdraw_account": withdrawAccount, "update_time": time.Now()}}, true) } func (o *Wallet) UpdateWithdrawAccountWithType(ctx context.Context, userID string, withdrawAccount string, accountType int32) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": bson.M{"withdraw_account": withdrawAccount, "withdraw_account_type": accountType, "update_time": time.Now()}}, true) } func (o *Wallet) UpdateRealNameAuth(ctx context.Context, userID string, realNameAuth chatdb.RealNameAuth) error { return mongoutil.UpdateOne(ctx, o.coll, bson.M{"user_id": userID}, bson.M{"$set": bson.M{"real_name_auth": realNameAuth, "update_time": time.Now()}}, true) } func (o *Wallet) Delete(ctx context.Context, userIDs []string) error { return mongoutil.DeleteMany(ctx, o.coll, bson.M{"user_id": bson.M{"$in": userIDs}}) } func (o *Wallet) Page(ctx context.Context, pagination pagination.Pagination) (int64, []*chatdb.Wallet, error) { return mongoutil.FindPage[*chatdb.Wallet](ctx, o.coll, bson.M{}, pagination, options.Find().SetSort(bson.D{{Key: "create_time", Value: -1}})) } // PageByRealNameAuthAuditStatus 按实名认证审核状态分页查询钱包 // auditStatus: 0-所有审核状态,1-审核通过,2-审核拒绝 // userID: 用户ID搜索(可选,为空时不过滤) func (o *Wallet) PageByRealNameAuthAuditStatus(ctx context.Context, auditStatus int32, userID string, pagination pagination.Pagination) (int64, []*chatdb.Wallet, error) { filter := bson.M{ "real_name_auth.id_card": bson.M{"$ne": ""}, // 过滤身份证号不为空的(已完成实名认证) "$expr": bson.M{ // 身份证长度至少 6,null/非字符串时按空字符串处理,避免 count 报错 "$gte": []any{ bson.M{"$strLenCP": bson.M{"$ifNull": []any{"$real_name_auth.id_card", ""}}}, 6, }, }, } // 支持按审核状态筛选:0-待审核,1-审核通过,2-审核拒绝;auditStatus < 0 表示不过滤状态 if auditStatus >= 0 { filter["real_name_auth.audit_status"] = auditStatus } // 支持按用户ID搜索 if userID != "" { filter["user_id"] = userID } return mongoutil.FindPage[*chatdb.Wallet](ctx, o.coll, filter, pagination, options.Find().SetSort(bson.D{{Key: "update_time", Value: -1}})) } // SearchByRealNameAuth 按实名认证信息搜索钱包(返回userIDs) // realNameKeyword: 真实姓名搜索关键词(可选) // idCardKeyword: 身份证号搜索关键词(可选) func (o *Wallet) SearchByRealNameAuth(ctx context.Context, realNameKeyword string, idCardKeyword string) ([]string, error) { filter := bson.M{ "real_name_auth.id_card": bson.M{"$ne": ""}, // 过滤身份证号不为空的(已完成实名认证) "$expr": bson.M{ // 身份证长度至少 6,null/非字符串时按空字符串处理,避免 count 报错 "$gte": []any{ bson.M{"$strLenCP": bson.M{"$ifNull": []any{"$real_name_auth.id_card", ""}}}, 6, }, }, } // 构建搜索条件 orConditions := []bson.M{} if realNameKeyword != "" { orConditions = append(orConditions, bson.M{"real_name_auth.name": bson.M{"$regex": realNameKeyword, "$options": "i"}}) } if idCardKeyword != "" { orConditions = append(orConditions, bson.M{"real_name_auth.id_card": bson.M{"$regex": idCardKeyword, "$options": "i"}}) } if len(orConditions) > 0 { filter["$or"] = orConditions } // 只查询 user_id 字段 opts := options.Find().SetProjection(bson.M{"user_id": 1}) wallets, err := mongoutil.Find[*chatdb.Wallet](ctx, o.coll, filter, opts) if err != nil { return nil, err } userIDs := make([]string, 0, len(wallets)) for _, wallet := range wallets { userIDs = append(userIDs, wallet.UserID) } return userIDs, nil }