diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
index 74410aab6..ba52a79e2 100644
--- a/weed/admin/dash/worker_grpc_server.go
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -117,7 +117,7 @@ func (s *WorkerGrpcServer) Stop() error {
 	s.connMutex.Lock()
 	for _, conn := range s.connections {
 		conn.cancel()
-		close(conn.outgoing)
+		s.safeCloseOutgoingChannel(conn, "Stop")
 	}
 	s.connections = make(map[string]*WorkerConnection)
 	s.connMutex.Unlock()
@@ -182,15 +182,27 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
 	}
 	conn.capabilities = capabilities
 
-	// Register connection
+	// Register connection - clean up old connection if worker is reconnecting
 	s.connMutex.Lock()
+	if oldConn, exists := s.connections[workerID]; exists {
+		glog.Infof("Worker %s reconnected, cleaning up old connection", workerID)
+		// Cancel old connection to stop its goroutines
+		oldConn.cancel()
+		// Don't close oldConn.outgoing here as it may cause panic in handleOutgoingMessages
+		// Let the goroutine exit naturally when it detects context cancellation
+	}
 	s.connections[workerID] = conn
 	s.connMutex.Unlock()
 
 	// Register worker with maintenance manager
 	s.registerWorkerWithManager(conn)
 
-	// Send registration response
+	// IMPORTANT: Start outgoing message handler BEFORE sending registration response
+	// This ensures the handler is ready to process messages and prevents race conditions
+	// where the worker might send requests before we're ready to respond
+	go s.handleOutgoingMessages(conn)
+
+	// Send registration response (after handler is started)
 	regResponse := &worker_pb.AdminMessage{
 		Timestamp: time.Now().Unix(),
 		Message: &worker_pb.AdminMessage_RegistrationResponse{
@@ -203,13 +215,11 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
 	}
 	select {
 	case conn.outgoing <- regResponse:
+		glog.V(1).Infof("Registration response sent to worker %s", workerID)
 	case <-time.After(5 * time.Second):
 		glog.Errorf("Failed to send registration response to worker %s", workerID)
 	}
 
-	// Start outgoing message handler
-	go s.handleOutgoingMessages(conn)
-
 	// Handle incoming messages
 	for {
 		select {
@@ -235,7 +245,9 @@ func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStr
 				return err
 			}
 
+			s.connMutex.Lock()
 			conn.lastSeen = time.Now()
+			s.connMutex.Unlock()
 			s.handleWorkerMessage(conn, msg)
 		}
 	}
@@ -440,16 +452,36 @@ func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, respons
 	s.logRequestsMutex.Unlock()
 }
 
+// safeCloseOutgoingChannel safely closes the outgoing channel for a worker connection.
+func (s *WorkerGrpcServer) safeCloseOutgoingChannel(conn *WorkerConnection, source string) {
+	defer func() {
+		if r := recover(); r != nil {
+			glog.V(1).Infof("%s: recovered from panic closing outgoing channel for worker %s: %v", source, conn.workerID, r)
+		}
+	}()
+	close(conn.outgoing)
+}
+
 // unregisterWorker removes a worker connection
 func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
 	s.connMutex.Lock()
-	if conn, exists := s.connections[workerID]; exists {
-		conn.cancel()
-		close(conn.outgoing)
-		delete(s.connections, workerID)
+	conn, exists := s.connections[workerID]
+	if !exists {
+		s.connMutex.Unlock()
+		glog.V(2).Infof("unregisterWorker: worker %s not found in connections map (already unregistered)", workerID)
+		return
 	}
+
+	// Remove from map first to prevent duplicate cleanup attempts
+	delete(s.connections, workerID)
 	s.connMutex.Unlock()
 
+	// Cancel context to signal goroutines to stop
+	conn.cancel()
+
+	// Safely close the outgoing channel with recover to handle potential double-close
+	s.safeCloseOutgoingChannel(conn, "unregisterWorker")
+
 	glog.V(1).Infof("Unregistered worker %s", workerID)
 }
 
