package kubernetes

import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/openimsdk/tools/discovery"
	"github.com/openimsdk/tools/log"
	"github.com/openimsdk/tools/utils/datautil"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// addrConn stores a connection together with its address, enabling connection reuse.
type addrConn struct {
	conn   *grpc.ClientConn
	addr   string
	reused bool // marks whether this connection was reused during the latest refresh
}

// KubernetesConnManager manages gRPC connections to services discovered through the Kubernetes API.
type KubernetesConnManager struct {
	clientset   *kubernetes.Clientset
	namespace   string
	dialOptions []grpc.DialOption

	rpcTargets map[string]string
	selfTarget string

	// watchNames restricts the Endpoints watcher to these services only.
	watchNames []string

	mu      sync.RWMutex
	connMap map[string][]*addrConn
}

// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
func NewKubernetesConnManager(namespace string, watchNames []string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Initializing connection manager", "namespace", namespace, "watchNames", watchNames)

	// Load the in-cluster configuration.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to create in-cluster config", err)
		return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Successfully created in-cluster config")

	// Create the Kubernetes API clientset.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to create clientset", err)
		return nil, fmt.Errorf("failed to create clientset: %v", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Successfully created clientset")

	// Initialize the connection manager.
	k := &KubernetesConnManager{
		clientset:   clientset,
		namespace:   namespace,
		dialOptions: options,
		connMap:     make(map[string][]*addrConn),
		rpcTargets:  make(map[string]string),
		watchNames:  watchNames,
	}

	// Start a background goroutine that watches for Endpoints changes.
	log.ZInfo(ctx, "K8s Discovery: Starting Endpoints watcher")
	go k.watchEndpoints()

	log.ZInfo(ctx, "K8s Discovery: Connection manager initialized successfully")
	return k, nil
}

// parseServiceName strips port information from a service name.
// For example: user-rpc-service:http-10320 -> user-rpc-service
func parseServiceName(serviceName string) string {
	if idx := strings.Index(serviceName, ":"); idx != -1 {
		return serviceName[:idx]
	}
	return serviceName
}
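
// Usage sketch (hypothetical namespace and service names; assumes the pod runs
// in-cluster with RBAC permission to get/list/watch Services, Endpoints, and Pods
// in its namespace):
//
//	mgr, err := NewKubernetesConnManager("openim", []string{"user-rpc-service", "msg-rpc-service"})
//	if err != nil {
//		// handle startup failure
//	}
//	defer mgr.Close()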

// initializeConns initializes all gRPC connections for the given service, reusing
// existing connections whose endpoint address is unchanged.
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Starting to initialize connections", "serviceName", serviceName)

	// Step 1: get the service port.
	port, err := k.getServicePort(serviceName)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get service port", err, "serviceName", serviceName)
		return fmt.Errorf("failed to get service port: %w", err)
	}
	log.ZDebug(ctx, "K8s Discovery: Got service port", "serviceName", serviceName, "port", port)

	// Step 2: snapshot the old connections and build an address-to-connection map (for reuse).
	k.mu.Lock()
	oldList := k.connMap[serviceName]
	addrMap := make(map[string]*addrConn, len(oldList))
	for _, ac := range oldList {
		addrMap[ac.addr] = ac
		ac.reused = false // reset the reuse marker
	}
	k.mu.Unlock()
	log.ZDebug(ctx, "K8s Discovery: Old connections snapshot", "serviceName", serviceName, "count", len(oldList))

	// Step 3: fetch the Endpoints object backing the service.
	endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(
		ctx,
		serviceName,
		metav1.GetOptions{},
	)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get endpoints", err, "serviceName", serviceName)
		return fmt.Errorf("failed to get endpoints for service %s: %w", serviceName, err)
	}

	// Count the endpoint addresses.
	var totalAddresses int
	for _, subset := range endpoints.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.ZDebug(ctx, "K8s Discovery: Found endpoint addresses", "serviceName", serviceName, "count", totalAddresses)

	// Step 4: create or reuse a gRPC connection for each pod IP.
	var newList []*addrConn
	var reusedCount, createdCount int

	for _, subset := range endpoints.Subsets {
		for _, address := range subset.Addresses {
			target := fmt.Sprintf("%s:%d", address.IP, port)

			// Reuse the old connection if one exists for this address.
			if ac, ok := addrMap[target]; ok {
				ac.reused = true
				newList = append(newList, ac)
				reusedCount++
				log.ZDebug(ctx, "K8s Discovery: Reusing existing connection", "serviceName", serviceName, "target", target)
				continue
			}

			// Create a new connection.
			log.ZDebug(ctx, "K8s Discovery: Creating new connection", "serviceName", serviceName, "target", target)
			conn, err := grpc.Dial(
				target,
				append(k.dialOptions,
					grpc.WithTransportCredentials(insecure.NewCredentials()),
					grpc.WithKeepaliveParams(keepalive.ClientParameters{
						Time:                10 * time.Second,
						Timeout:             3 * time.Second,
						PermitWithoutStream: true,
					}),
				)...,
			)
			if err != nil {
				log.ZWarn(ctx, "K8s Discovery: Failed to dial endpoint, skipping", err, "serviceName", serviceName, "target", target)
				// Skip unreachable endpoints instead of aborting the whole initialization.
				continue
			}

			state := conn.GetState()
			log.ZDebug(ctx, "K8s Discovery: New connection created", "serviceName", serviceName, "target", target, "state", state.String())
			newList = append(newList, &addrConn{conn: conn, addr: target, reused: false})
			createdCount++
		}
	}

	log.ZInfo(ctx, "K8s Discovery: Connection initialization summary", "serviceName", serviceName, "total", len(newList), "reused", reusedCount, "created", createdCount)

	// Step 5: collect the old connections that were not reused and must be closed.
	var connsToClose []*addrConn
	for _, ac := range oldList {
		if !ac.reused {
			connsToClose = append(connsToClose, ac)
		}
	}

	// Step 6: update the connection map.
	k.mu.Lock()
	k.connMap[serviceName] = newList
	k.mu.Unlock()
	log.ZDebug(ctx, "K8s Discovery: Connection map updated", "serviceName", serviceName, "oldCount", len(oldList), "newCount", len(newList), "toClose", len(connsToClose))

	// Step 7: close the unused old connections after a delay.
	if len(connsToClose) > 0 {
		log.ZInfo(ctx, "K8s Discovery: Scheduling delayed close for unused connections", "serviceName", serviceName, "count", len(connsToClose), "delaySeconds", 5)
		go func() {
			time.Sleep(5 * time.Second)
			log.ZDebug(ctx, "K8s Discovery: Closing unused old connections", "serviceName", serviceName, "count", len(connsToClose))
			closedCount := 0
			for _, ac := range connsToClose {
				if err := ac.conn.Close(); err != nil {
					log.ZError(ctx, "K8s Discovery: Failed to close old connection", err, "serviceName", serviceName, "addr", ac.addr)
				} else {
					closedCount++
				}
			}
			log.ZInfo(ctx, "K8s Discovery: Closed unused connections", "serviceName", serviceName, "closed", closedCount, "total", len(connsToClose))
		}()
	}

	log.ZInfo(ctx, "K8s Discovery: Connection initialization completed", "serviceName", serviceName)
	return nil
}
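
// Reuse walkthrough (hypothetical pod IPs): suppose connMap["user-rpc-service"]
// holds connections to 10.0.0.5:10320 and 10.0.0.6:10320, and the refreshed
// Endpoints list is {10.0.0.5, 10.0.0.7}. The 10.0.0.5 connection is kept as-is,
// a new connection to 10.0.0.7 is dialed, and the 10.0.0.6 connection is closed
// after the 5-second grace period so in-flight RPCs get a chance to drain.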

// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
	// Strip port information from the service name.
	svcName := parseServiceName(serviceName)
	log.ZDebug(ctx, "K8s Discovery: GetConns called", "serviceName", serviceName, "parsedName", svcName)

	// Step 1: first cache lookup (read lock).
	k.mu.RLock()
	conns, exists := k.connMap[svcName]
	k.mu.RUnlock()

	// Step 2: if the cache has connections, check their health.
	if exists && len(conns) > 0 {
		log.ZDebug(ctx, "K8s Discovery: Found connections in cache, checking health", "serviceName", svcName, "count", len(conns))

		// Filter out unhealthy connections.
		validConns := k.filterValidConns(ctx, svcName, conns)

		// If any valid connections remain, update the cache and return them.
		if len(validConns) > 0 {
			if len(validConns) < len(conns) {
				log.ZWarn(ctx, "K8s Discovery: Removed invalid connections", nil, "serviceName", svcName, "removed", len(conns)-len(validConns), "remaining", len(validConns))
				k.mu.Lock()
				k.connMap[svcName] = validConns
				k.mu.Unlock()
			} else {
				log.ZDebug(ctx, "K8s Discovery: All connections are healthy", "serviceName", svcName, "count", len(validConns))
			}

			// Convert to the interface type.
			result := make([]grpc.ClientConnInterface, len(validConns))
			for i, ac := range validConns {
				result[i] = ac.conn
			}
			return result, nil
		}

		// All connections are invalid: clear the cache and reinitialize.
		log.ZWarn(ctx, "K8s Discovery: All connections are invalid, reinitializing", nil, "serviceName", svcName)
		k.mu.Lock()
		delete(k.connMap, svcName)
		k.mu.Unlock()
	} else {
		log.ZDebug(ctx, "K8s Discovery: No connections in cache, initializing", "serviceName", svcName)
	}

	// Step 3: the cache is empty or all connections were invalid; reinitialize.
	k.mu.Lock()
	conns, exists = k.connMap[svcName]
	if exists && len(conns) > 0 {
		log.ZDebug(ctx, "K8s Discovery: Connections were initialized by another goroutine", "serviceName", svcName)
		k.mu.Unlock()
		result := make([]grpc.ClientConnInterface, len(conns))
		for i, ac := range conns {
			result[i] = ac.conn
		}
		return result, nil
	}
	k.mu.Unlock()

	// Initialize new connections.
	log.ZDebug(ctx, "K8s Discovery: Initializing new connections", "serviceName", svcName)
	if err := k.initializeConns(svcName); err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to initialize connections", err, "serviceName", svcName)
		return nil, fmt.Errorf("failed to initialize connections for service %s: %w", svcName, err)
	}

	// Return the newly initialized connections.
	k.mu.RLock()
	conns = k.connMap[svcName]
	k.mu.RUnlock()

	log.ZDebug(ctx, "K8s Discovery: Returning connections", "serviceName", svcName, "count", len(conns))
	result := make([]grpc.ClientConnInterface, len(conns))
	for i, ac := range conns {
		result[i] = ac.conn
	}
	return result, nil
}

// filterValidConns filters out invalid connections, closing the broken ones.
func (k *KubernetesConnManager) filterValidConns(ctx context.Context, serviceName string, conns []*addrConn) []*addrConn {
	validConns := make([]*addrConn, 0, len(conns))
	invalidStates := make(map[string]int)

	for _, ac := range conns {
		state := ac.conn.GetState()
		// Keep only connections in the Ready or Idle state.
		if state == connectivity.Ready || state == connectivity.Idle {
			validConns = append(validConns, ac)
		} else {
			invalidStates[state.String()]++
			log.ZDebug(ctx, "K8s Discovery: Connection is invalid, closing", "serviceName", serviceName, "addr", ac.addr, "state", state.String())
			if err := ac.conn.Close(); err != nil {
				log.ZError(ctx, "K8s Discovery: Failed to close invalid connection", err, "serviceName", serviceName, "addr", ac.addr)
			}
		}
	}

	if len(invalidStates) > 0 {
		log.ZWarn(ctx, "K8s Discovery: Found invalid connections", nil, "serviceName", serviceName, "invalidStates", invalidStates)
	}
	return validConns
}
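
// Usage sketch for GetConns (hypothetical service name): fan out one RPC to every
// backing pod, e.g. to broadcast a cache invalidation:
//
//	conns, err := mgr.GetConns(ctx, "msg-gateway-rpc-service")
//	if err != nil {
//		return err
//	}
//	for _, cc := range conns {
//		// cc is a per-pod grpc.ClientConnInterface; invoke the RPC on each one
//	}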

// GetConn returns a single gRPC client connection for a given Kubernetes service name.
// Important: GetConn dials the Service's DNS name, so the connection is not
// force-closed when individual endpoints change.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
	// Strip port information from the service name.
	svcName := parseServiceName(serviceName)
	log.ZDebug(ctx, "K8s Discovery: GetConn called (using DNS)", "serviceName", serviceName, "parsedName", svcName)

	var target string

	// Check whether a custom target has been configured.
	if k.rpcTargets[svcName] == "" {
		// Get the service port.
		svcPort, err := k.getServicePort(svcName)
		if err != nil {
			log.ZError(ctx, "K8s Discovery: Failed to get service port", err, "serviceName", svcName)
			return nil, err
		}
		// Build the Kubernetes DNS name.
		target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName, k.namespace, svcPort)
		log.ZDebug(ctx, "K8s Discovery: Using DNS target", "serviceName", svcName, "target", target)
	} else {
		target = k.rpcTargets[svcName]
		log.ZDebug(ctx, "K8s Discovery: Using custom target", "serviceName", svcName, "target", target)
	}

	// Create the gRPC connection.
	log.ZDebug(ctx, "K8s Discovery: Dialing DNS target", "serviceName", svcName, "target", target)
	conn, err := grpc.DialContext(
		ctx,
		target,
		append([]grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(1024*1024*10),
				grpc.MaxCallSendMsgSize(1024*1024*20),
			),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                10 * time.Second,
				Timeout:             3 * time.Second,
				PermitWithoutStream: true,
			}),
		}, k.dialOptions...)...,
	)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to dial DNS target", err, "serviceName", svcName, "target", target)
		return nil, err
	}

	log.ZDebug(ctx, "K8s Discovery: GetConn completed successfully", "serviceName", svcName)
	return conn, nil
}
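
// Usage sketch for GetConn (hypothetical service and generated package names): a
// single load-balanced connection resolved through the Service's cluster DNS name,
// suitable as the target of a generated gRPC client constructor:
//
//	cc, err := mgr.GetConn(ctx, "user-rpc-service")
//	if err != nil {
//		return err
//	}
//	client := pbuser.NewUserClient(cc) // pbuser is a hypothetical generated package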

// IsSelfNode checks if the given connection is to the current node.
func (k *KubernetesConnManager) IsSelfNode(cc grpc.ClientConnInterface) bool {
	return false
}

// watchEndpoints watches for changes to Endpoints resources.
func (k *KubernetesConnManager) watchEndpoints() {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Starting Endpoints watcher")

	informerFactory := informers.NewSharedInformerFactoryWithOptions(k.clientset, time.Minute*10, informers.WithNamespace(k.namespace))
	informer := informerFactory.Core().V1().Endpoints().Informer()
	log.ZDebug(ctx, "K8s Discovery: Endpoints Informer created")

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldEndpoint := oldObj.(*v1.Endpoints)
			newEndpoint := newObj.(*v1.Endpoints)
			if k.endpointsChanged(oldEndpoint, newEndpoint) {
				k.handleEndpointChange(newObj)
			}
		},
		DeleteFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
	})

	log.ZDebug(ctx, "K8s Discovery: Starting Informer factory")
	informerFactory.Start(ctx.Done())

	log.ZDebug(ctx, "K8s Discovery: Waiting for Informer cache to sync")
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		log.ZError(ctx, "K8s Discovery: Failed to sync Informer cache", nil)
		return
	}
	log.ZInfo(ctx, "K8s Discovery: Informer cache synced successfully")

	log.ZInfo(ctx, "K8s Discovery: Endpoints watcher is running")
	<-ctx.Done()
	log.ZInfo(ctx, "K8s Discovery: Endpoints watcher stopped")
}

// endpointsChanged reports whether the set of endpoint addresses actually changed.
func (k *KubernetesConnManager) endpointsChanged(oldEp, newEp *v1.Endpoints) bool {
	oldAddresses := make(map[string]bool)
	for _, subset := range oldEp.Subsets {
		for _, address := range subset.Addresses {
			oldAddresses[address.IP] = true
		}
	}

	newAddresses := make(map[string]bool)
	for _, subset := range newEp.Subsets {
		for _, address := range subset.Addresses {
			newAddresses[address.IP] = true
		}
	}

	if len(oldAddresses) != len(newAddresses) {
		return true
	}
	for ip := range oldAddresses {
		if !newAddresses[ip] {
			return true
		}
	}
	return false
}

// handleEndpointChange handles a change to an Endpoints resource.
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
	ctx := context.Background()

	endpoint, ok := obj.(*v1.Endpoints)
	if !ok {
		log.ZError(ctx, "K8s Discovery: Expected *v1.Endpoints", nil, "actualType", fmt.Sprintf("%T", obj))
		return
	}

	serviceName := endpoint.Name

	// Only handle services listed in watchNames.
	if len(k.watchNames) > 0 && !datautil.Contain(serviceName, k.watchNames...) {
		log.ZDebug(ctx, "K8s Discovery: Ignoring Endpoints change (not in watchNames)", "serviceName", serviceName)
		return
	}

	log.ZInfo(ctx, "K8s Discovery: Handling Endpoints change", "serviceName", serviceName)

	var totalAddresses int
	for _, subset := range endpoint.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.ZDebug(ctx, "K8s Discovery: Endpoint addresses count", "serviceName", serviceName, "count", totalAddresses)

	if err := k.initializeConns(serviceName); err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to initialize connections", err, "serviceName", serviceName)
	} else {
		log.ZInfo(ctx, "K8s Discovery: Successfully updated connections", "serviceName", serviceName)
	}
}

// getServicePort returns the RPC port of the given service.
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
	ctx := context.Background()
	log.ZDebug(ctx, "K8s Discovery: Getting service port", "serviceName", serviceName)

	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(
		ctx,
		serviceName,
		metav1.GetOptions{},
	)
	if err != nil {
		log.ZError(ctx, "K8s Discovery: Failed to get service", err, "serviceName", serviceName, "namespace", k.namespace)
		return 0, fmt.Errorf("failed to get service %s: %w", serviceName, err)
	}

	if len(svc.Spec.Ports) == 0 {
		log.ZError(ctx, "K8s Discovery: Service has no ports defined", nil, "serviceName", serviceName)
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
	}

	// Pick the first port that is not 10001 as the RPC port.
	var svcPort int32
	for _, port := range svc.Spec.Ports {
		if port.Port != 10001 {
			svcPort = port.Port
			break
		}
	}
	if svcPort == 0 {
		log.ZError(ctx, "K8s Discovery: Service has no RPC port", nil, "serviceName", serviceName)
		return 0, fmt.Errorf("service %s has no RPC port (all ports are 10001)", serviceName)
	}

	log.ZDebug(ctx, "K8s Discovery: Got service port", "serviceName", serviceName, "port", svcPort)
	return svcPort, nil
}
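
// Port selection example (hypothetical Service spec): with Spec.Ports listing
// 10001 and 10320, getServicePort returns 10320; with 10001 as the only port it
// returns an error, since this package treats port 10001 as non-RPC by convention.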

// Close closes all connections.
func (k *KubernetesConnManager) Close() {
	ctx := context.Background()
	log.ZInfo(ctx, "K8s Discovery: Closing all connections")

	k.mu.Lock()
	defer k.mu.Unlock()

	totalConns := 0
	for serviceName, conns := range k.connMap {
		log.ZDebug(ctx, "K8s Discovery: Closing connections for service", "serviceName", serviceName, "count", len(conns))
		for _, ac := range conns {
			if err := ac.conn.Close(); err != nil {
				log.ZError(ctx, "K8s Discovery: Failed to close connection", err, "serviceName", serviceName, "addr", ac.addr)
			}
		}
		totalConns += len(conns)
	}

	log.ZInfo(ctx, "K8s Discovery: Closed all connections", "totalCount", totalConns)
	k.connMap = make(map[string][]*addrConn)
}

// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
	ctx := context.Background()
	if k.selfTarget == "" {
		hostName := os.Getenv("HOSTNAME")
		log.ZDebug(ctx, "K8s Discovery: Getting self connection target", "hostname", hostName)

		pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(ctx, hostName, metav1.GetOptions{})
		if err != nil {
			log.ZError(ctx, "K8s Discovery: Failed to get pod", err, "hostname", hostName)
		}
		// Retry until the pod IP is assigned. The nil check also guards against a
		// failed Get above, which would otherwise cause a nil-pointer dereference.
		for pod == nil || pod.Status.PodIP == "" {
			log.ZDebug(ctx, "K8s Discovery: Waiting for pod IP to be assigned", "hostname", hostName)
			pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(ctx, hostName, metav1.GetOptions{})
			if err != nil {
				log.ZError(ctx, "K8s Discovery: Failed to get pod", err)
			}
			time.Sleep(3 * time.Second)
		}

		// Pick the first container port that is not 10001 as the RPC port.
		var selfPort int32
		for _, port := range pod.Spec.Containers[0].Ports {
			if port.ContainerPort != 10001 {
				selfPort = port.ContainerPort
				break
			}
		}

		k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
		log.ZInfo(ctx, "K8s Discovery: Self connection target", "target", k.selfTarget)
	}
	return k.selfTarget
}

// AddOption appends gRPC dial options to the existing options.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
	k.mu.Lock()
	defer k.mu.Unlock()
	k.dialOptions = append(k.dialOptions, opts...)
	log.ZDebug(context.Background(), "K8s Discovery: Added dial options", "count", len(opts))
}

// CloseConn closes a given gRPC client connection.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
	log.ZDebug(context.Background(), "K8s Discovery: Closing single connection")
	conn.Close()
}

// Register is a no-op: services do not need to register themselves in Kubernetes.
func (k *KubernetesConnManager) Register(ctx context.Context, serviceName, host string, port int, opts ...grpc.DialOption) error {
	return nil
}

// UnRegister is a no-op: services do not need to unregister themselves in Kubernetes.
func (k *KubernetesConnManager) UnRegister() error {
	return nil
}

// GetUserIdHashGatewayHost is not supported in the Kubernetes environment; it returns an empty string.
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
	return "", nil
}

// KeyValue interface methods are not supported in the Kubernetes environment and
// return discovery.ErrNotSupported, so callers can detect and ignore this via
// errors.Is(err, discovery.ErrNotSupported).
func (k *KubernetesConnManager) SetKey(ctx context.Context, key string, value []byte) error {
	return discovery.ErrNotSupported
}

func (k *KubernetesConnManager) SetWithLease(ctx context.Context, key string, val []byte, ttl int64) error {
	return discovery.ErrNotSupported
}

func (k *KubernetesConnManager) GetKey(ctx context.Context, key string) ([]byte, error) {
	return nil, discovery.ErrNotSupported
}

func (k *KubernetesConnManager) GetKeyWithPrefix(ctx context.Context, key string) ([][]byte, error) {
	return nil, discovery.ErrNotSupported
}

func (k *KubernetesConnManager) WatchKey(ctx context.Context, key string, fn discovery.WatchKeyHandler) error {
	return discovery.ErrNotSupported
}
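
// Sketch of the caller-side pattern for the unsupported KV operations (hypothetical
// key and handler; assumes "errors" is imported at the call site):
//
//	if err := mgr.WatchKey(ctx, "config/key", handler); err != nil &&
//		!errors.Is(err, discovery.ErrNotSupported) {
//		return err
//	}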