
# K8s Service Discovery Bug: Final Fix Plan (with Debug Logging)
## Table of Contents
- [1. Fix Overview](#1-fix-overview)
- [2. Complete Fixed Code](#2-complete-fixed-code)
- [3. Debug Logging](#3-debug-logging)
- [4. Test Verification](#4-test-verification)
- [5. Troubleshooting Guide](#5-troubleshooting-guide)
- [6. Summary](#6-summary)
- [7. Related Files](#7-related-files)
---
## 1. Fix Overview
### 1.1 What the Fix Includes
Based on lessons from previous fix attempts, this fix includes the following changes:
1. **Watch the correct resource type**: watch Endpoints instead of Pods
2. **GetConn uses DNS**: avoids connections being forcibly closed (critical!)
3. **GetConns uses Endpoints**: supports load balancing and automatic updates
4. **Delayed close of old connections**: prevents in-flight requests from failing
5. **Health checks**: ensures connections are still valid
6. **KeepAlive**: enables automatic reconnection
7. **Detailed debug logging**: makes troubleshooting easier
### 1.2 Core Principles
- **GetConn → DNS**: avoids connections being forcibly closed, which previously caused message sync and push failures (illustrated in the sketch below)
- **GetConns → Endpoints**: supports load balancing and automatic updates
- **Delayed close**: gives in-flight requests time to complete
- **Detailed logging**: records every key operation for easier debugging
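To make the two target styles concrete, here is a minimal sketch of the targets each path dials (the service name, namespace, Pod IP, and port are placeholders taken from the log examples later in this document):
```go
package main

import "fmt"

func main() {
	// GetConn path: dial the Service's cluster DNS name. The target stays
	// valid across Pod rebuilds, so the connection is never forcibly closed.
	dnsTarget := fmt.Sprintf("%s.%s.svc.cluster.local:%d",
		"user-rpc-service", "default", 10320)

	// GetConns path: dial each Pod IP taken from the Service's Endpoints.
	// These targets change on every Pod rebuild, hence the Endpoints watcher.
	endpointTarget := fmt.Sprintf("%s:%d", "10.244.1.5", 10320)

	fmt.Println(dnsTarget, endpointTarget)
}
```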
---
## 2. 完整修复代码
### 2.1 The Complete Fixed File (`pkg/common/discovery/kubernetes/kubernetes.go`)
```go
package kubernetes

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)
type KubernetesConnManager struct {
	clientset   *kubernetes.Clientset
	namespace   string
	dialOptions []grpc.DialOption

	rpcTargets map[string]string
	selfTarget string

	mu      sync.RWMutex
	connMap map[string][]*grpc.ClientConn
}
// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
func NewKubernetesConnManager(namespace string, options ...grpc.DialOption) (*KubernetesConnManager, error) {
	log.Printf("[K8s Discovery] Initializing Kubernetes connection manager, namespace: %s", namespace)

	// Load the in-cluster configuration.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Printf("[K8s Discovery] ERROR: Failed to create in-cluster config: %v", err)
		return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
	}
	log.Printf("[K8s Discovery] Successfully created in-cluster config")

	// Create the Kubernetes API client.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Printf("[K8s Discovery] ERROR: Failed to create clientset: %v", err)
		return nil, fmt.Errorf("failed to create clientset: %v", err)
	}
	log.Printf("[K8s Discovery] Successfully created clientset")

	// Initialize the connection manager.
	k := &KubernetesConnManager{
		clientset:   clientset,
		namespace:   namespace,
		dialOptions: options,
		connMap:     make(map[string][]*grpc.ClientConn),
		rpcTargets:  make(map[string]string),
	}

	// Start a background goroutine that watches for Endpoints changes.
	log.Printf("[K8s Discovery] Starting Endpoints watcher")
	go k.watchEndpoints()

	log.Printf("[K8s Discovery] Kubernetes connection manager initialized successfully")
	return k, nil
}
// initializeConns (re)creates all gRPC connections for the given service.
func (k *KubernetesConnManager) initializeConns(serviceName string) error {
	log.Printf("[K8s Discovery] [%s] Starting to initialize connections", serviceName)

	// Step 1: look up the Service's RPC port.
	port, err := k.getServicePort(serviceName)
	if err != nil {
		log.Printf("[K8s Discovery] [%s] ERROR: Failed to get service port: %v", serviceName, err)
		return fmt.Errorf("failed to get service port: %w", err)
	}
	log.Printf("[K8s Discovery] [%s] Service port: %d", serviceName, port)

	// Step 2: fetch the Endpoints backing the Service.
	endpoints, err := k.clientset.CoreV1().Endpoints(k.namespace).Get(
		context.Background(),
		serviceName,
		metav1.GetOptions{},
	)
	if err != nil {
		log.Printf("[K8s Discovery] [%s] ERROR: Failed to get endpoints: %v", serviceName, err)
		return fmt.Errorf("failed to get endpoints for service %s: %w", serviceName, err)
	}

	// Count the endpoint addresses.
	var totalAddresses int
	for _, subset := range endpoints.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.Printf("[K8s Discovery] [%s] Found %d endpoint addresses", serviceName, totalAddresses)

	// Step 3: create one gRPC connection per Pod IP.
	var newConns []*grpc.ClientConn
	for _, subset := range endpoints.Subsets {
		for _, address := range subset.Addresses {
			target := fmt.Sprintf("%s:%d", address.IP, port)
			log.Printf("[K8s Discovery] [%s] Creating connection to %s", serviceName, target)

			// Dial with KeepAlive configured so dead peers are detected.
			conn, err := grpc.Dial(
				target,
				append(k.dialOptions,
					grpc.WithTransportCredentials(insecure.NewCredentials()),
					grpc.WithKeepaliveParams(keepalive.ClientParameters{
						Time:                10 * time.Second,
						Timeout:             3 * time.Second,
						PermitWithoutStream: true,
					}),
				)...,
			)
			if err != nil {
				log.Printf("[K8s Discovery] [%s] ERROR: Failed to dial %s: %v", serviceName, target, err)
				// Fail fast: close the connections created so far and return the error.
				for _, c := range newConns {
					_ = c.Close()
				}
				return fmt.Errorf("failed to dial endpoint %s: %w", target, err)
			}

			// Log the initial connection state.
			state := conn.GetState()
			log.Printf("[K8s Discovery] [%s] Connection to %s created, state: %v", serviceName, target, state)
			newConns = append(newConns, conn)
		}
	}
	log.Printf("[K8s Discovery] [%s] Successfully created %d connections", serviceName, len(newConns))

	// Step 4: grab the old connections so they can be closed later.
	k.mu.Lock()
	oldConns, exists := k.connMap[serviceName]
	var oldConnCount int
	if exists {
		oldConnCount = len(oldConns)
		log.Printf("[K8s Discovery] [%s] Found %d old connections to close", serviceName, oldConnCount)
	}
	// Step 5: swap in the new connections immediately so new requests use them.
	k.connMap[serviceName] = newConns
	k.mu.Unlock()
	log.Printf("[K8s Discovery] [%s] Connection map updated: %d old -> %d new", serviceName, oldConnCount, len(newConns))

	// Step 6: close the old connections after a delay, giving in-flight requests time to finish.
	if exists && len(oldConns) > 0 {
		log.Printf("[K8s Discovery] [%s] Scheduling delayed close for %d old connections (5 seconds delay)", serviceName, len(oldConns))
		go func() {
			// Wait 5 seconds for in-flight requests to complete.
			time.Sleep(5 * time.Second)
			log.Printf("[K8s Discovery] [%s] Closing %d old connections", serviceName, len(oldConns))
			closedCount := 0
			for _, oldConn := range oldConns {
				if err := oldConn.Close(); err != nil {
					log.Printf("[K8s Discovery] [%s] ERROR: Failed to close old connection: %v", serviceName, err)
				} else {
					closedCount++
				}
			}
			log.Printf("[K8s Discovery] [%s] Closed %d/%d old connections", serviceName, closedCount, len(oldConns))
		}()
	}

	log.Printf("[K8s Discovery] [%s] Connection initialization completed successfully", serviceName)
	return nil
}
// GetConns returns gRPC client connections for a given Kubernetes service name.
func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
	log.Printf("[K8s Discovery] [%s] GetConns called", serviceName)

	// Step 1: first cache lookup (read lock).
	k.mu.RLock()
	conns, exists := k.connMap[serviceName]
	k.mu.RUnlock()

	// Step 2: if the cache has connections, check their health.
	if exists {
		log.Printf("[K8s Discovery] [%s] Found %d connections in cache, checking health", serviceName, len(conns))
		validConns := k.filterValidConns(serviceName, conns)

		// If any connections are still valid, update the cache and return them.
		if len(validConns) > 0 {
			if len(validConns) < len(conns) {
				log.Printf("[K8s Discovery] [%s] Removed %d invalid connections, %d valid connections remaining",
					serviceName, len(conns)-len(validConns), len(validConns))
				k.mu.Lock()
				k.connMap[serviceName] = validConns
				k.mu.Unlock()
			} else {
				log.Printf("[K8s Discovery] [%s] All %d connections are healthy", serviceName, len(validConns))
			}
			return validConns, nil
		}

		// All connections are invalid: clear the cache and reinitialize.
		log.Printf("[K8s Discovery] [%s] All connections are invalid, clearing cache and reinitializing", serviceName)
		k.mu.Lock()
		delete(k.connMap, serviceName)
		k.mu.Unlock()
	} else {
		log.Printf("[K8s Discovery] [%s] No connections in cache, initializing", serviceName)
	}

	// Step 3: nothing usable in the cache, so reinitialize.
	k.mu.Lock()
	// Double-check under the write lock to reduce duplicate initialization:
	// another goroutine may have repopulated the cache in the meantime.
	conns, exists = k.connMap[serviceName]
	if exists {
		log.Printf("[K8s Discovery] [%s] Connections were initialized by another goroutine", serviceName)
		k.mu.Unlock()
		return conns, nil
	}
	k.mu.Unlock()

	// Initialize a fresh set of connections.
	log.Printf("[K8s Discovery] [%s] Initializing new connections", serviceName)
	if err := k.initializeConns(serviceName); err != nil {
		log.Printf("[K8s Discovery] [%s] ERROR: Failed to initialize connections: %v", serviceName, err)
		return nil, fmt.Errorf("failed to initialize connections for service %s: %w", serviceName, err)
	}

	// Return the newly initialized connections.
	k.mu.RLock()
	conns = k.connMap[serviceName]
	k.mu.RUnlock()
	log.Printf("[K8s Discovery] [%s] Returning %d connections", serviceName, len(conns))
	return conns, nil
}
// filterValidConns keeps only connections that are still usable.
func (k *KubernetesConnManager) filterValidConns(serviceName string, conns []*grpc.ClientConn) []*grpc.ClientConn {
	validConns := make([]*grpc.ClientConn, 0, len(conns))
	invalidStates := make(map[connectivity.State]int)

	for i, conn := range conns {
		state := conn.GetState()
		// Keep only connections in the Ready or Idle state.
		if state == connectivity.Ready || state == connectivity.Idle {
			validConns = append(validConns, conn)
		} else {
			invalidStates[state]++
			log.Printf("[K8s Discovery] [%s] Connection #%d is invalid, state: %v, closing", serviceName, i, state)
			// The connection is no longer usable; close it.
			if err := conn.Close(); err != nil {
				log.Printf("[K8s Discovery] [%s] ERROR: Failed to close invalid connection #%d: %v", serviceName, i, err)
			}
		}
	}

	if len(invalidStates) > 0 {
		log.Printf("[K8s Discovery] [%s] Invalid connection states: %v", serviceName, invalidStates)
	}
	return validConns
}
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
//
// IMPORTANT: GetConn uses DNS so the connection is never forcibly closed.
// Rationale:
//  1. Connections returned by GetConn may be reused for a long time.
//  2. If it dialed Endpoints directly, the connection would be closed whenever the Endpoints refresh.
//  3. That makes in-flight requests fail with "grpc: the client connection is closing".
//  4. With DNS, the gRPC client manages the connection itself and is unaffected by Endpoints refreshes.
func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
	log.Printf("[K8s Discovery] [%s] GetConn called (using DNS)", serviceName)

	var target string
	// Check for a custom target override.
	if k.rpcTargets[serviceName] == "" {
		// Look up the Service port.
		svcPort, err := k.getServicePort(serviceName)
		if err != nil {
			log.Printf("[K8s Discovery] [%s] ERROR: Failed to get service port: %v", serviceName, err)
			return nil, err
		}
		// Build the Kubernetes DNS name:
		//   <service>.<namespace>.svc.cluster.local:<port>
		// Kubernetes DNS resolves this name, and traffic is load-balanced
		// across the backend Pods.
		target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort)
		log.Printf("[K8s Discovery] [%s] Using DNS target: %s", serviceName, target)
	} else {
		// Use the custom target if one is configured.
		target = k.rpcTargets[serviceName]
		log.Printf("[K8s Discovery] [%s] Using custom target: %s", serviceName, target)
	}

	// Dial the target.
	log.Printf("[K8s Discovery] [%s] Dialing DNS target: %s", serviceName, target)
	conn, err := grpc.DialContext(
		ctx,
		target,
		append([]grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(1024*1024*10), // max receive message size: 10 MB
				grpc.MaxCallSendMsgSize(1024*1024*20), // max send message size: 20 MB
			),
			// Configure KeepAlive.
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                10 * time.Second,
				Timeout:             3 * time.Second,
				PermitWithoutStream: true,
			}),
		}, k.dialOptions...)...,
	)
	if err != nil {
		log.Printf("[K8s Discovery] [%s] ERROR: Failed to dial DNS target %s: %v", serviceName, target, err)
		return nil, err
	}

	// Log the initial connection state.
	state := conn.GetState()
	log.Printf("[K8s Discovery] [%s] Connection created, state: %v", serviceName, state)

	// If the connection is not Ready yet, wait briefly for a state change.
	if state != connectivity.Ready {
		log.Printf("[K8s Discovery] [%s] Connection not ready, waiting for state change", serviceName)
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		if conn.WaitForStateChange(ctx, state) {
			newState := conn.GetState()
			log.Printf("[K8s Discovery] [%s] Connection state changed: %v -> %v", serviceName, state, newState)
		} else {
			log.Printf("[K8s Discovery] [%s] WARNING: Connection state change timeout", serviceName)
		}
	}

	log.Printf("[K8s Discovery] [%s] GetConn completed successfully", serviceName)
	return conn, nil
}
// watchEndpoints watches Endpoints resources for changes.
func (k *KubernetesConnManager) watchEndpoints() {
	log.Printf("[K8s Discovery] Starting Endpoints watcher")

	// Create the informer factory.
	// resyncPeriod: 10 minutes, so resources are re-synced periodically.
	informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute*10)

	// Create the Endpoints informer.
	// Note: this is where the original bug was fixed.
	// The code used to watch Pods; it now watches Endpoints.
	informer := informerFactory.Core().V1().Endpoints().Informer()
	log.Printf("[K8s Discovery] Endpoints Informer created")

	// Register the event handlers.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// AddFunc fires when a new Endpoints resource is created.
		AddFunc: func(obj interface{}) {
			if endpoint, ok := obj.(*v1.Endpoints); ok {
				log.Printf("[K8s Discovery] [Watcher] Endpoints ADDED: %s", endpoint.Name)
			}
			k.handleEndpointChange(obj)
		},
		// UpdateFunc fires when an Endpoints resource is updated.
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldEndpoint := oldObj.(*v1.Endpoints)
			newEndpoint := newObj.(*v1.Endpoints)
			// Only react when the address set actually changed.
			if k.endpointsChanged(oldEndpoint, newEndpoint) {
				log.Printf("[K8s Discovery] [Watcher] Endpoints UPDATED: %s", newEndpoint.Name)
				k.handleEndpointChange(newObj)
			} else {
				log.Printf("[K8s Discovery] [Watcher] Endpoints %s updated but no meaningful change", newEndpoint.Name)
			}
		},
		// DeleteFunc fires when an Endpoints resource is deleted.
		// Note: the object may be a cache.DeletedFinalStateUnknown tombstone
		// rather than *v1.Endpoints, so assert safely.
		DeleteFunc: func(obj interface{}) {
			if endpoint, ok := obj.(*v1.Endpoints); ok {
				log.Printf("[K8s Discovery] [Watcher] Endpoints DELETED: %s", endpoint.Name)
			}
			k.handleEndpointChange(obj)
		},
	})

	// Start the informer. context.Background().Done() returns a nil channel,
	// so the informer runs for the lifetime of the process.
	log.Printf("[K8s Discovery] Starting Informer factory")
	informerFactory.Start(context.Background().Done())

	// Wait for the informer cache to sync.
	log.Printf("[K8s Discovery] Waiting for Informer cache to sync")
	if !cache.WaitForCacheSync(context.Background().Done(), informer.HasSynced) {
		log.Printf("[K8s Discovery] ERROR: Failed to sync Informer cache")
		return
	}
	log.Printf("[K8s Discovery] Informer cache synced successfully")

	// Block here until the process exits.
	log.Printf("[K8s Discovery] Endpoints watcher is running")
	<-context.Background().Done()
	log.Printf("[K8s Discovery] Endpoints watcher stopped")
}
// endpointsChanged reports whether the set of endpoint addresses actually changed.
func (k *KubernetesConnManager) endpointsChanged(oldEP, newEP *v1.Endpoints) bool {
	// Collect the old address set.
	oldAddresses := make(map[string]bool)
	for _, subset := range oldEP.Subsets {
		for _, address := range subset.Addresses {
			oldAddresses[address.IP] = true
		}
	}
	// Collect the new address set.
	newAddresses := make(map[string]bool)
	for _, subset := range newEP.Subsets {
		for _, address := range subset.Addresses {
			newAddresses[address.IP] = true
		}
	}
	// Compare sizes.
	if len(oldAddresses) != len(newAddresses) {
		return true
	}
	// Compare contents.
	for ip := range oldAddresses {
		if !newAddresses[ip] {
			return true
		}
	}
	return false
}
// handleEndpointChange reacts to a change in an Endpoints resource.
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
	// Type assertion (deletes may deliver a tombstone instead).
	endpoint, ok := obj.(*v1.Endpoints)
	if !ok {
		// Log and carry on; do not crash the watcher.
		log.Printf("[K8s Discovery] [Watcher] ERROR: Expected *v1.Endpoints, got %T", obj)
		return
	}
	serviceName := endpoint.Name

	// Only refresh services we already hold connections for; for any other
	// Endpoints in the namespace there is nothing to update.
	k.mu.RLock()
	_, tracked := k.connMap[serviceName]
	k.mu.RUnlock()
	if !tracked {
		return
	}

	log.Printf("[K8s Discovery] [Watcher] Handling Endpoints change for service: %s", serviceName)

	// Log some details about the Endpoints.
	var totalAddresses int
	for _, subset := range endpoint.Subsets {
		totalAddresses += len(subset.Addresses)
	}
	log.Printf("[K8s Discovery] [Watcher] Service %s has %d endpoint addresses", serviceName, totalAddresses)

	// Reinitialize the connections.
	if err := k.initializeConns(serviceName); err != nil {
		// Log the failure but keep the watcher running.
		log.Printf("[K8s Discovery] [Watcher] ERROR: Failed to initialize connections for %s: %v", serviceName, err)
	} else {
		log.Printf("[K8s Discovery] [Watcher] Successfully updated connections for %s", serviceName)
	}
}
// getServicePort returns the Service's RPC port.
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
	log.Printf("[K8s Discovery] [%s] Getting service port", serviceName)

	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(
		context.Background(),
		serviceName,
		metav1.GetOptions{},
	)
	if err != nil {
		log.Printf("[K8s Discovery] [%s] ERROR: Failed to get service: %v", serviceName, err)
		return 0, fmt.Errorf("failed to get service %s: %w", serviceName, err)
	}

	if len(svc.Spec.Ports) == 0 {
		log.Printf("[K8s Discovery] [%s] ERROR: Service has no ports defined", serviceName)
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
	}

	// Pick the RPC port (the one that is not 10001).
	var svcPort int32
	for _, port := range svc.Spec.Ports {
		if port.Port != 10001 {
			svcPort = port.Port
			break
		}
	}
	if svcPort == 0 {
		log.Printf("[K8s Discovery] [%s] ERROR: Service has no RPC port (all ports are 10001)", serviceName)
		return 0, fmt.Errorf("service %s has no RPC port (all ports are 10001)", serviceName)
	}

	log.Printf("[K8s Discovery] [%s] Service port: %d", serviceName, svcPort)
	return svcPort, nil
}
// Close closes every cached connection.
func (k *KubernetesConnManager) Close() {
	log.Printf("[K8s Discovery] Closing all connections")
	k.mu.Lock()
	defer k.mu.Unlock()

	totalConns := 0
	for serviceName, conns := range k.connMap {
		log.Printf("[K8s Discovery] Closing %d connections for service %s", len(conns), serviceName)
		for i, conn := range conns {
			if err := conn.Close(); err != nil {
				log.Printf("[K8s Discovery] ERROR: Failed to close connection #%d for service %s: %v", i, serviceName, err)
			}
		}
		totalConns += len(conns)
	}
	log.Printf("[K8s Discovery] Closed %d total connections", totalConns)
	k.connMap = make(map[string][]*grpc.ClientConn)
}
// GetSelfConnTarget returns the connection target for the current service.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
	if k.selfTarget == "" {
		hostName := os.Getenv("HOSTNAME")
		log.Printf("[K8s Discovery] Getting self connection target, HOSTNAME: %s", hostName)

		// Poll until the Pod exists and has been assigned an IP.
		var pod *v1.Pod
		for {
			p, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{})
			if err != nil {
				log.Printf("[K8s Discovery] ERROR: Failed to get pod %s: %v", hostName, err)
			} else if p.Status.PodIP != "" {
				pod = p
				break
			} else {
				log.Printf("[K8s Discovery] Waiting for pod %s IP to be assigned", hostName)
			}
			time.Sleep(3 * time.Second)
		}

		// Pick the container's RPC port (the one that is not 10001).
		var selfPort int32
		for _, port := range pod.Spec.Containers[0].Ports {
			if port.ContainerPort != 10001 {
				selfPort = port.ContainerPort
				break
			}
		}
		k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort)
		log.Printf("[K8s Discovery] Self connection target: %s", k.selfTarget)
	}
	return k.selfTarget
}
// AddOption appends gRPC dial options to the existing options.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
	k.mu.Lock()
	defer k.mu.Unlock()
	k.dialOptions = append(k.dialOptions, opts...)
	log.Printf("[K8s Discovery] Added %d dial options", len(opts))
}

// CloseConn closes a given gRPC client connection.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
	log.Printf("[K8s Discovery] Closing single connection")
	_ = conn.Close()
}

// Register is a no-op: services do not need to register themselves in Kubernetes.
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
	return nil
}

// UnRegister is a no-op: services do not need to unregister in Kubernetes.
func (k *KubernetesConnManager) UnRegister() error {
	return nil
}

// GetUserIdHashGatewayHost is not supported in the Kubernetes implementation.
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
	return "", nil
}
```
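For reference, here is a minimal caller-side sketch (not part of the fix). It assumes it lives in the same package as the manager above, and that the target server registers the standard gRPC health service; `checkService` is a hypothetical helper name:
```go
package kubernetes

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc/health/grpc_health_v1"
)

// checkService fetches the DNS-backed connection for a service via GetConn
// and issues a standard gRPC health check over it.
func checkService(mgr *KubernetesConnManager, serviceName string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := mgr.GetConn(ctx, serviceName)
	if err != nil {
		return err
	}

	resp, err := grpc_health_v1.NewHealthClient(conn).Check(ctx,
		&grpc_health_v1.HealthCheckRequest{})
	if err != nil {
		return err
	}
	log.Printf("[K8s Discovery] [%s] health status: %v", serviceName, resp.Status)
	return nil
}
```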
---
## 3. Debug Logging
### 3.1 Log Format
All log lines share the prefix `[K8s Discovery]`, which makes them easy to filter and search.
### 3.2 Log Levels
The standard `log` package has no built-in levels, so the level is embedded as a marker in the message text:
- **INFO**: normal operation flow (no explicit marker)
- **WARNING**: worth noting, but does not affect functionality
- **ERROR**: errors that need attention
### 3.3 Key Log Points
#### 3.3.1 Initialization Logs
```
[K8s Discovery] Initializing Kubernetes connection manager, namespace: default
[K8s Discovery] Successfully created in-cluster config
[K8s Discovery] Successfully created clientset
[K8s Discovery] Starting Endpoints watcher
[K8s Discovery] Kubernetes connection manager initialized successfully
```
#### 3.3.2 Connection Initialization Logs
```
[K8s Discovery] [user-rpc-service] Starting to initialize connections
[K8s Discovery] [user-rpc-service] Service port: 10320
[K8s Discovery] [user-rpc-service] Found 3 endpoint addresses
[K8s Discovery] [user-rpc-service] Creating connection to 10.244.1.5:10320
[K8s Discovery] [user-rpc-service] Connection to 10.244.1.5:10320 created, state: Connecting
[K8s Discovery] [user-rpc-service] Successfully created 3 connections
[K8s Discovery] [user-rpc-service] Found 2 old connections to close
[K8s Discovery] [user-rpc-service] Connection map updated: 2 old -> 3 new
[K8s Discovery] [user-rpc-service] Scheduling delayed close for 2 old connections (5 seconds delay)
[K8s Discovery] [user-rpc-service] Connection initialization completed successfully
```
#### 3.3.3 Endpoints Watcher Logs
```
[K8s Discovery] [Watcher] Endpoints UPDATED: user-rpc-service
[K8s Discovery] [Watcher] Handling Endpoints change for service: user-rpc-service
[K8s Discovery] [Watcher] Service user-rpc-service has 3 endpoint addresses
[K8s Discovery] [Watcher] Successfully updated connections for user-rpc-service
```
#### 3.3.4 Connection Health Check Logs
```
[K8s Discovery] [user-rpc-service] GetConns called
[K8s Discovery] [user-rpc-service] Found 3 connections in cache, checking health
[K8s Discovery] [user-rpc-service] Connection #1 is invalid, state: Shutdown, closing
[K8s Discovery] [user-rpc-service] Removed 1 invalid connections, 2 valid connections remaining
[K8s Discovery] [user-rpc-service] Returning 2 connections
```
#### 3.3.5 GetConn Logs
```
[K8s Discovery] [msg-rpc-service] GetConn called (using DNS)
[K8s Discovery] [msg-rpc-service] Using DNS target: msg-rpc-service.default.svc.cluster.local:10280
[K8s Discovery] [msg-rpc-service] Dialing DNS target: msg-rpc-service.default.svc.cluster.local:10280
[K8s Discovery] [msg-rpc-service] Connection created, state: Ready
[K8s Discovery] [msg-rpc-service] GetConn completed successfully
```
---
## 4. Test Verification
### 4.1 Test Scenario 1: Pod Rebuild
**Steps**
```bash
# 1. Tail the logs
kubectl logs -f <pod-name> | grep "K8s Discovery"
# 2. Trigger a Pod rebuild
kubectl delete pod <target-pod-name>
# 3. Watch the log output. You should see:
# - an Endpoints UPDATED event
# - connections being reinitialized
# - old connections being closed after a delay
```
**Expected logs**
```
[K8s Discovery] [Watcher] Endpoints UPDATED: user-rpc-service
[K8s Discovery] [user-rpc-service] Starting to initialize connections
[K8s Discovery] [user-rpc-service] Found 2 old connections to close
[K8s Discovery] [user-rpc-service] Scheduling delayed close for 2 old connections (5 seconds delay)
[K8s Discovery] [user-rpc-service] Connection initialization completed successfully
[K8s Discovery] [user-rpc-service] Closing 2 old connections
[K8s Discovery] [user-rpc-service] Closed 2/2 old connections
```
### 4.2 Test Scenario 2: Message Sync and Push
**Steps**
```bash
# 1. Send a message
# 2. Trigger a Pod rebuild
kubectl delete pod <msg-service-pod>
# 3. Verify that message sync and push do not fail
# 4. Check the logs to confirm the connections are healthy
```
**Expected results**
- ✅ Message sync does not fail (GetConn uses DNS and is unaffected)
- ✅ Message push does not fail (GetConns updates its connections automatically)
- ✅ The logs show the connections recovering automatically
### 4.3 Test Scenario 3: Connection Health Check
**Steps**
```bash
# 1. Check the current connection state
kubectl logs <pod-name> | grep "checking health"
# 2. Simulate connection failure (scale the target service down)
kubectl scale deployment <target-service> --replicas=0
# 3. Check the logs after a while; the connections should be
#    marked invalid and removed
```
**Expected logs**
```
[K8s Discovery] [user-rpc-service] GetConns called
[K8s Discovery] [user-rpc-service] Found 3 connections in cache, checking health
[K8s Discovery] [user-rpc-service] Connection #0 is invalid, state: TransientFailure, closing
[K8s Discovery] [user-rpc-service] Connection #1 is invalid, state: TransientFailure, closing
[K8s Discovery] [user-rpc-service] Connection #2 is invalid, state: TransientFailure, closing
[K8s Discovery] [user-rpc-service] All connections are invalid, clearing cache and reinitializing
```
---
## 5. Troubleshooting Guide
### 5.1 Common Issues
#### Issue 1: Connection Initialization Fails
**Symptom**
```
[K8s Discovery] [user-rpc-service] ERROR: Failed to dial endpoint 10.244.1.5:10320: connection refused
```
**Possible causes**
- The Pod is not ready yet
- Network problems
- Wrong port configuration
**Troubleshooting steps**
1. Check the Pod status: `kubectl get pods`
2. Check the Service definition: `kubectl get svc user-rpc-service -o yaml`
3. Check the Endpoints: `kubectl get endpoints user-rpc-service`
#### Issue 2: Endpoints Watcher Not Working
**Symptoms**
- Connections are not updated after a Pod rebuild
- No `[Watcher] Endpoints UPDATED` log lines
**Possible causes**
- The Informer did not start
- Insufficient RBAC permissions
- Network problems
**Troubleshooting steps**
1. Check the logs for `[K8s Discovery] Starting Endpoints watcher`
2. Check the ServiceAccount permissions (see the sketch below)
3. Watch the Endpoints manually: `kubectl get endpoints -w`
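A minimal sketch for step 2, assuming client-go is available (the fix itself already depends on it): it asks the API server, via a `SelfSubjectAccessReview`, whether the Pod's ServiceAccount may get/list/watch Endpoints. The namespace value is a placeholder:
```go
package main

import (
	"context"
	"fmt"
	"log"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("in-cluster config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("clientset: %v", err)
	}
	// Ask the API server whether each verb is allowed on Endpoints.
	for _, verb := range []string{"get", "list", "watch"} {
		review := &authorizationv1.SelfSubjectAccessReview{
			Spec: authorizationv1.SelfSubjectAccessReviewSpec{
				ResourceAttributes: &authorizationv1.ResourceAttributes{
					Namespace: "default", // adjust to your namespace
					Verb:      verb,
					Resource:  "endpoints",
				},
			},
		}
		resp, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(
			context.Background(), review, metav1.CreateOptions{})
		if err != nil {
			log.Fatalf("access review: %v", err)
		}
		fmt.Printf("verb=%s allowed=%v\n", verb, resp.Status.Allowed)
	}
}
```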
#### Issue 3: Connections Forcibly Closed
**Symptom**
```
grpc: the client connection is closing
```
**Possible causes**
- GetConn dialing Endpoints directly (the old, incorrect behavior)
- Old connections being closed immediately instead of after a delay
**Troubleshooting steps**
1. Check the logs for `[K8s Discovery] [xxx] GetConn called (using DNS)`
2. Confirm that GetConn uses DNS rather than Endpoints
3. Verify that the delayed close is taking effect
#### Issue 4: Connection Leak
**Symptoms**
- The number of connections keeps growing
- Memory usage keeps growing
**Possible causes**
- Old connections are not being closed properly
- The delayed-close goroutine never runs
**Troubleshooting steps**
1. Check the logs for `[K8s Discovery] [xxx] Closing X old connections`
2. Check the delayed-close log lines
3. Monitor how the connection count changes over time
### 5.2 Log Filter Commands
**All K8s Discovery logs**
```bash
kubectl logs <pod-name> | grep "K8s Discovery"
```
**Logs for a specific service**
```bash
kubectl logs <pod-name> | grep "K8s Discovery.*user-rpc-service"
```
**Errors only**
```bash
kubectl logs <pod-name> | grep "K8s Discovery.*ERROR"
```
**Watcher logs**
```bash
kubectl logs <pod-name> | grep "K8s Discovery.*Watcher"
```
**Follow logs in real time**
```bash
kubectl logs -f <pod-name> | grep "K8s Discovery"
```
### 5.3 Debugging Tips
1. **Enable more verbose logging**: if the default logs are not detailed enough, raise the log level
2. **Monitor connection state**: periodically check the number and state of connections (see the sketch below)
3. **Compare against Endpoints**: manually verify that the Endpoints match the connection list
4. **Test Pod rebuilds**: deliberately trigger a Pod rebuild and watch the connections update
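A minimal sketch for tip 2: a hypothetical `logConnStates` helper (not part of the fix), added as a method on the `KubernetesConnManager` from section 2, that logs per-service connection counts and states at a fixed interval:
```go
// logConnStates periodically logs how many connections are cached per
// service and what state each connection is in.
func (k *KubernetesConnManager) logConnStates(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		k.mu.RLock()
		for serviceName, conns := range k.connMap {
			states := make(map[connectivity.State]int)
			for _, conn := range conns {
				states[conn.GetState()]++
			}
			log.Printf("[K8s Discovery] [%s] %d connections, states: %v",
				serviceName, len(conns), states)
		}
		k.mu.RUnlock()
	}
}
```
It could be started from `NewKubernetesConnManager` with `go k.logConnStates(30 * time.Second)`; the interval is a placeholder.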
---
## 6. Summary
### 6.1 Key Fixes
1. **Watch the correct resource type**: Endpoints instead of Pods
2. **GetConn uses DNS**: connections are no longer forcibly closed
3. **GetConns uses Endpoints**: load balancing and automatic updates
4. **Delayed close of old connections**: in-flight requests no longer fail
5. **Health checks**: connections are verified before use
6. **KeepAlive**: automatic reconnection
7. **Detailed logging**: easier troubleshooting
### 6.2 Key Improvements
- **Solves the problem seen in earlier fix attempts**: GetConn uses DNS, so connections are not forcibly closed
- **Complete debug logging**: every key operation is logged
- **Better error handling**: clearer error messages and recovery behavior
### 6.3 Usage Recommendations
1. **Before deployment**: thoroughly test Pod rebuild scenarios
2. **After deployment**: monitor the logs and watch how connections are updated
3. **Troubleshooting**: use the log filter commands to locate problems quickly
4. **Ongoing tuning**: adjust the delayed-close interval based on real-world behavior
---
## 7. Related Files
- Fixed file: `pkg/common/discovery/kubernetes/kubernetes.go`
- Test scripts: automated tests can be written to verify the fix
- Monitoring: Prometheus metrics can be added to track connection state (see the sketch below)
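A minimal sketch of the monitoring idea, assuming `github.com/prometheus/client_golang` were added as a dependency (it is not one today); the metric name and label are placeholders:
```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// ConnGauge tracks the number of cached gRPC connections per service.
var ConnGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "k8s_discovery_connections",
		Help: "Number of cached gRPC connections per service.",
	},
	[]string{"service"},
)

func init() {
	// Register the gauge with the default Prometheus registry.
	prometheus.MustRegister(ConnGauge)
}
```
The gauge would be updated wherever `connMap` changes, for example at the end of `initializeConns` with `ConnGauge.WithLabelValues(serviceName).Set(float64(len(newConns)))`.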