@@ -479,7 +511,7 @@ func (s *WorkerGrpcServer) cleanupStaleConnections() {
 		if conn.lastSeen.Before(cutoff) {
 			glog.Warningf("Cleaning up stale worker connection: %s", workerID)
 			conn.cancel()
-			close(conn.outgoing)
+			s.safeCloseOutgoingChannel(conn, "cleanupStaleConnections")
 			delete(s.connections, workerID)
 		}
 	}
diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go
index 6ac28685e..dfa413e70 100644
--- a/weed/admin/maintenance/maintenance_integration.go
+++ b/weed/admin/maintenance/maintenance_integration.go
@@ -266,7 +266,26 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
 
 // UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
 func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
-	return s.activeTopology.UpdateTopology(topologyInfo)
+	// Log topology details before update for diagnostics
+	if topologyInfo != nil {
+		dcCount, nodeCount, diskCount := topology.CountTopologyResources(topologyInfo)
+		glog.V(2).Infof("UpdateTopologyInfo: received topology with %d datacenters, %d nodes, %d disks",
+			dcCount, nodeCount, diskCount)
+	} else {
+		glog.Warningf("UpdateTopologyInfo: received nil topologyInfo")
+	}
+
+	err := s.activeTopology.UpdateTopology(topologyInfo)
+
+	if err != nil {
+		glog.Errorf("UpdateTopologyInfo: topology update failed: %v", err)
+	} else {
+		// Log success with current disk count
+		currentDiskCount := s.activeTopology.GetDiskCount()
+		glog.V(1).Infof("UpdateTopologyInfo: topology update successful, active topology now has %d disks", currentDiskCount)
+	}
+
+	return err
 }
 
 // convertToExistingFormat converts task results to existing system format using dynamic mapping
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
index 4aab137e0..69762c47c 100644
--- a/weed/admin/maintenance/maintenance_manager.go
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -137,6 +137,7 @@ func (mm *MaintenanceManager) Start() error {
 	// Start background processes
 	go mm.scanLoop()
 	go mm.cleanupLoop()
+	go mm.topologyStatusLoop() // Periodic diagnostic logging
 
 	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
 	return nil
@@ -255,6 +256,54 @@ func (mm *MaintenanceManager) cleanupLoop() {
 	}
 }
 
+// topologyStatusLoop periodically logs topology status for diagnostics
+func (mm *MaintenanceManager) topologyStatusLoop() {
+	// Log topology status every 5 minutes for diagnostic purposes
+	statusInterval := 5 * time.Minute
+	ticker := time.NewTicker(statusInterval)
+	defer ticker.Stop()
+
+	for mm.running {
+		select {
+		case <-mm.stopChan:
+			return
+		case <-ticker.C:
+			mm.logTopologyStatus()
+		}
+	}
+}
+
+// logTopologyStatus logs current topology and worker status for diagnostics
+func (mm *MaintenanceManager) logTopologyStatus() {
+	if mm.scanner == nil || mm.scanner.integration == nil {
+		glog.V(2).Infof("Topology status: scanner/integration not available")
+		return
+	}
+
+	activeTopology := mm.scanner.integration.GetActiveTopology()
+	if activeTopology == nil {
+		glog.V(1).Infof("Topology status: ActiveTopology is nil")
+		return
+	}
+
+	diskCount := activeTopology.GetDiskCount()
+	nodeCount := len(activeTopology.GetAllNodes())
+
+	// Get queue stats
+	stats := mm.queue.GetStats()
+	workerCount := len(mm.queue.GetWorkers())
+
+	mm.mutex.RLock()
+	errorCount := mm.errorCount
+	mm.mutex.RUnlock()
+
+	glog.V(0).Infof("Topology status: %d nodes, %d disks, %d workers, %d pending tasks, %d running tasks, errors: %d",
+		nodeCount, diskCount, workerCount,
+		stats.TasksByStatus[TaskStatusPending],
+		stats.TasksByStatus[TaskStatusInProgress]+stats.TasksByStatus[TaskStatusAssigned],
+		errorCount)
+}
+
 // performScan executes a maintenance scan with error handling and backoff
 func (mm *MaintenanceManager) performScan() {
 	defer func() {
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
index 65b7dfe7e..25837e09b 100644
--- a/weed/admin/topology/topology_management.go
+++ b/weed/admin/topology/topology_management.go
@@ -8,11 +8,53 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 )
 
+// CountTopologyResources counts datacenters, nodes, and disks in topology info
+func CountTopologyResources(topologyInfo *master_pb.TopologyInfo) (dcCount, nodeCount, diskCount int) {
+	if topologyInfo == nil {
+		return 0, 0, 0
+	}
+	dcCount = len(topologyInfo.DataCenterInfos)
+	for _, dc := range topologyInfo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			nodeCount += len(rack.DataNodeInfos)
+			for _, node := range rack.DataNodeInfos {
+				diskCount += len(node.DiskInfos)
+			}
+		}
+	}
+	return
+}
+
 // UpdateTopology updates the topology information from master
 func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
 	at.mutex.Lock()
 	defer at.mutex.Unlock()
 
