Files
kim.dev.6789 e50142a3b9 复制项目
2026-01-14 22:16:44 +08:00

641 lines
21 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package kubernetes
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// addrConn stores a gRPC connection together with the address it was dialed
// for, so that connections can be matched by address and reused.
type addrConn struct {
conn *grpc.ClientConn
// addr is the dial target of conn; initializeConns stores "IP:port" here.
addr string
reused bool // marks whether this connection has been reused
}
// KubernetesConnManager holds a Kubernetes API client and a per-service cache
// of gRPC connections built from the Endpoints of Kubernetes Services.
type KubernetesConnManager struct {
// clientset is the Kubernetes API client used for Service, Endpoints and Pod lookups.
clientset *kubernetes.Clientset
// namespace is the namespace every API lookup is performed in.
namespace string
// dialOptions are the gRPC dial options supplied by the creator and extended by AddOption.
dialOptions []grpc.DialOption
// rpcTargets maps a service name to a custom dial target; GetConn uses a
// non-empty entry instead of the cluster DNS name.
rpcTargets map[string]string
// selfTarget caches the result of GetSelfConnTarget.
selfTarget string
// watchNames restricts Endpoints change handling to these services only.
watchNames []string
// mu is taken around accesses to connMap and around writes to dialOptions.
mu sync.RWMutex
// connMap maps a service name to its cached connections.
connMap map[string][]*addrConn
}
// NewKubernetesConnManager creates a connection manager that uses Kubernetes
// Services and Endpoints for service discovery.
//
// namespace is the namespace all lookups are performed in, watchNames limits
// which services' Endpoints changes are acted upon (an empty list means every
// service in the namespace), and options are extra gRPC dial options used for
// every connection the manager dials.
//
// The in-cluster configuration is used to reach the API server. On success a
// background goroutine running watchEndpoints is started. An error is returned
// when the in-cluster configuration or the clientset cannot be created; the
// underlying cause is wrapped so callers can inspect it with errors.Is/As.
func NewKubernetesConnManager(namespace string, watchNames []string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Initializing connection manager", "namespace", namespace, "watchNames", watchNames)

	// Load the in-cluster configuration.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to create in-cluster config", err)
		return nil, fmt.Errorf("failed to create in-cluster config: %w", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Successfully created in-cluster config")

	// Create the Kubernetes API client.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to create clientset", err)
		return nil, fmt.Errorf("failed to create clientset: %w", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Successfully created clientset")

	// Build the manager with empty caches.
	k := &KubernetesConnManager{
		clientset:   clientset,
		namespace:   namespace,
		dialOptions: options,
		connMap:     make(map[string][]*addrConn),
		rpcTargets:  make(map[string]string),
		watchNames:  watchNames,
	}

	// Watch Endpoints changes in the background.
	log.ZInfo(ctx, "K8s Discovery: Starting Endpoints watcher")
	go k.watchEndpoints()

	log.ZInfo(ctx, "K8s Discovery: Connection manager initialized successfully")
	return k, nil
}
// parseServiceName strips the port information from a service name by
// returning everything before the first ':'. A name without ':' is returned
// unchanged.
// Example: "user-rpc-service:http-10320" -> "user-rpc-service".
func parseServiceName(serviceName string) string {
	name, _, _ := strings.Cut(serviceName, ":")
	return name
}
// initializeConns rebuilds the gRPC connection list of serviceName from the
// Service's current Endpoints, reusing an existing connection whenever its
// address is still listed.
//
// The work is done in these steps:
//  1. resolve the Service port via getServicePort;
//  2. snapshot the cached connections and the dial options under the lock;
//  3. fetch the Endpoints object of the Service;
//  4. reuse or dial one connection per distinct "IP:port" target;
//  5. publish the new list and collect old connections that were not reused;
//  6. close those old connections after a 5 second delay.
//
// It returns an error when the Service port or the Endpoints object cannot be
// fetched. A failure to dial a single endpoint is logged and that endpoint is
// skipped.
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Starting to initialize connections", "serviceName", serviceName)

	// Step 1: resolve the Service port.
	// NOTE(review): the Service port is combined with pod IPs below, which
	// assumes the Service port equals the container port — confirm.
	port, err := k.getServicePort(serviceName)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get service port", err, "serviceName", serviceName)
		return fmt.Errorf("failed to get service port: %w", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Got service port", "serviceName", serviceName, "port", port)

	// Step 2: snapshot shared state under the lock. The dial options are copied
	// into a fresh slice so that appending the defaults below can never write
	// into the backing array of k.dialOptions, and so that a concurrent
	// AddOption cannot race with the read.
	k.mu.RLock()
	oldList := k.connMap[serviceName]
	dialOpts := make([]grpc.DialOption, 0, len(k.dialOptions)+2)
	dialOpts = append(dialOpts, k.dialOptions...)
	k.mu.RUnlock()
	dialOpts = append(dialOpts,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
	)

	// Address -> old connection, used to find reusable connections.
	addrMap := make(map[string]*addrConn, len(oldList))
	for _, ac := range oldList {
		addrMap[ac.addr] = ac
	}
	log.ZDebug(ctx, "K8s Discovery: Old connections snapshot", "serviceName", serviceName, "count", len(oldList))

	// Step 3: fetch the Endpoints that back the Service.
	endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(
		ctx,
		serviceName,
		metav1.GetOptions{},
	)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get endpoints", err, "serviceName", serviceName)
		return fmt.Errorf("failed to get endpoints for service %s: %w", serviceName, err)
	}

	// Count the endpoint addresses (for logging and pre-sizing).
	var totalAddresses int
	for _, subset := range endpoints.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.ZDebug(ctx, "K8s Discovery: Found endpoint addresses", "serviceName", serviceName, "count", totalAddresses)

	// Step 4: reuse or create one connection per distinct target.
	// kept records reuse decisions locally (by pointer) instead of in the shared
	// addrConn.reused field, so a concurrent refresh cannot flip a decision
	// between the moment it is made and the moment it is acted upon.
	kept := make(map[*addrConn]struct{}, len(oldList))
	seen := make(map[string]struct{}, totalAddresses)
	newList := make([]*addrConn, 0, totalAddresses)
	var reusedCount, createdCount int
	for _, subset := range endpoints.Subsets {
		for _, address := range subset.Addresses {
			target := fmt.Sprintf("%s:%d", address.IP, port)

			// The same IP may be listed in more than one subset; handle each
			// target only once.
			if _, dup := seen[target]; dup {
				continue
			}
			seen[target] = struct{}{}

			// Reuse the old connection when one exists for this target.
			if ac, ok := addrMap[target]; ok {
				kept[ac] = struct{}{}
				newList = append(newList, ac)
				reusedCount++
				log.ZDebug(ctx, "K8s Discovery: Reusing existing connection", "serviceName", serviceName, "target", target)
				continue
			}

			// Otherwise dial a new connection.
			log.ZDebug(ctx, "K8s Discovery: Creating new connection", "serviceName", serviceName, "target", target)
			conn, err := grpc.Dial(target, dialOpts...)
			if err != nil {
				// Skip endpoints that cannot be dialed instead of aborting the
				// whole refresh.
				log.ZWarn(ctx, "K8s Discovery: Failed to dial endpoint, skipping", err, "serviceName", serviceName, "target", target)
				continue
			}
			state := conn.GetState()
			log.ZDebug(ctx, "K8s Discovery: New connection created", "serviceName", serviceName, "target", target, "state", state.String())
			newList = append(newList, &addrConn{conn: conn, addr: target, reused: false})
			createdCount++
		}
	}
	log.ZInfo(ctx, "K8s Discovery: Connection initialization summary", "serviceName", serviceName,
		"total", len(newList), "reused", reusedCount, "created", createdCount)

	// Step 5: publish the new list and collect the old connections that were
	// not reused. The reused marker is only written while holding the lock.
	var connsToClose []*addrConn
	k.mu.Lock()
	for _, ac := range oldList {
		_, wasKept := kept[ac]
		ac.reused = wasKept
		if !wasKept {
			connsToClose = append(connsToClose, ac)
		}
	}
	k.connMap[serviceName] = newList
	k.mu.Unlock()
	log.ZDebug(ctx, "K8s Discovery: Connection map updated", "serviceName", serviceName,
		"oldCount", len(oldList), "newCount", len(newList), "toClose", len(connsToClose))

	// Step 6: close the unused old connections after a delay, so callers that
	// fetched them just before the swap are not cut off immediately.
	if len(connsToClose) > 0 {
		log.ZInfo(ctx, "K8s Discovery: Scheduling delayed close for unused connections", "serviceName", serviceName, "count", len(connsToClose), "delaySeconds", 5)
		go func() {
			time.Sleep(5 * time.Second)
			log.ZDebug(ctx, "K8s Discovery: Closing unused old connections", "serviceName", serviceName, "count", len(connsToClose))
			closedCount := 0
			for _, ac := range connsToClose {
				if err := ac.conn.Close(); err != nil {
					log.ZError(ctx, "K8s Discovery: Failed to close old connection", err, "serviceName", serviceName, "addr", ac.addr)
				} else {
					closedCount++
				}
			}
			log.ZInfo(ctx, "K8s Discovery: Closed unused connections", "serviceName", serviceName, "closed", closedCount, "total", len(connsToClose))
		}()
	}

	log.ZInfo(ctx, "K8s Discovery: Connection initialization completed", "serviceName", serviceName)
	return nil
}
// GetConns returns the gRPC client connections cached for a Kubernetes
// service, initializing them on demand.
//
// serviceName may carry a ":port-name" suffix, which is stripped before the
// lookup. Cached connections are health-filtered first: if at least one is
// still valid, the valid ones are returned (and the cache is trimmed when some
// were dropped). If the cache is empty or every cached connection is invalid,
// the connections are rebuilt via initializeConns. opts is not used by this
// implementation. An error is returned only when that initialization fails.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
	// Strip the port information from the service name.
	svcName := parseServiceName(serviceName)
	log.ZDebug(ctx, "K8s Discovery: GetConns called", "serviceName", serviceName, "parsedName", svcName)

	// Step 1: look at the cache under the read lock.
	k.mu.RLock()
	cached, found := k.connMap[svcName]
	k.mu.RUnlock()

	// Step 2: when the cache has entries, keep only the healthy ones.
	if !found || len(cached) == 0 {
		log.ZDebug(ctx, "K8s Discovery: No connections in cache, initializing", "serviceName", svcName)
	} else {
		log.ZDebug(ctx, "K8s Discovery: Found connections in cache, checking health", "serviceName", svcName, "count", len(cached))
		healthy := k.filterValidConns(ctx, svcName, cached)

		if len(healthy) > 0 {
			if len(healthy) >= len(cached) {
				log.ZDebug(ctx, "K8s Discovery: All connections are healthy", "serviceName", svcName, "count", len(healthy))
			} else {
				// Some connections were dropped: store the trimmed list.
				log.ZWarn(ctx, "K8s Discovery: Removed invalid connections", nil, "serviceName", svcName, "removed", len(cached)-len(healthy), "remaining", len(healthy))
				k.mu.Lock()
				k.connMap[svcName] = healthy
				k.mu.Unlock()
			}
			return addrConnsToInterfaces(healthy), nil
		}

		// Every cached connection is invalid: clear the entry and rebuild.
		log.ZWarn(ctx, "K8s Discovery: All connections are invalid, reinitializing", nil, "serviceName", svcName)
		k.mu.Lock()
		delete(k.connMap, svcName)
		k.mu.Unlock()
	}

	// Step 3: re-check the cache, in case another goroutine filled it meanwhile.
	k.mu.Lock()
	cached, found = k.connMap[svcName]
	k.mu.Unlock()
	if found && len(cached) > 0 {
		log.ZDebug(ctx, "K8s Discovery: Connections were initialized by another goroutine", "serviceName", svcName)
		return addrConnsToInterfaces(cached), nil
	}

	// Build fresh connections.
	log.ZDebug(ctx, "K8s Discovery: Initializing new connections", "serviceName", svcName)
	if err := k.initializeConns(svcName); err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to initialize connections", err, "serviceName", svcName)
		return nil, fmt.Errorf("failed to initialize connections for service %s: %w", svcName, err)
	}

	// Return whatever the initialization stored.
	k.mu.RLock()
	cached = k.connMap[svcName]
	k.mu.RUnlock()
	log.ZDebug(ctx, "K8s Discovery: Returning connections", "serviceName", svcName, "count", len(cached))
	return addrConnsToInterfaces(cached), nil
}

