diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 9cbb597ab..5431b7f23 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -1428,7 +1428,7 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, } // Create gRPC connection - conn, err := grpc.Dial(brokerAddress, s.grpcDialOption) + conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to broker: %w", err) } diff --git a/weed/admin/task/worker_communication.go b/weed/admin/task/worker_communication.go index b5d7a044d..3e6216a86 100644 --- a/weed/admin/task/worker_communication.go +++ b/weed/admin/task/worker_communication.go @@ -8,9 +8,11 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // WorkerConnection manages the gRPC connection to a single worker @@ -171,10 +173,9 @@ func (wcm *WorkerCommunicationManager) cleanupInactiveConnections() { // NewWorkerConnection creates a new worker connection func NewWorkerConnection(workerID, address string, adminServer *AdminServer) (*WorkerConnection, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) + // Convert address to gRPC address + grpcAddress := pb.ServerToGrpcAddress(address) + conn, err := grpc.NewClient(grpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, fmt.Errorf("failed to connect to worker at %s: %v", address, err) } diff --git a/weed/command/worker.go b/weed/command/worker.go index 1d1296e0a..760809340 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -97,6 +97,9 @@ func runWorker(cmd *Command, args []string) bool { } } + // Create gRPC dial option using TLS configuration + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") + // Create worker configuration config := &types.WorkerConfig{ AdminServer: *workerAdminServer, @@ -105,6 +108,7 @@ func runWorker(cmd *Command, args []string) bool { HeartbeatInterval: *workerHeartbeatInterval, TaskRequestInterval: *workerTaskRequestInterval, BaseWorkingDir: baseWorkingDir, + GrpcDialOption: grpcDialOption, } // Create worker instance @@ -113,9 +117,6 @@ func runWorker(cmd *Command, args []string) bool { glog.Fatalf("Failed to create worker: %v", err) return false } - - // Create admin client with LoadClientTLS - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption) if err != nil { glog.Fatalf("Failed to create admin client: %v", err) diff --git a/weed/worker/client.go b/weed/worker/client.go index 7359b44bd..7fc5ca267 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -115,6 +115,24 @@ func (c *GrpcAdminClient) attemptConnection() error { c.stream = stream c.connected = true + // Always check for worker info and send registration immediately as the very first message + c.mutex.RLock() + workerInfo := c.lastWorkerInfo + c.mutex.RUnlock() + + if workerInfo != nil { + // Send registration synchronously as the very first message + if err := c.sendRegistrationSync(workerInfo); err != nil { + c.conn.Close() + 
c.connected = false + return fmt.Errorf("failed to register worker: %w", err) + } + glog.Infof("Worker registered successfully with admin server") + } else { + // No worker info yet - stream will wait for registration + glog.V(1).Infof("Connected to admin server, waiting for worker registration info") + } + // Start stream handlers with synchronization outgoingReady := make(chan struct{}) incomingReady := make(chan struct{}) @@ -298,19 +316,7 @@ func (c *GrpcAdminClient) reconnect() error { return fmt.Errorf("failed to reconnect: %w", err) } - // Re-register worker if we have previous registration info - c.mutex.RLock() - workerInfo := c.lastWorkerInfo - c.mutex.RUnlock() - - if workerInfo != nil { - glog.Infof("Re-registering worker after reconnection...") - if err := c.RegisterWorker(workerInfo); err != nil { - glog.Warningf("Failed to re-register worker after reconnection: %v", err) - // Don't fail the reconnection because of registration failure - } - } - + // Registration is now handled in attemptConnection if worker info is available return nil } @@ -390,15 +396,17 @@ func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error { - if !c.connected { - return fmt.Errorf("not connected to admin server") - } - // Store worker info for re-registration after reconnection c.mutex.Lock() c.lastWorkerInfo = worker c.mutex.Unlock() + // If not connected, registration will happen when connection is established + if !c.connected { + glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") + return nil + } + return c.sendRegistration(worker) } @@ -449,6 +457,74 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { } } +// sendRegistrationSync sends the registration message synchronously +func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error { + capabilities := make([]string, len(worker.Capabilities)) + for i, cap := range worker.Capabilities { + capabilities[i] = string(cap) + } + + msg := &worker_pb.WorkerMessage{ + WorkerId: c.workerID, + Timestamp: time.Now().Unix(), + Message: &worker_pb.WorkerMessage_Registration{ + Registration: &worker_pb.WorkerRegistration{ + WorkerId: c.workerID, + Address: worker.Address, + Capabilities: capabilities, + MaxConcurrent: int32(worker.MaxConcurrent), + Metadata: make(map[string]string), + }, + }, + } + + // Send directly to stream to ensure it's the first message + if err := c.stream.Send(msg); err != nil { + return fmt.Errorf("failed to send registration message: %w", err) + } + + // Create a channel to receive the response + responseChan := make(chan *worker_pb.AdminMessage, 1) + errChan := make(chan error, 1) + + // Start a goroutine to listen for the response + go func() { + for { + response, err := c.stream.Recv() + if err != nil { + errChan <- fmt.Errorf("failed to receive registration response: %w", err) + return + } + + if regResp := response.GetRegistrationResponse(); regResp != nil { + responseChan <- response + return + } + // Continue waiting if it's not a registration response + } + }() + + // Wait for registration response with timeout + timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() + + select { + case response := <-responseChan: + if regResp := response.GetRegistrationResponse(); regResp != nil { + if regResp.Success { + glog.V(1).Infof("Worker registered successfully: %s", regResp.Message) + return nil + } 
+ return fmt.Errorf("registration failed: %s", regResp.Message) + } + return fmt.Errorf("unexpected response type") + case err := <-errChan: + return err + case <-timeout.C: + return fmt.Errorf("registration timeout") + } +} + // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { if !c.connected { diff --git a/weed/worker/ec_worker.go b/weed/worker/ec_worker.go index 66fb4621b..1785ba65b 100644 --- a/weed/worker/ec_worker.go +++ b/weed/worker/ec_worker.go @@ -8,10 +8,13 @@ import ( "sync" "time" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "google.golang.org/grpc" ) // ECWorker implements maintenance worker with actual EC functionality @@ -155,10 +158,9 @@ func (w *ECWorker) startGRPCServer() error { // connectToAdmin establishes connection to admin server func (w *ECWorker) connectToAdmin() error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - conn, err := grpc.DialContext(ctx, w.adminAddress, grpc.WithInsecure(), grpc.WithBlock()) + // Convert to gRPC address (HTTP port + 10000) + grpcAddress := pb.ServerToGrpcAddress(w.adminAddress) + conn, err := grpc.NewClient(grpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return fmt.Errorf("failed to connect to admin at %s: %v", w.adminAddress, err) } @@ -448,7 +450,9 @@ func (w *ECWorker) executeECEncode(task *ActiveTask) (bool, error) { return false, fmt.Errorf("no volume server address provided") } - conn, err := grpc.Dial(volumeServerAddress, grpc.WithInsecure()) + // Convert to gRPC address (HTTP port + 10000) + grpcAddress := pb.ServerToGrpcAddress(volumeServerAddress) + conn, err := grpc.NewClient(grpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return false, fmt.Errorf("failed to connect to volume server %s: %v", volumeServerAddress, err) } @@ -510,7 +514,8 @@ func (w *ECWorker) executeECRebuild(task *ActiveTask) (bool, error) { w.sendTaskUpdate(task, 0.1, "Initializing EC rebuild") // Connect to volume server - conn, err := grpc.Dial(task.Server, grpc.WithInsecure()) + grpcAddress := pb.ServerToGrpcAddress(task.Server) + conn, err := grpc.NewClient(grpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return false, fmt.Errorf("failed to connect to volume server: %v", err) } @@ -554,7 +559,8 @@ func (w *ECWorker) executeVacuum(task *ActiveTask) (bool, error) { } // Connect to volume server - conn, err := grpc.Dial(task.Server, grpc.WithInsecure()) + grpcAddress := pb.ServerToGrpcAddress(task.Server) + conn, err := grpc.NewClient(grpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return false, fmt.Errorf("failed to connect to volume server: %v", err) } diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index aeb8acafe..9cf8734ee 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -10,12 +10,14 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" 
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Task implements comprehensive erasure coding with local processing and smart distribution @@ -63,8 +65,9 @@ func NewTask(sourceServer string, volumeID uint32) *Task { BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), sourceServer: sourceServer, volumeID: volumeID, - masterClient: "localhost:9333", // Default master client - workDir: "/tmp/seaweedfs_ec_work", // Default work directory + masterClient: "localhost:9333", // Default master client + workDir: "/tmp/seaweedfs_ec_work", // Default work directory + grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure dataShards: 10, parityShards: 4, totalShards: 14, @@ -81,6 +84,7 @@ func NewTaskWithParams(sourceServer string, volumeID uint32, masterClient string volumeID: volumeID, masterClient: masterClient, workDir: workDir, + grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure dataShards: 10, parityShards: 4, totalShards: 14, @@ -89,62 +93,88 @@ func NewTaskWithParams(sourceServer string, volumeID uint32, masterClient string return task } -// Execute performs the comprehensive EC operation +// SetDialOption allows setting a custom gRPC dial option +func (t *Task) SetDialOption(dialOpt grpc.DialOption) { + t.grpcDialOpt = dialOpt +} + +// Execute performs the EC operation using SeaweedFS built-in EC generation func (t *Task) Execute(params types.TaskParams) error { glog.Infof("Starting erasure coding for volume %d from server %s", t.volumeID, t.sourceServer) - // Extract parameters + // Extract parameters - use the actual collection from task params, don't default to "default" t.collection = params.Collection - if t.collection == "" { - t.collection = "default" - } + // Note: Leave collection empty if not specified, as volumes may have been created with empty collection // Override defaults with parameters if provided if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { t.masterClient = mc } - if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { - t.workDir = wd - } - // Create working directory for this task - taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("ec_%d_%d", t.volumeID, time.Now().Unix())) - err := os.MkdirAll(taskWorkDir, 0755) + // Use the built-in SeaweedFS EC generation approach + ctx := context.Background() + + // Step 1: Connect to volume server + grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) if err != nil { - return fmt.Errorf("failed to create work directory %s: %v", taskWorkDir, err) + return fmt.Errorf("failed to connect to volume server %s: %v", t.sourceServer, err) } - defer t.cleanup(taskWorkDir) + defer conn.Close() - // Step 1: Copy volume data to local disk - if err := t.copyVolumeDataLocally(taskWorkDir); err != nil { - return fmt.Errorf("failed to copy volume data: %v", err) - } + client := volume_server_pb.NewVolumeServerClient(conn) - // Step 2: Mark source volume as read-only - if err := t.markVolumeReadOnly(); err != nil { - return fmt.Errorf("failed to mark volume read-only: %v", err) + // Step 2: Generate EC shards on the volume server + t.SetProgress(20.0) + glog.V(1).Infof("Generating EC shards for volume %d with collection '%s'", t.volumeID, t.collection) + + generateReq := 
&volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: t.volumeID, + Collection: t.collection, } - // Step 3: Perform local EC encoding - shardFiles, err := t.performLocalECEncoding(taskWorkDir) + _, err = client.VolumeEcShardsGenerate(ctx, generateReq) if err != nil { - return fmt.Errorf("failed to perform EC encoding: %v", err) + return fmt.Errorf("failed to generate EC shards: %v", err) } - // Step 4: Find optimal shard placement - placements, err := t.calculateOptimalShardPlacement() + t.SetProgress(60.0) + glog.V(1).Infof("EC shards generated successfully for volume %d", t.volumeID) + + // Step 3: Mount EC shards + glog.V(1).Infof("Mounting EC shards for volume %d", t.volumeID) + + // Mount all EC shards (0-13: 10 data + 4 parity) + shardIds := make([]uint32, t.totalShards) + for i := 0; i < t.totalShards; i++ { + shardIds[i] = uint32(i) + } + + mountReq := &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: shardIds, + } + + _, err = client.VolumeEcShardsMount(ctx, mountReq) if err != nil { - return fmt.Errorf("failed to calculate shard placement: %v", err) + return fmt.Errorf("failed to mount EC shards: %v", err) } - // Step 5: Distribute shards to target servers - if err := t.distributeShards(shardFiles, placements); err != nil { - return fmt.Errorf("failed to distribute shards: %v", err) + t.SetProgress(80.0) + glog.V(1).Infof("EC shards mounted successfully for volume %d", t.volumeID) + + // Step 4: Mark original volume as read-only + glog.V(1).Infof("Marking volume %d as read-only", t.volumeID) + + readOnlyReq := &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: t.volumeID, } - // Step 6: Verify and cleanup source volume - if err := t.verifyAndCleanupSource(); err != nil { - return fmt.Errorf("failed to verify and cleanup: %v", err) + _, err = client.VolumeMarkReadonly(ctx, readOnlyReq) + if err != nil { + glog.Warningf("Failed to mark volume %d read-only: %v", t.volumeID, err) + // This is not a critical failure for EC encoding } t.SetProgress(100.0) @@ -160,8 +190,9 @@ func (t *Task) copyVolumeDataLocally(workDir string) error { ctx := context.Background() - // Connect to source volume server - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + // Connect to source volume server (convert to gRPC address) + grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) if err != nil { return fmt.Errorf("failed to connect to source server %s: %v", t.sourceServer, err) } @@ -254,7 +285,9 @@ func (t *Task) markVolumeReadOnly() error { glog.V(1).Infof("Marking volume %d as read-only", t.volumeID) ctx := context.Background() - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + // Convert to gRPC address + grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) if err != nil { return fmt.Errorf("failed to connect to source server: %v", err) } @@ -405,7 +438,7 @@ func (t *Task) calculateOptimalShardPlacement() ([]ShardPlacement, error) { // getAvailableServers retrieves available servers from the master func (t *Task) getAvailableServers() ([]*ServerInfo, error) { ctx := context.Background() - conn, err := grpc.Dial(t.masterClient, grpc.WithInsecure()) + conn, err := grpc.NewClient(t.masterClient, t.grpcDialOpt) if err != nil { return nil, fmt.Errorf("failed to connect to master: %v", err) } @@ -598,7 +631,9 @@ func (t *Task) uploadShardToServer(shardFile string, placement ShardPlacement) e 
glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr) ctx := context.Background() - conn, err := grpc.Dial(placement.ServerAddr, grpc.WithInsecure()) + // Convert to gRPC address + grpcAddress := pb.ServerToGrpcAddress(placement.ServerAddr) + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) if err != nil { return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err) } @@ -632,7 +667,9 @@ func (t *Task) verifyAndCleanupSource() error { glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID) ctx := context.Background() - conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure()) + // Convert to gRPC address + grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) if err != nil { return fmt.Errorf("failed to connect to source server: %v", err) } diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 70d3fc05f..4fe7e47bf 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -46,6 +46,12 @@ func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { // Create EC task with comprehensive capabilities task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) + + // Set gRPC dial option if provided + if params.GrpcDialOption != nil { + task.SetDialOption(params.GrpcDialOption) + } + task.SetEstimatedDuration(task.EstimateTime(params)) return task, nil diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go index 219318dbc..24233f59b 100644 --- a/weed/worker/tasks/vacuum/vacuum.go +++ b/weed/worker/tasks/vacuum/vacuum.go @@ -8,10 +8,12 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Task implements vacuum operation to reclaim disk space @@ -33,7 +35,7 @@ func NewTask(server string, volumeID uint32) *Task { return task } -// Execute executes the actual vacuum task using real SeaweedFS operations +// Execute performs the vacuum operation func (t *Task) Execute(params types.TaskParams) error { glog.Infof("Starting vacuum for volume %d on server %s", t.volumeID, t.server) @@ -48,8 +50,14 @@ func (t *Task) Execute(params types.TaskParams) error { } } - // Connect to volume server - conn, err := grpc.Dial(t.server, grpc.WithInsecure()) + // Convert server address to gRPC address and use proper dial option + grpcAddress := pb.ServerToGrpcAddress(t.server) + var dialOpt grpc.DialOption = grpc.WithTransportCredentials(insecure.NewCredentials()) + if params.GrpcDialOption != nil { + dialOpt = params.GrpcDialOption + } + + conn, err := grpc.NewClient(grpcAddress, dialOpt) if err != nil { return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err) } diff --git a/weed/worker/types/config_types.go b/weed/worker/types/config_types.go index 293ea6ade..5a9e94fd5 100644 --- a/weed/worker/types/config_types.go +++ b/weed/worker/types/config_types.go @@ -3,6 +3,8 @@ package types import ( "sync" "time" + + "google.golang.org/grpc" ) // WorkerConfig represents the configuration for a worker @@ -14,6 +16,7 @@ type WorkerConfig struct { TaskRequestInterval 
time.Duration `json:"task_request_interval"` BaseWorkingDir string `json:"base_working_dir,omitempty"` CustomParameters map[string]interface{} `json:"custom_parameters,omitempty"` + GrpcDialOption grpc.DialOption `json:"-"` // Not serializable, for runtime use only } // MaintenanceConfig represents the configuration for the maintenance system diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go index 3e3edd9cf..e8fe8f8ce 100644 --- a/weed/worker/types/task_types.go +++ b/weed/worker/types/task_types.go @@ -2,6 +2,8 @@ package types import ( "time" + + "google.golang.org/grpc" ) // TaskType represents the type of maintenance task @@ -57,11 +59,12 @@ type Task struct { // TaskParams represents parameters for task execution type TaskParams struct { - VolumeID uint32 `json:"volume_id,omitempty"` - Server string `json:"server,omitempty"` - Collection string `json:"collection,omitempty"` - WorkingDir string `json:"working_dir,omitempty"` - Parameters map[string]interface{} `json:"parameters,omitempty"` + VolumeID uint32 `json:"volume_id,omitempty"` + Server string `json:"server,omitempty"` + Collection string `json:"collection,omitempty"` + WorkingDir string `json:"working_dir,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` + GrpcDialOption grpc.DialOption `json:"-"` // Not serializable, for runtime use only } // TaskDetectionResult represents the result of scanning for maintenance needs diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 6f78fe6ff..8d869b2bd 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -174,7 +174,23 @@ func (w *Worker) Start() error { w.running = true w.startTime = time.Now() - // Start connection attempt (will retry automatically via reconnection loop) + // Prepare worker info for registration + workerInfo := &types.Worker{ + ID: w.id, + Capabilities: w.config.Capabilities, + MaxConcurrent: w.config.MaxConcurrent, + Status: "active", + CurrentLoad: 0, + LastHeartbeat: time.Now(), + } + + // Register worker info with client first (this stores it for use during connection) + if err := w.adminClient.RegisterWorker(workerInfo); err != nil { + glog.V(1).Infof("Worker info stored for registration: %v", err) + // This is expected if not connected yet + } + + // Start connection attempt (will register immediately if successful) glog.Infof("Worker %s starting, attempting to connect to admin server...", w.id) // Try initial connection, but don't fail if it doesn't work immediately @@ -182,7 +198,6 @@ func (w *Worker) Start() error { glog.Warningf("Initial connection to admin server failed, will keep retrying: %v", err) // Don't return error - let the reconnection loop handle it } - // Note: Registration is handled by connectionMonitorLoop to ensure proper protocol ordering // Start worker loops regardless of initial connection status // They will handle connection failures gracefully @@ -337,11 +352,12 @@ func (w *Worker) executeTask(task *types.Task) { // Create task instance taskParams := types.TaskParams{ - VolumeID: task.VolumeID, - Server: task.Server, - Collection: task.Collection, - WorkingDir: taskWorkingDir, - Parameters: task.Parameters, + VolumeID: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + WorkingDir: taskWorkingDir, + Parameters: task.Parameters, + GrpcDialOption: w.config.GrpcDialOption, } taskInstance, err := w.registry.CreateTask(task.Type, taskParams) @@ -480,65 +496,22 @@ func (w *Worker) registerWorker() { } } -// connectionMonitorLoop monitors 
connection status and registers when connected +// connectionMonitorLoop monitors connection status func (w *Worker) connectionMonitorLoop() { - ticker := time.NewTicker(10 * time.Second) // Check every 10 seconds + ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds defer ticker.Stop() - lastConnected := false - registrationCompleted := false - for { select { case <-w.stopChan: return case <-ticker.C: - if w.adminClient != nil { - var currentlyConnected bool - - if !registrationCompleted { - // Before registration, test connectivity by attempting registration - glog.Infof("Connection to admin server established, registering worker...") - - workerInfo := &types.Worker{ - ID: w.id, - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - Status: "active", - CurrentLoad: 0, - LastHeartbeat: time.Now(), - } - - err := w.adminClient.RegisterWorker(workerInfo) - if err == nil { - currentlyConnected = true - registrationCompleted = true - glog.Infof("Worker %s registered successfully with admin server", w.id) - } else { - glog.V(2).Infof("Registration failed, will retry: %v", err) - currentlyConnected = false - } - } else { - // After registration, use heartbeat to test connectivity - err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{ - WorkerID: w.id, - Status: "active", - Capabilities: w.config.Capabilities, - MaxConcurrent: w.config.MaxConcurrent, - CurrentLoad: len(w.currentTasks), - LastHeartbeat: time.Now(), - }) - - currentlyConnected = (err == nil) - - // If we lost connection, reset registration status - if !currentlyConnected && lastConnected { - glog.Warningf("Lost connection to admin server, will re-register on reconnection") - registrationCompleted = false - } - } - - lastConnected = currentlyConnected + // Just monitor connection status - registration is handled automatically + // by the client's reconnection logic + if w.adminClient != nil && w.adminClient.IsConnected() { + glog.V(2).Infof("Worker %s connection status: connected", w.id) + } else if w.adminClient != nil { + glog.V(1).Infof("Worker %s connection status: disconnected, reconnection in progress", w.id) } } }
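
The change set standardizes on one connection pattern: translate the server's HTTP address to its gRPC address with pb.ServerToGrpcAddress (gRPC port = HTTP port + 10000), then create the connection lazily with grpc.NewClient instead of the deprecated blocking grpc.Dial/grpc.DialContext, falling back to plaintext credentials when no TLS dial option is supplied. A minimal sketch of that pattern follows; the helper name dialVolumeServer is illustrative and not part of this PR.

package example // illustrative only; not part of this PR

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/pb"
)

// dialVolumeServer mirrors the pattern introduced in this PR: convert the
// HTTP address (e.g. "127.0.0.1:8080") to its gRPC counterpart ("127.0.0.1:18080")
// and create a lazy client connection. grpc.NewClient does not block, so a
// connection failure surfaces on the first RPC rather than at dial time.
func dialVolumeServer(server string, dialOpt grpc.DialOption) (*grpc.ClientConn, error) {
	if dialOpt == nil {
		// Fall back to plaintext when the caller supplies no TLS option,
		// matching the default used by the vacuum and EC tasks.
		dialOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
	}
	grpcAddress := pb.ServerToGrpcAddress(server)
	return grpc.NewClient(grpcAddress, dialOpt)
}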
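
The second behavioral change is the registration handshake: RegisterWorker now caches the worker info when the client is not yet connected, and attemptConnection replays it synchronously as the very first stream message on every (re)connect, waiting up to 10 seconds for the RegistrationResponse. The caller-side ordering therefore becomes "store info, then connect". A sketch of that ordering, using CreateAdminClient and LoadClientTLS as shown in weed/command/worker.go; the Connect method name and the exact shape of the returned client are assumptions for illustration.

package example // sketch of the register-then-connect ordering; Connect() name assumed

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/worker"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func startWorkerClient(adminServer, workerID string, capabilities []types.TaskType, maxConcurrent int) error {
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")

	client, err := worker.CreateAdminClient(adminServer, workerID, grpcDialOption)
	if err != nil {
		return err
	}

	// 1. Store the registration info first. With this PR, RegisterWorker no
	//    longer fails while disconnected; it caches the info for the next connection.
	_ = client.RegisterWorker(&types.Worker{
		ID:            workerID,
		Capabilities:  capabilities,
		MaxConcurrent: maxConcurrent,
		Status:        "active",
		LastHeartbeat: time.Now(),
	})

	// 2. Connect. attemptConnection sends the cached registration as the first
	//    stream message and waits (10s timeout) for the RegistrationResponse,
	//    so no separate registration retry loop is needed.
	if err := client.Connect(); err != nil {
		glog.Warningf("initial connection failed, reconnect loop will retry: %v", err)
	}
	return nil
}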
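
Finally, the rewritten Task.Execute in weed/worker/tasks/erasure_coding/ec.go drops the copy-locally/encode/distribute pipeline in favor of the volume server's built-in EC RPCs. A condensed sketch of that three-step flow (generate shards, mount shards 0-13, mark the source volume read-only); progress reporting is elided and the function name is illustrative.

package example // condensed sketch of the new EC flow; not part of this PR

import (
	"context"
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func encodeVolumeToEC(ctx context.Context, volumeServer string, volumeID uint32, collection string) error {
	conn, err := grpc.NewClient(pb.ServerToGrpcAddress(volumeServer),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return fmt.Errorf("connect to %s: %w", volumeServer, err)
	}
	defer conn.Close()
	client := volume_server_pb.NewVolumeServerClient(conn)

	// Step 1: generate the 10+4 EC shards next to the volume's data files.
	if _, err := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
		VolumeId: volumeID, Collection: collection,
	}); err != nil {
		return fmt.Errorf("generate EC shards: %w", err)
	}

	// Step 2: mount all 14 shards so they become readable.
	shardIds := make([]uint32, 14)
	for i := range shardIds {
		shardIds[i] = uint32(i)
	}
	if _, err := client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
		VolumeId: volumeID, Collection: collection, ShardIds: shardIds,
	}); err != nil {
		return fmt.Errorf("mount EC shards: %w", err)
	}

	// Step 3: freeze the original volume. The task in this PR only logs a
	// warning on failure; the sketch returns the error for brevity.
	if _, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
		VolumeId: volumeID,
	}); err != nil {
		return fmt.Errorf("mark volume %d read-only: %w", volumeID, err)
	}
	return nil
}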