+	// Validate topology updates to prevent clearing disk maps with invalid data
+	if topologyInfo == nil {
+		glog.Warningf("UpdateTopology received nil topologyInfo, preserving last-known-good topology")
+		return fmt.Errorf("rejected invalid topology update: nil topologyInfo")
+	}
+
+	if len(topologyInfo.DataCenterInfos) == 0 {
+		glog.Warningf("UpdateTopology received empty DataCenterInfos, preserving last-known-good topology (had %d nodes, %d disks)",
+			len(at.nodes), len(at.disks))
+		return fmt.Errorf("rejected invalid topology update: empty DataCenterInfos (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
+	}
+
+	// Count incoming topology for validation logging
+	dcCount, incomingNodes, incomingDisks := CountTopologyResources(topologyInfo)
+
+	// Reject updates that would wipe out a valid topology with an empty one (e.g. during master restart)
+	if incomingNodes == 0 && len(at.nodes) > 0 {
+		glog.Warningf("UpdateTopology received topology with 0 nodes, preserving last-known-good topology (had %d nodes, %d disks)",
+			len(at.nodes), len(at.disks))
+		return fmt.Errorf("rejected invalid topology update: 0 nodes (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
+	}
+
+	glog.V(2).Infof("UpdateTopology: validating update with %d datacenters, %d nodes, %d disks (current: %d nodes, %d disks)",
+		dcCount, incomingNodes, incomingDisks, len(at.nodes), len(at.disks))
+
 	at.topologyInfo = topologyInfo
 	at.lastUpdated = time.Now()
 
@@ -142,8 +184,21 @@ func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
 	return disks
 }
 
+// GetDiskCount returns the total number of disks in the active topology
+func (at *ActiveTopology) GetDiskCount() int {
+	at.mutex.RLock()
+	defer at.mutex.RUnlock()
+	return len(at.disks)
+}
+
 // rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
 func (at *ActiveTopology) rebuildIndexes() {
+	// Nil-safety guard: return early if topology is not valid
+	if at.topologyInfo == nil || at.topologyInfo.DataCenterInfos == nil {
+		glog.V(1).Infof("rebuildIndexes: skipping rebuild due to nil topology or DataCenterInfos")
+		return
+	}
+
 	// Clear existing indexes
 	at.volumeIndex = make(map[uint32][]string)
 	at.ecShardIndex = make(map[uint32][]string)
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index e199cddbe..ebd2220df 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -19,7 +19,9 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/status"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -33,6 +35,11 @@ const (
 	// gRPC keepalive settings - must be consistent between client and server
 	GrpcKeepAliveTime    = 60 * time.Second // ping interval when no activity
 	GrpcKeepAliveTimeout = 20 * time.Second // ping timeout
+
+	// Connection recycling for Docker Swarm environments
+	// Forces connections to be recycled periodically to handle DNS changes
+	GrpcMaxConnectionAge      = 5 * time.Minute  // max time a connection may exist
+	GrpcMaxConnectionAgeGrace = 30 * time.Second // grace period for RPCs to complete
 )
 
 var (
@@ -56,9 +63,10 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
 	var options []grpc.ServerOption
 	options = append(options,
 		grpc.KeepaliveParams(keepalive.ServerParameters{
-			Time:    GrpcKeepAliveTime,    // server pings client if no activity for this long
-			Timeout: GrpcKeepAliveTimeout, // ping timeout
-			// MaxConnectionAge: 10 * time.Hour,
+			Time:                  GrpcKeepAliveTime,         // server pings client if no activity for this long
+			Timeout:               GrpcKeepAliveTimeout,      // ping timeout
+			MaxConnectionAge:      GrpcMaxConnectionAge,      // max connection age for Docker Swarm DNS refresh
+			MaxConnectionAgeGrace: GrpcMaxConnectionAgeGrace, // grace period for in-flight RPCs
 		}),
 		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
 			MinTime: GrpcKeepAliveTime, // min time a client should wait before sending a ping
@@ -112,9 +120,11 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
 
 	existingConnection, found := grpcClients[address]
 	if found {
+		glog.V(3).Infof("gRPC cache hit for %s (version %d)", address, existingConnection.version)
 		return existingConnection, nil
 	}
 
+	glog.V(2).Infof("Creating new gRPC connection to %s", address)
 	ctx := context.Background()
 	grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
 	if err != nil {
@@ -127,6 +137,7 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
 		0,
 	}
 	grpcClients[address] = vgc
+	glog.V(2).Infof("New gRPC connection established to %s (version %d)", address, vgc.version)
 	return vgc, nil
 }
 
@@ -163,6 +174,33 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
 	}
 }
 
