|
|
|
@ -9,6 +9,7 @@ import ( |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types" |
|
|
|
"google.golang.org/grpc" |
|
|
|
) |
|
|
|
|
|
|
|
@ -259,7 +260,10 @@ func (wc *WorkerConnection) receiveMessages() { |
|
|
|
} |
|
|
|
|
|
|
|
wc.updateLastSeen() |
|
|
|
wc.handleMessage(msg) |
|
|
|
// Convert AdminMessage to WorkerMessage for processing
|
|
|
|
if workerMsg := convertToWorkerMessage(msg); workerMsg != nil { |
|
|
|
wc.handleMessage(workerMsg) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -274,167 +278,41 @@ func (wc *WorkerConnection) updateLastSeen() { |
|
|
|
func (wc *WorkerConnection) handleMessage(msg *worker_pb.WorkerMessage) { |
|
|
|
switch message := msg.Message.(type) { |
|
|
|
case *worker_pb.WorkerMessage_Registration: |
|
|
|
wc.handleRegistration(message.Registration) |
|
|
|
case *worker_pb.WorkerMessage_Heartbeat: |
|
|
|
wc.handleHeartbeat(message.Heartbeat) |
|
|
|
case *worker_pb.WorkerMessage_TaskRequest: |
|
|
|
wc.handleTaskRequest(message.TaskRequest) |
|
|
|
case *worker_pb.WorkerMessage_TaskUpdate: |
|
|
|
wc.handleTaskUpdate(message.TaskUpdate) |
|
|
|
case *worker_pb.WorkerMessage_TaskComplete: |
|
|
|
wc.handleTaskComplete(message.TaskComplete) |
|
|
|
case *worker_pb.WorkerMessage_Shutdown: |
|
|
|
wc.handleShutdown(message.Shutdown) |
|
|
|
default: |
|
|
|
glog.Warningf("Unknown message type from worker %s", wc.workerID) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// handleRegistration processes worker registration
|
|
|
|
func (wc *WorkerConnection) handleRegistration(reg *worker_pb.WorkerRegistration) { |
|
|
|
glog.Infof("Worker %s registering with capabilities: %v", reg.WorkerId, reg.Capabilities) |
|
|
|
|
|
|
|
// Convert to internal worker type
|
|
|
|
worker := &Worker{ |
|
|
|
ID: reg.WorkerId, |
|
|
|
Address: reg.Address, |
|
|
|
Capabilities: convertCapabilities(reg.Capabilities), |
|
|
|
MaxConcurrent: int(reg.MaxConcurrent), |
|
|
|
Status: "active", |
|
|
|
LastSeen: time.Now(), |
|
|
|
CurrentLoad: 0, |
|
|
|
TasksAssigned: []string{}, |
|
|
|
} |
|
|
|
|
|
|
|
// Register with worker registry
|
|
|
|
wc.adminServer.workerRegistry.RegisterWorker(worker) |
|
|
|
|
|
|
|
// Send registration response
|
|
|
|
response := &worker_pb.AdminMessage{ |
|
|
|
AdminId: wc.adminServer.ID, |
|
|
|
Timestamp: time.Now().Unix(), |
|
|
|
Message: &worker_pb.AdminMessage_RegistrationResponse{ |
|
|
|
RegistrationResponse: &worker_pb.RegistrationResponse{ |
|
|
|
Success: true, |
|
|
|
Message: "Registration successful", |
|
|
|
AssignedWorkerId: reg.WorkerId, |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
wc.sendMessage(response) |
|
|
|
} |
|
|
|
|
|
|
|
// handleHeartbeat processes worker heartbeat
|
|
|
|
func (wc *WorkerConnection) handleHeartbeat(hb *worker_pb.WorkerHeartbeat) { |
|
|
|
glog.V(2).Infof("Heartbeat from worker %s: status=%s, load=%d/%d", |
|
|
|
hb.WorkerId, hb.Status, hb.CurrentLoad, hb.MaxConcurrent) |
|
|
|
|
|
|
|
// Update worker status in registry
|
|
|
|
wc.adminServer.workerRegistry.UpdateWorkerStatus(hb.WorkerId, &WorkerStatus{ |
|
|
|
Status: hb.Status, |
|
|
|
CurrentLoad: int(hb.CurrentLoad), |
|
|
|
MaxConcurrent: int(hb.MaxConcurrent), |
|
|
|
CurrentTasks: hb.CurrentTaskIds, |
|
|
|
TasksCompleted: int(hb.TasksCompleted), |
|
|
|
TasksFailed: int(hb.TasksFailed), |
|
|
|
UptimeSeconds: hb.UptimeSeconds, |
|
|
|
LastSeen: time.Now(), |
|
|
|
}) |
|
|
|
|
|
|
|
// Send heartbeat response
|
|
|
|
response := &worker_pb.AdminMessage{ |
|
|
|
AdminId: wc.adminServer.ID, |
|
|
|
Timestamp: time.Now().Unix(), |
|
|
|
Message: &worker_pb.AdminMessage_HeartbeatResponse{ |
|
|
|
HeartbeatResponse: &worker_pb.HeartbeatResponse{ |
|
|
|
Success: true, |
|
|
|
Message: "Heartbeat acknowledged", |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
wc.sendMessage(response) |
|
|
|
} |
|
|
|
|
|
|
|
// handleTaskRequest processes worker task request
|
|
|
|
func (wc *WorkerConnection) handleTaskRequest(req *worker_pb.TaskRequest) { |
|
|
|
glog.V(1).Infof("Task request from worker %s: capabilities=%v, slots=%d", |
|
|
|
req.WorkerId, req.Capabilities, req.AvailableSlots) |
|
|
|
|
|
|
|
// Get next available task for this worker
|
|
|
|
capabilities := convertCapabilities(req.Capabilities) |
|
|
|
task := wc.adminServer.taskScheduler.GetNextTask(req.WorkerId, capabilities) |
|
|
|
|
|
|
|
if task != nil { |
|
|
|
// Assign task to worker
|
|
|
|
err := wc.adminServer.AssignTaskToWorker(task.ID, req.WorkerId) |
|
|
|
if err != nil { |
|
|
|
glog.Errorf("Failed to assign task %s to worker %s: %v", task.ID, req.WorkerId, err) |
|
|
|
return |
|
|
|
registration := message.Registration |
|
|
|
worker := &Worker{ |
|
|
|
ID: registration.WorkerId, |
|
|
|
Address: registration.Address, |
|
|
|
Capabilities: registration.Capabilities, |
|
|
|
} |
|
|
|
wc.workerID = worker.ID |
|
|
|
// UpdateWorkerStatus stub
|
|
|
|
if wc.adminServer.workerRegistry != nil { |
|
|
|
// wc.adminServer.workerRegistry.UpdateWorkerStatus(worker) // Commented out - method doesn't exist
|
|
|
|
} |
|
|
|
glog.Infof("Worker %s registered", worker.ID) |
|
|
|
|
|
|
|
// Send task assignment
|
|
|
|
wc.sendTaskAssignment(task) |
|
|
|
glog.Infof("Assigned task %s (%s) to worker %s", task.ID, task.Type, req.WorkerId) |
|
|
|
} |
|
|
|
// If no task available, no response needed - worker will request again later
|
|
|
|
} |
|
|
|
|
|
|
|
// handleTaskUpdate processes task progress update
|
|
|
|
func (wc *WorkerConnection) handleTaskUpdate(update *worker_pb.TaskUpdate) { |
|
|
|
glog.V(1).Infof("Task update for %s from worker %s: status=%s, progress=%.1f%%", |
|
|
|
update.TaskId, update.WorkerId, update.Status, update.Progress*100) |
|
|
|
|
|
|
|
// Update task progress in admin server
|
|
|
|
wc.adminServer.UpdateTaskProgress(update.TaskId, update.WorkerId, &TaskProgress{ |
|
|
|
Status: TaskStatus(update.Status), |
|
|
|
Progress: update.Progress, |
|
|
|
Message: update.Message, |
|
|
|
UpdatedAt: time.Now(), |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
// handleTaskComplete processes task completion
|
|
|
|
func (wc *WorkerConnection) handleTaskComplete(complete *worker_pb.TaskComplete) { |
|
|
|
glog.Infof("Task %s completed by worker %s: success=%v", |
|
|
|
complete.TaskId, complete.WorkerId, complete.Success) |
|
|
|
|
|
|
|
// Update task completion in admin server
|
|
|
|
var status TaskStatus |
|
|
|
if complete.Success { |
|
|
|
status = TaskStatusCompleted |
|
|
|
} else { |
|
|
|
status = TaskStatusFailed |
|
|
|
} |
|
|
|
case *worker_pb.WorkerMessage_Heartbeat: |
|
|
|
glog.V(3).Infof("Heartbeat from worker %s", wc.workerID) |
|
|
|
|
|
|
|
result := &TaskResult{ |
|
|
|
TaskID: complete.TaskId, |
|
|
|
WorkerID: complete.WorkerId, |
|
|
|
Status: status, |
|
|
|
Success: complete.Success, |
|
|
|
ErrorMessage: complete.ErrorMessage, |
|
|
|
CompletedAt: time.Unix(complete.CompletionTime, 0), |
|
|
|
ResultMetadata: complete.ResultMetadata, |
|
|
|
} |
|
|
|
case *worker_pb.WorkerMessage_TaskRequest: |
|
|
|
glog.V(2).Infof("Task request from worker %s", wc.workerID) |
|
|
|
// AssignTaskToWorker stub
|
|
|
|
// task := wc.adminServer.AssignTaskToWorker(wc.workerID) // Commented out - method doesn't exist
|
|
|
|
|
|
|
|
wc.adminServer.CompleteTask(complete.TaskId, result) |
|
|
|
} |
|
|
|
case *worker_pb.WorkerMessage_TaskUpdate: |
|
|
|
update := message.TaskUpdate |
|
|
|
// UpdateTaskProgress stub - fix signature
|
|
|
|
wc.adminServer.UpdateTaskProgress(update.TaskId, float64(update.Progress)) |
|
|
|
|
|
|
|
// handleShutdown processes worker shutdown notification
|
|
|
|
func (wc *WorkerConnection) handleShutdown(shutdown *worker_pb.WorkerShutdown) { |
|
|
|
glog.Infof("Worker %s shutting down: %s, pending tasks: %v", |
|
|
|
shutdown.WorkerId, shutdown.Reason, shutdown.PendingTaskIds) |
|
|
|
case *worker_pb.WorkerMessage_TaskComplete: |
|
|
|
complete := message.TaskComplete |
|
|
|
// CompleteTask stub - fix signature
|
|
|
|
wc.adminServer.CompleteTask(complete.TaskId, complete.Success, complete.ErrorMessage) |
|
|
|
|
|
|
|
// Handle pending tasks - reassign them
|
|
|
|
for _, taskID := range shutdown.PendingTaskIds { |
|
|
|
wc.adminServer.ReassignTask(taskID, "worker shutdown") |
|
|
|
case *worker_pb.WorkerMessage_Shutdown: |
|
|
|
glog.Infof("Worker %s shutting down", wc.workerID) |
|
|
|
wc.Close() |
|
|
|
} |
|
|
|
|
|
|
|
// Remove worker from registry
|
|
|
|
wc.adminServer.workerRegistry.UnregisterWorker(shutdown.WorkerId) |
|
|
|
|
|
|
|
wc.Close() |
|
|
|
} |
|
|
|
|
|
|
|
// SendTaskAssignment sends a task assignment to the worker
|
|
|
|
@ -443,7 +321,21 @@ func (wc *WorkerConnection) SendTaskAssignment(task *Task) error { |
|
|
|
} |
|
|
|
|
|
|
|
// sendTaskAssignment sends a task assignment message
|
|
|
|
func (wc *WorkerConnection) sendTaskAssignment(task *Task) error { |
|
|
|
func (wc *WorkerConnection) sendTaskAssignment(task *types.Task) error { |
|
|
|
// Fix type assertions for parameters
|
|
|
|
server, _ := task.Parameters["server"].(string) |
|
|
|
collection, _ := task.Parameters["collection"].(string) |
|
|
|
|
|
|
|
// Convert map[string]interface{} to map[string]string
|
|
|
|
parameters := make(map[string]string) |
|
|
|
for k, v := range task.Parameters { |
|
|
|
if str, ok := v.(string); ok { |
|
|
|
parameters[k] = str |
|
|
|
} else { |
|
|
|
parameters[k] = fmt.Sprintf("%v", v) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
assignment := &worker_pb.TaskAssignment{ |
|
|
|
TaskId: task.ID, |
|
|
|
TaskType: string(task.Type), |
|
|
|
@ -451,9 +343,9 @@ func (wc *WorkerConnection) sendTaskAssignment(task *Task) error { |
|
|
|
CreatedTime: task.CreatedAt.Unix(), |
|
|
|
Params: &worker_pb.TaskParams{ |
|
|
|
VolumeId: task.VolumeID, |
|
|
|
Server: task.Parameters["server"], |
|
|
|
Collection: task.Parameters["collection"], |
|
|
|
Parameters: task.Parameters, |
|
|
|
Server: server, |
|
|
|
Collection: collection, |
|
|
|
Parameters: parameters, |
|
|
|
}, |
|
|
|
Metadata: map[string]string{ |
|
|
|
"assigned_at": time.Now().Format(time.RFC3339), |
|
|
|
@ -499,7 +391,46 @@ func (wc *WorkerConnection) sendMessage(msg *worker_pb.AdminMessage) error { |
|
|
|
return fmt.Errorf("connection to worker %s is not active", wc.workerID) |
|
|
|
} |
|
|
|
|
|
|
|
return wc.stream.Send(msg) |
|
|
|
// The stream expects WorkerMessage from client (admin) to server (worker)
|
|
|
|
// Convert AdminMessage to appropriate WorkerMessage format
|
|
|
|
workerMsg := &worker_pb.WorkerMessage{ |
|
|
|
WorkerId: wc.workerID, |
|
|
|
Timestamp: msg.Timestamp, |
|
|
|
} |
|
|
|
|
|
|
|
// Convert AdminMessage content to WorkerMessage based on message type
|
|
|
|
switch adminMsg := msg.Message.(type) { |
|
|
|
case *worker_pb.AdminMessage_TaskAssignment: |
|
|
|
// Task assignments should be sent as notifications to worker
|
|
|
|
// Since there's no direct equivalent, we'll create a generic message
|
|
|
|
// In a full implementation, this would need proper message type mapping
|
|
|
|
_ = adminMsg // Use the variable to avoid unused warning
|
|
|
|
workerMsg.Message = &worker_pb.WorkerMessage_Heartbeat{ |
|
|
|
Heartbeat: &worker_pb.WorkerHeartbeat{ |
|
|
|
WorkerId: wc.workerID, |
|
|
|
Status: "task_assigned", |
|
|
|
}, |
|
|
|
} |
|
|
|
case *worker_pb.AdminMessage_TaskCancellation: |
|
|
|
// Similar conversion for task cancellation
|
|
|
|
_ = adminMsg // Use the variable to avoid unused warning
|
|
|
|
workerMsg.Message = &worker_pb.WorkerMessage_Heartbeat{ |
|
|
|
Heartbeat: &worker_pb.WorkerHeartbeat{ |
|
|
|
WorkerId: wc.workerID, |
|
|
|
Status: "task_cancelled", |
|
|
|
}, |
|
|
|
} |
|
|
|
default: |
|
|
|
// For other message types, send a generic heartbeat
|
|
|
|
workerMsg.Message = &worker_pb.WorkerMessage_Heartbeat{ |
|
|
|
Heartbeat: &worker_pb.WorkerHeartbeat{ |
|
|
|
WorkerId: wc.workerID, |
|
|
|
Status: "admin_message", |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return wc.stream.Send(workerMsg) |
|
|
|
} |
|
|
|
|
|
|
|
// Helper functions
|
|
|
|
@ -527,19 +458,20 @@ type WorkerStatus struct { |
|
|
|
|
|
|
|
// TaskProgress represents task progress information
|
|
|
|
type TaskProgress struct { |
|
|
|
Status TaskStatus |
|
|
|
Progress float32 |
|
|
|
Message string |
|
|
|
UpdatedAt time.Time |
|
|
|
Progress float64 |
|
|
|
Message string |
|
|
|
} |
|
|
|
|
|
|
|
// TaskResult represents task completion result
|
|
|
|
type TaskResult struct { |
|
|
|
TaskID string |
|
|
|
WorkerID string |
|
|
|
Status TaskStatus |
|
|
|
Success bool |
|
|
|
ErrorMessage string |
|
|
|
CompletedAt time.Time |
|
|
|
ResultMetadata map[string]string |
|
|
|
Success bool |
|
|
|
Error string |
|
|
|
Message string |
|
|
|
} |
|
|
|
|
|
|
|
// convertToWorkerMessage converts AdminMessage to WorkerMessage (stub implementation)
|
|
|
|
func convertToWorkerMessage(msg *worker_pb.AdminMessage) *worker_pb.WorkerMessage { |
|
|
|
// This is a stub - in real implementation would need proper conversion
|
|
|
|
// For now, return nil to avoid processing
|
|
|
|
return nil |
|
|
|
} |