// addrConnsToInterfaces converts cached connections into the interface slice
// returned to callers, preserving order.
func addrConnsToInterfaces(list []*addrConn) []grpc.ClientConnInterface {
	out := make([]grpc.ClientConnInterface, len(list))
	for i := range list {
		out[i] = list[i].conn
	}
	return out
}
// filterValidConns returns the connections from conns whose state is Ready or
// Idle. Every other connection is closed, and the number of rejected
// connections per state is logged as a warning. The input slice is not
// modified; the result is a new slice.
func (k *KubernetesConnManager) filterValidConns(ctx context.Context, serviceName string, conns []*addrConn) []*addrConn {
	kept := make([]*addrConn, 0, len(conns))
	rejected := make(map[string]int)

	for _, ac := range conns {
		switch st := ac.conn.GetState(); st {
		case connectivity.Ready, connectivity.Idle:
			// Only Ready and Idle connections are kept.
			kept = append(kept, ac)
		default:
			rejected[st.String()]++
			log.ZDebug(ctx, "K8s Discovery: Connection is invalid, closing", "serviceName", serviceName, "addr", ac.addr, "state", st.String())
			if closeErr := ac.conn.Close(); closeErr != nil {
				log.ZError(ctx, "K8s Discovery: Failed to close invalid connection", closeErr, "serviceName", serviceName, "addr", ac.addr)
			}
		}
	}

	if len(rejected) != 0 {
		log.ZWarn(ctx, "K8s Discovery: Found invalid connections", nil, "serviceName", serviceName, "invalidStates", rejected)
	}
	return kept
}
// GetConn returns a single gRPC client connection for a Kubernetes service.
//
// Important: GetConn dials the Service's cluster DNS name rather than a pod
// IP, so that the connection is not forcibly closed.
//
// serviceName may carry a ":port-name" suffix, which is stripped. When
// rpcTargets holds a non-empty entry for the service it is used as the dial
// target; otherwise the target is "<service>.<namespace>.svc.cluster.local:<port>"
// with the port obtained from getServicePort. Every call dials a new
// connection, which is not stored in the connection cache. opts is not applied
// by this implementation. An error is returned when the Service port cannot be
// resolved or the dial fails.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
	// Strip the port information from the service name.
	svcName := parseServiceName(serviceName)
	log.ZDebug(ctx, "K8s Discovery: GetConn called (using DNS)", "serviceName", serviceName, "parsedName", svcName)

	// Prefer a custom target when one is configured.
	target := k.rpcTargets[svcName]
	if target == "" {
		// Resolve the Service port.
		svcPort, err := k.getServicePort(svcName)
		if err != nil {
			log.ZError(ctx, "K8s Discovery: Failed to get service port", err, "serviceName", svcName)
			return nil, err
		}
		// Build the Kubernetes DNS name.
		target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName, k.namespace, svcPort)
		log.ZDebug(ctx, "K8s Discovery: Using DNS target", "serviceName", svcName, "target", target)
	} else {
		log.ZDebug(ctx, "K8s Discovery: Using custom target", "serviceName", svcName, "target", target)
	}

	// Defaults first, then the manager's options. k.dialOptions is read under
	// the lock because AddOption modifies it under the same lock.
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(1024*1024*10),
			grpc.MaxCallSendMsgSize(1024*1024*20),
		),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
	}
	k.mu.RLock()
	dialOpts = append(dialOpts, k.dialOptions...)
	k.mu.RUnlock()

	// Dial the target.
	log.ZDebug(ctx, "K8s Discovery: Dialing DNS target", "serviceName", svcName, "target", target)
	conn, err := grpc.DialContext(ctx, target, dialOpts...)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to dial DNS target", err, "serviceName", svcName, "target", target)
		return nil, err
	}

	log.ZDebug(ctx, "K8s Discovery: GetConn completed successfully", "serviceName", svcName)
	return conn, nil
}
// IsSelfNode reports whether the given connection is to the current node.
// This implementation does not inspect cc and always returns false.
func (k *KubernetesConnManager) IsSelfNode(cc grpc.ClientConnInterface) bool {
return false
}
// watchEndpoints watches Endpoints resources in the manager's namespace and
// refreshes the affected service's connections when they change.
//
// A shared informer (10 minute resync period) is registered with handlers that
// forward add and delete events to handleEndpointChange unconditionally, and
// update events only when endpointsChanged reports a difference. The context
// used here is context.Background(), which is never cancelled, so once the
// informer cache has synced this function blocks for the lifetime of the
// process. It returns early only if the cache fails to sync.
func (k *KubernetesConnManager) watchEndpoints() {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Starting Endpoints watcher")

	informerFactory := informers.NewSharedInformerFactoryWithOptions(k.clientset, time.Minute*10,
		informers.WithNamespace(k.namespace))
	informer := informerFactory.Core().V1().Endpoints().Informer()
	log.ZDebug(ctx, "K8s Discovery: Endpoints Informer created")

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// Use checked assertions so an unexpected object type cannot panic
			// the informer goroutine. If either object is not an Endpoints, the
			// comparison is skipped and handleEndpointChange decides what to do
			// with newObj (it logs and ignores unexpected types).
			oldEndpoint, oldOK := oldObj.(*v1.Endpoints)
			newEndpoint, newOK := newObj.(*v1.Endpoints)
			if !oldOK || !newOK || k.endpointsChanged(oldEndpoint, newEndpoint) {
				k.handleEndpointChange(newObj)
			}
		},
		DeleteFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
	})

	log.ZDebug(ctx, "K8s Discovery: Starting Informer factory")
	informerFactory.Start(ctx.Done())

	log.ZDebug(ctx, "K8s Discovery: Waiting for Informer cache to sync")
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		log.ZError(ctx, "K8s Discovery: Failed to sync Informer cache", nil)
		return
	}
	log.ZInfo(ctx, "K8s Discovery: Informer cache synced successfully")
	log.ZInfo(ctx, "K8s Discovery: Endpoints watcher is running")

	<-ctx.Done()
	log.ZInfo(ctx, "K8s Discovery: Endpoints watcher stopped")
}
// endpointsChanged reports whether the set of address IPs differs between two
// versions of an Endpoints object. Only the IPs listed under
// Subsets[].Addresses are compared; ports and every other field are ignored.
func (k *KubernetesConnManager) endpointsChanged(old, new *v1.Endpoints) bool {
	// ipSet gathers the distinct address IPs of one Endpoints object.
	ipSet := func(ep *v1.Endpoints) map[string]bool {
		ips := make(map[string]bool)
		for i := range ep.Subsets {
			addrs := ep.Subsets[i].Addresses
			for j := range addrs {
				ips[addrs[j].IP] = true
			}
		}
		return ips
	}

	before, after := ipSet(old), ipSet(new)
	if len(before) != len(after) {
		return true
	}
	// Equal sizes: the sets differ exactly when some old IP is missing.
	for ip := range before {
		if _, present := after[ip]; !present {
			return true
		}
	}
	return false
}
// handleEndpointChange reacts to an Endpoints add/update/delete event by
// rebuilding the connections of the corresponding service.
//
// obj is expected to be a *v1.Endpoints. Delete events may instead deliver a
// cache.DeletedFinalStateUnknown tombstone wrapping the last known object; it
// is unwrapped here. Any other type is logged and ignored. When watchNames is
// non-empty, services not listed in it are ignored. The Endpoints name is used
// as the service name passed to initializeConns; a failure there is logged,
// not returned.
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
	ctx := context.Background()

	endpoint, ok := obj.(*v1.Endpoints)
	if !ok {
		// The informer hands over a tombstone when it missed the actual delete
		// event; the wrapped object is the last state it knew about.
		if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
			endpoint, ok = tombstone.Obj.(*v1.Endpoints)
		}
	}
	if !ok {
		log.ZError(ctx, "K8s Discovery: Expected *v1.Endpoints", nil, "actualType", fmt.Sprintf("%T", obj))
		return
	}

	serviceName := endpoint.Name

	// Only handle services listed in watchNames.
	if len(k.watchNames) > 0 && !datautil.Contain(serviceName, k.watchNames...) {
		log.ZDebug(ctx, "K8s Discovery: Ignoring Endpoints change (not in watchNames)", "serviceName", serviceName)
		return
	}
	log.ZInfo(ctx, "K8s Discovery: Handling Endpoints change", "serviceName", serviceName)

	// The address count is computed for logging only.
	var totalAddresses int
	for _, subset := range endpoint.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.ZDebug(ctx, "K8s Discovery: Endpoint addresses count", "serviceName", serviceName, "count", totalAddresses)

	if err := k.initializeConns(serviceName); err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to initialize connections", err, "serviceName", serviceName)
	} else {
		log.ZInfo(ctx, "K8s Discovery: Successfully updated connections", "serviceName", serviceName)
	}
}
// getServicePort returns the RPC port of a Service: the first entry in the
// Service's port list whose port number is not 10001.
//
// An error is returned when the Service cannot be fetched, when it declares no
// ports, or when no usable port is found (the selected port is 0).
// NOTE(review): 10001 is treated as a non-RPC port; presumably it is a
// metrics/auxiliary port — confirm.
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
	ctx := context.Background()
	log.ZDebug(ctx, "K8s Discovery: Getting service port", "serviceName", serviceName)

	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(ctx, serviceName, metav1.GetOptions{})
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get service", err, "serviceName", serviceName, "namespace", k.namespace)
		return 0, fmt.Errorf("failed to get service %s: %w", serviceName, err)
	}

	declared := svc.Spec.Ports
	if len(declared) == 0 {
		log.ZError(ctx, "K8s Discovery: Service has no ports defined", nil, "serviceName", serviceName)
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
	}

	// Take the first port that is not 10001.
	var rpcPort int32
	for i := range declared {
		if candidate := declared[i].Port; candidate != 10001 {
			rpcPort = candidate
			break
		}
	}
	if rpcPort == 0 {
		log.ZError(ctx, "K8s Discovery: Service has no RPC port", nil, "serviceName", serviceName)
		return 0, fmt.Errorf("service %s has no RPC port (all ports are 10001)", serviceName)
	}

	log.ZDebug(ctx, "K8s Discovery: Got service port", "serviceName", serviceName, "port", rpcPort)
	return rpcPort, nil
}
// Close closes every cached connection of every service and then resets the
// connection cache to an empty map. Close errors are logged and do not stop
// the remaining connections from being closed. The lock is held for the whole
// operation.
func (k *KubernetesConnManager) Close() {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Closing all connections")

	k.mu.Lock()
	defer k.mu.Unlock()

	var closedTotal int
	for svc, list := range k.connMap {
		log.ZDebug(ctx, "K8s Discovery: Closing connections for service", "serviceName", svc, "count", len(list))
		for i := range list {
			entry := list[i]
			if closeErr := entry.conn.Close(); closeErr != nil {
				log.ZError(ctx, "K8s Discovery: Failed to close connection", closeErr, "serviceName", svc, "addr", entry.addr)
			}
		}
		// The total counts every cached connection, whether or not Close failed.
		closedTotal += len(list)
	}
	log.ZInfo(ctx, "K8s Discovery: Closed all connections", "totalCount", closedTotal)

	k.connMap = make(map[string][]*addrConn)
}
// GetSelfConnTarget returns the "podIP:port" connection target of the current
// service instance, computing and caching it on first use.
//
// The pod is looked up by the HOSTNAME environment variable. The call blocks,
// polling every 3 seconds, until the pod reports an IP. The port is the first
// container port of the pod's first container that is not 10001; if none is
// found the port part is 0. When HOSTNAME is empty the lookup can never
// succeed, so an empty string is returned immediately and nothing is cached.
//
// The cached value is read and written without holding the lock.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
	if k.selfTarget != "" {
		return k.selfTarget
	}

	ctx := context.Background()
	hostName := os.Getenv("HOSTNAME")
	log.ZDebug(ctx, "K8s Discovery: Getting self connection target", "hostname", hostName)
	if hostName == "" {
		// An empty pod name is rejected by the API on every attempt; polling
		// would never terminate.
		log.ZError(ctx, "K8s Discovery: HOSTNAME is not set, cannot resolve self connection target", nil)
		return ""
	}

	pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(ctx, hostName, metav1.GetOptions{})
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get pod", err, "hostname", hostName)
	}

	// Poll until the pod has an IP. The sleep comes before each retry, so no
	// extra delay is paid once the IP is available.
	for pod == nil || pod.Status.PodIP == "" {
		log.ZDebug(ctx, "K8s Discovery: Waiting for pod IP to be assigned", "hostname", hostName)
		time.Sleep(3 * time.Second)
		pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(ctx, hostName, metav1.GetOptions{})
		if err != nil {
			log.ZError(ctx, "K8s Discovery: Failed to get pod", err, "hostname", hostName)
		}
	}

	// Pick the first port of the first container that is not 10001.
	var selfPort int32
	if len(pod.Spec.Containers) > 0 {
		for _, port := range pod.Spec.Containers[0].Ports {
			if port.ContainerPort != 10001 {
				selfPort = port.ContainerPort
				break
			}
		}
	}
	if selfPort == 0 {
		log.ZWarn(ctx, "K8s Discovery: No usable container port found for self target", nil, "hostname", hostName)
	}

	k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
	log.ZInfo(ctx, "K8s Discovery: Self connection target", "target", k.selfTarget)
	return k.selfTarget
}
// AddOption appends the given gRPC dial options to the manager's existing dial
// options while holding the write lock, and logs how many were added.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
	added := len(opts)

	k.mu.Lock()
	defer k.mu.Unlock()

	k.dialOptions = append(k.dialOptions, opts...)
	log.ZDebug(context.Background(), "K8s Discovery: Added dial options", "count", added)
}
// CloseConn closes the given gRPC client connection. A nil connection is
// ignored. A failure reported by Close is logged; nothing is returned to the
// caller. The connection cache is not touched.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
	ctx := context.Background()
	if conn == nil {
		// Calling Close on a nil *grpc.ClientConn would panic.
		log.ZDebug(ctx, "K8s Discovery: CloseConn called with nil connection, nothing to close")
		return
	}
	log.ZDebug(ctx, "K8s Discovery: Closing single connection")
	if err := conn.Close(); err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to close connection", err)
	}
}
// Register is a no-op for this manager: it ignores all arguments and always
// returns nil.
func (k *KubernetesConnManager) Register(ctx context.Context, serviceName, host string, port int, opts ...grpc.DialOption) error {
// Registration is not needed in a K8s environment; return nil.
return nil
}
// UnRegister is a no-op for this manager and always returns nil.
func (k *KubernetesConnManager) UnRegister() error {
// Unregistration is not needed in a K8s environment; return nil.
return nil
}
// GetUserIdHashGatewayHost is not supported by this manager: it ignores its
// arguments and always returns an empty host with a nil error.
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
// Not supported in a K8s environment; return an empty string.
return "", nil
}
// KeyValue interface methods - not supported in a K8s environment, so each of
// them returns discovery.ErrNotSupported.
// This lets callers detect the case with errors.Is(err, discovery.ErrNotSupported)
// and ignore it.

// SetKey is not supported; it always returns discovery.ErrNotSupported.
func (k *KubernetesConnManager) SetKey(ctx context.Context, key string, value []byte) error {
return discovery.ErrNotSupported
}
// SetWithLease is not supported; it always returns discovery.ErrNotSupported.
func (k *KubernetesConnManager) SetWithLease(ctx context.Context, key string, val []byte, ttl int64) error {
return discovery.ErrNotSupported
}
// GetKey is not supported; it always returns a nil value and
// discovery.ErrNotSupported.
func (k *KubernetesConnManager) GetKey(ctx context.Context, key string) ([]byte, error) {
return nil, discovery.ErrNotSupported
}
// GetKeyWithPrefix is not supported; it always returns a nil result and
// discovery.ErrNotSupported.
func (k *KubernetesConnManager) GetKeyWithPrefix(ctx context.Context, key string) ([][]byte, error) {
return nil, discovery.ErrNotSupported
}
// WatchKey is not supported; it never invokes fn and always returns
// discovery.ErrNotSupported.
func (k *KubernetesConnManager) WatchKey(ctx context.Context, key string, fn discovery.WatchKeyHandler) error {
return discovery.ErrNotSupported
}