+// shouldInvalidateConnection checks if an error indicates the cached connection should be invalidated
+func shouldInvalidateConnection(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check gRPC status codes first (more reliable)
+	if s, ok := status.FromError(err); ok {
+		code := s.Code()
+		switch code {
+		case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.Internal:
+			return true
+		}
+	}
+
+	// Fall back to string matching for transport-level errors not captured by gRPC codes
+	errStr := err.Error()
+	errLower := strings.ToLower(errStr)
+	return strings.Contains(errLower, "transport") ||
+		strings.Contains(errLower, "connection closed") ||
+		strings.Contains(errLower, "dns") ||
+		strings.Contains(errLower, "connection refused") ||
+		strings.Contains(errLower, "no route to host") ||
+		strings.Contains(errLower, "network is unreachable") ||
+		strings.Contains(errLower, "connection reset")
+}
+
 // WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
 func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
 
@@ -173,11 +211,11 @@ func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientCon
 	}
 	executionErr := fn(vgc.ClientConn)
 	if executionErr != nil {
-		if strings.Contains(executionErr.Error(), "transport") ||
-			strings.Contains(executionErr.Error(), "connection closed") {
+		if shouldInvalidateConnection(executionErr) {
 			grpcClientsLock.Lock()
 			if t, ok := grpcClients[address]; ok {
 				if t.version == vgc.version {
+					glog.V(1).Infof("Removing cached gRPC connection to %s due to error: %v", address, executionErr)
 					vgc.Close()
 					delete(grpcClients, address)
 				}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 81de30894..9e5cb0cec 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -165,6 +165,7 @@ func (mc *MasterClient) tryAllMasters(ctx context.Context) {
 func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
 	glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
 	stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
+	connectStartTime := time.Now()
 	gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
@@ -175,6 +176,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
 			stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
 			return err
 		}
+		glog.V(0).Infof("%s.%s masterClient gRPC stream established to %s in %v", mc.FilerGroup, mc.clientType, master, time.Since(connectStartTime))
 
 		if err = stream.Send(&master_pb.KeepConnectedRequest{
 			FilerGroup: mc.FilerGroup,
@@ -380,14 +382,22 @@ func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
 }
 
 func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
-	glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
+	glog.V(0).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
+	reconnectCount := 0
 	for {
 		select {
 		case <-ctx.Done():
 			glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
 			return
 		default:
+			reconnectStart := time.Now()
+			if reconnectCount > 0 {
+				glog.V(0).Infof("%s.%s masterClient reconnection attempt #%d", mc.FilerGroup, mc.clientType, reconnectCount)
+			}
 			mc.tryAllMasters(ctx)
+			reconnectCount++
+			glog.V(1).Infof("%s.%s masterClient connection cycle completed in %v, sleeping before retry",
+				mc.FilerGroup, mc.clientType, time.Since(reconnectStart))
 			time.Sleep(time.Second)
 		}
 	}
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 4485154a7..74c80662c 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -211,10 +211,18 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
 	s.connected = true
 	s.stream = stream
 
+	// Start stream handlers BEFORE sending registration
+	// This ensures handleIncoming is ready to receive the registration response
+	s.streamExit = make(chan struct{})
+	go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
+	go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
+
 	// Always check for worker info and send registration immediately as the very first message
 	if s.lastWorkerInfo != nil {
-		// Send registration synchronously as the very first message
-		if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil {
+		// Send registration via the normal outgoing channel and wait for response via incoming
+		if err := c.sendRegistration(s.lastWorkerInfo); err != nil {
+			close(s.streamExit)
+			s.streamCancel()
 			s.conn.Close()
 			s.connected = false
 			return fmt.Errorf("failed to register worker: %w", err)
@@ -225,11 +233,6 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
 		glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
 	}
 
-	// Start stream handlers
-	s.streamExit = make(chan struct{})
-	go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
-	go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
-
 	glog.Infof("Connected to admin server at %s", c.adminAddress)
 	return nil
 }
@@ -237,6 +240,9 @@ func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
 // reconnect attempts to re-establish the connection
 func (c *GrpcAdminClient) reconnect(s *grpcState) error {
 	// Clean up existing connection completely
+	if s.streamExit != nil {
+		close(s.streamExit)
+	}
 	if s.streamCancel != nil {
 		s.streamCancel()
 	}
@@ -456,7 +462,8 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
 		s.conn.Close()
 	}
 
-	// Close channels
+	// Close channels to signal all goroutines to stop
+	// This will cause any pending sends/receives to fail gracefully
 	close(c.outgoing)
 	close(c.incoming)
 
@@ -525,76 +532,6 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
 	}
 }
 
-// sendRegistrationSync sends the registration message synchronously
-func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream worker_pb.WorkerService_WorkerStreamClient) error {
-	capabilities := make([]string, len(worker.Capabilities))
-	for i, cap := range worker.Capabilities {
-		capabilities[i] = string(cap)
-	}
-
-	msg := &worker_pb.WorkerMessage{
-		WorkerId:  c.workerID,
-		Timestamp: time.Now().Unix(),
-		Message: &worker_pb.WorkerMessage_Registration{
-			Registration: &worker_pb.WorkerRegistration{
-				WorkerId:      c.workerID,
-				Address:       worker.Address,
-				Capabilities:  capabilities,
-				MaxConcurrent: int32(worker.MaxConcurrent),
-				Metadata:      make(map[string]string),
-			},
-		},
-	}
-
-	// Send directly to stream to ensure it's the first message
-	if err := stream.Send(msg); err != nil {
-		return fmt.Errorf("failed to send registration message: %w", err)
-	}
-
-	// Create a channel to receive the response
-	responseChan := make(chan *worker_pb.AdminMessage, 1)
-	errChan := make(chan error, 1)
-
-	// Start a goroutine to listen for the response
-	go func() {
-		for {
-			response, err := stream.Recv()
-			if err != nil {
-				errChan <- fmt.Errorf("failed to receive registration response: %w", err)
-				return
-			}
-
-			if regResp := response.GetRegistrationResponse(); regResp != nil {
-				responseChan <- response
-				return
-			}
-			// Continue waiting if it's not a registration response
-			// If stream is stuck, reconnect() will kill it, cleaning up this
-			// goroutine
-		}
-	}()
-
-	// Wait for registration response with timeout
-	timeout := time.NewTimer(10 * time.Second)
-	defer timeout.Stop()
-
-	select {
-	case response := <-responseChan:
-		if regResp := response.GetRegistrationResponse(); regResp != nil {
-			if regResp.Success {
-				glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
-				return nil
-			}
-			return fmt.Errorf("registration failed: %s", regResp.Message)
-		}
-		return fmt.Errorf("unexpected response type")
-	case err := <-errChan:
-		return err
-	case <-timeout.C:
-		return fmt.Errorf("registration timeout")
-	}
-}
-
 func (c *GrpcAdminClient) IsConnected() bool {
 	respCh := make(chan bool, 1)