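// Package worker contains ECWorker, a maintenance worker that registers with
// an admin server over a bidirectional worker_pb.WorkerService gRPC stream and
// executes erasure-coding (EC) encode, EC rebuild, and vacuum tasks against
// volume servers through the volume_server_pb API.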
package worker

import (
	"context"
	"fmt"
	"io"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"google.golang.org/grpc"
)

// ECWorker implements maintenance worker with actual EC functionality
type ECWorker struct {
	workerID      string
	adminAddress  string
	grpcAddress   string
	capabilities  []string
	maxConcurrent int

	// gRPC server and client
	server      *grpc.Server
	adminConn   *grpc.ClientConn
	adminClient worker_pb.WorkerServiceClient
	adminStream worker_pb.WorkerService_WorkerStreamClient

	// Task management
	currentTasks map[string]*ActiveTask
	taskMutex    sync.RWMutex

	// Control
	running   bool
	startedAt time.Time // set when Start() succeeds, used for heartbeat uptime
	stopCh    chan struct{}
	mutex     sync.RWMutex
}

// ActiveTask represents a task currently being executed
type ActiveTask struct {
	ID         string
	Type       string
	VolumeID   uint32
	Server     string
	Parameters map[string]string
	StartedAt  time.Time
	Progress   float32
	Status     string
	Context    context.Context
	Cancel     context.CancelFunc
}

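// Example (illustrative sketch only; the worker ID and addresses below are
// placeholders, not values defined elsewhere in this package):
//
//	w := NewECWorker("worker-1", "localhost:33646", ":18000")
//	if err := w.Start(); err != nil {
//		glog.Fatalf("failed to start worker: %v", err)
//	}
//	defer w.Stop()
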
// NewECWorker creates a new EC worker
func NewECWorker(workerID, adminAddress, grpcAddress string) *ECWorker {
	return &ECWorker{
		workerID:      workerID,
		adminAddress:  adminAddress,
		grpcAddress:   grpcAddress,
		capabilities:  []string{"ec_encode", "ec_rebuild", "vacuum"},
		maxConcurrent: 2, // Can handle 2 concurrent tasks
		currentTasks:  make(map[string]*ActiveTask),
		stopCh:        make(chan struct{}),
	}
}

// Start starts the worker
func (w *ECWorker) Start() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.running {
		return fmt.Errorf("worker already running")
	}

	glog.Infof("Starting EC worker %s", w.workerID)

	// Start gRPC server
	err := w.startGRPCServer()
	if err != nil {
		return fmt.Errorf("failed to start gRPC server: %v", err)
	}

	// Connect to admin server
	err = w.connectToAdmin()
	if err != nil {
		return fmt.Errorf("failed to connect to admin: %v", err)
	}

	w.running = true
	w.startedAt = time.Now()

	// Start background goroutines
	go w.adminCommunicationLoop()
	go w.heartbeatLoop()
	go w.taskRequestLoop()

	glog.Infof("EC worker %s started successfully", w.workerID)
	return nil
}

// Stop stops the worker
func (w *ECWorker) Stop() {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if !w.running {
		return
	}

	glog.Infof("Stopping EC worker %s", w.workerID)

	close(w.stopCh)

	// Cancel all active tasks
	w.taskMutex.Lock()
	for _, task := range w.currentTasks {
		task.Cancel()
	}
	w.taskMutex.Unlock()

	// Close connections
	if w.adminConn != nil {
		w.adminConn.Close()
	}

	if w.server != nil {
		w.server.Stop()
	}

	w.running = false
	glog.Infof("EC worker %s stopped", w.workerID)
}

// startGRPCServer starts the worker's gRPC server
func (w *ECWorker) startGRPCServer() error {
	listener, err := net.Listen("tcp", w.grpcAddress)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %v", w.grpcAddress, err)
	}

	w.server = grpc.NewServer()
	// Register any worker-specific services here

	go func() {
		err := w.server.Serve(listener)
		if err != nil {
			glog.Errorf("gRPC server error: %v", err)
		}
	}()

	glog.Infof("Worker gRPC server listening on %s", w.grpcAddress)
	return nil
}

// connectToAdmin establishes connection to admin server
func (w *ECWorker) connectToAdmin() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, w.adminAddress, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return fmt.Errorf("failed to connect to admin at %s: %v", w.adminAddress, err)
	}

	w.adminConn = conn
	w.adminClient = worker_pb.NewWorkerServiceClient(conn)

	// Create bidirectional stream
	stream, err := w.adminClient.WorkerStream(context.Background())
	if err != nil {
		return fmt.Errorf("failed to create admin stream: %v", err)
	}

	w.adminStream = stream

	// Send registration message
	err = w.sendRegistration()
	if err != nil {
		return fmt.Errorf("failed to register with admin: %v", err)
	}

	glog.Infof("Connected to admin server at %s", w.adminAddress)
	return nil
}

// sendRegistration sends worker registration to admin
func (w *ECWorker) sendRegistration() error {
	registration := &worker_pb.WorkerMessage{
		WorkerId:  w.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Registration{
			Registration: &worker_pb.WorkerRegistration{
				WorkerId:      w.workerID,
				Address:       w.grpcAddress,
				Capabilities:  w.capabilities,
				MaxConcurrent: int32(w.maxConcurrent),
				Metadata: map[string]string{
					"version": "1.0",
					"type":    "ec_worker",
				},
			},
		},
	}

	return w.adminStream.Send(registration)
}

// adminCommunicationLoop handles messages from admin server
func (w *ECWorker) adminCommunicationLoop() {
	for {
		select {
		case <-w.stopCh:
			return
		default:
		}

		msg, err := w.adminStream.Recv()
		if err != nil {
			glog.Errorf("Error receiving from admin: %v", err)
			// A failed Recv means the stream is broken; wait, then re-establish the connection.
			time.Sleep(5 * time.Second)
			if w.adminConn != nil {
				w.adminConn.Close()
			}
			if reconnectErr := w.connectToAdmin(); reconnectErr != nil {
				glog.Errorf("Failed to reconnect to admin: %v", reconnectErr)
			}
			continue
		}

		w.handleAdminMessage(msg)
	}
}

// handleAdminMessage processes messages from admin server
func (w *ECWorker) handleAdminMessage(msg *worker_pb.AdminMessage) {
	switch message := msg.Message.(type) {
	case *worker_pb.AdminMessage_RegistrationResponse:
		w.handleRegistrationResponse(message.RegistrationResponse)
	case *worker_pb.AdminMessage_TaskAssignment:
		w.handleTaskAssignment(message.TaskAssignment)
	case *worker_pb.AdminMessage_TaskCancellation:
		w.handleTaskCancellation(message.TaskCancellation)
	case *worker_pb.AdminMessage_AdminShutdown:
		w.handleAdminShutdown(message.AdminShutdown)
	default:
		glog.Warningf("Unknown message type from admin: %T", message)
	}
}

// handleRegistrationResponse processes registration response
func (w *ECWorker) handleRegistrationResponse(resp *worker_pb.RegistrationResponse) {
	if resp.Success {
		glog.Infof("Worker %s registered successfully with admin", w.workerID)
	} else {
		glog.Errorf("Worker registration failed: %s", resp.Message)
	}
}

// handleTaskAssignment processes task assignment from admin
func (w *ECWorker) handleTaskAssignment(assignment *worker_pb.TaskAssignment) {
	glog.Infof("Received task assignment: %s (%s) for volume %d",
		assignment.TaskId, assignment.TaskType, assignment.Params.VolumeId)

	// Check if we can accept the task
	w.taskMutex.RLock()
	currentLoad := len(w.currentTasks)
	w.taskMutex.RUnlock()

	if currentLoad >= w.maxConcurrent {
		glog.Warningf("Worker at capacity, cannot accept task %s", assignment.TaskId)
		return
	}

	// Create active task
	ctx, cancel := context.WithCancel(context.Background())
	task := &ActiveTask{
		ID:         assignment.TaskId,
		Type:       assignment.TaskType,
		VolumeID:   assignment.Params.VolumeId,
		Server:     assignment.Params.Server,
		Parameters: assignment.Params.Parameters,
		StartedAt:  time.Now(),
		Progress:   0.0,
		Status:     "started",
		Context:    ctx,
		Cancel:     cancel,
	}

	w.taskMutex.Lock()
	w.currentTasks[assignment.TaskId] = task
	w.taskMutex.Unlock()

	// Start task execution
	go w.executeTask(task)
}

// handleTaskCancellation processes task cancellation
func (w *ECWorker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
	glog.Infof("Received task cancellation: %s", cancellation.TaskId)

	w.taskMutex.Lock()
	defer w.taskMutex.Unlock()

	if task, exists := w.currentTasks[cancellation.TaskId]; exists {
		task.Cancel()
		delete(w.currentTasks, cancellation.TaskId)
		glog.Infof("Cancelled task %s", cancellation.TaskId)
	}
}

// handleAdminShutdown processes admin shutdown notification
func (w *ECWorker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
	glog.Infof("Admin server shutting down: %s", shutdown.Reason)
	w.Stop()
}

// heartbeatLoop sends periodic heartbeats to admin
func (w *ECWorker) heartbeatLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.sendHeartbeat()
		case <-w.stopCh:
			return
		}
	}
}

// sendHeartbeat sends heartbeat to admin server
func (w *ECWorker) sendHeartbeat() {
	w.taskMutex.RLock()
	currentLoad := len(w.currentTasks)
	taskIDs := make([]string, 0, len(w.currentTasks))
	for taskID := range w.currentTasks {
		taskIDs = append(taskIDs, taskID)
	}
	w.taskMutex.RUnlock()

	heartbeat := &worker_pb.WorkerMessage{
		WorkerId:  w.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Heartbeat{
			Heartbeat: &worker_pb.WorkerHeartbeat{
				WorkerId:       w.workerID,
				Status:         "active",
				CurrentLoad:    int32(currentLoad),
				MaxConcurrent:  int32(w.maxConcurrent),
				CurrentTaskIds: taskIDs,
				TasksCompleted: 0, // TODO: Track completed tasks
				TasksFailed:    0, // TODO: Track failed tasks
				UptimeSeconds:  int64(time.Since(w.startedAt).Seconds()),
			},
		},
	}

	if err := w.adminStream.Send(heartbeat); err != nil {
		glog.Errorf("Failed to send heartbeat: %v", err)
	}
}

// taskRequestLoop periodically requests new tasks from admin
func (w *ECWorker) taskRequestLoop() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.requestTasks()
		case <-w.stopCh:
			return
		}
	}
}

// requestTasks requests new tasks from admin if we have capacity
func (w *ECWorker) requestTasks() {
	w.taskMutex.RLock()
	currentLoad := len(w.currentTasks)
	w.taskMutex.RUnlock()

	availableSlots := w.maxConcurrent - currentLoad
	if availableSlots <= 0 {
		return // No capacity
	}

	request := &worker_pb.WorkerMessage{
		WorkerId:  w.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskRequest{
			TaskRequest: &worker_pb.TaskRequest{
				WorkerId:       w.workerID,
				Capabilities:   w.capabilities,
				AvailableSlots: int32(availableSlots),
			},
		},
	}

	if err := w.adminStream.Send(request); err != nil {
		glog.Errorf("Failed to request tasks: %v", err)
	}
}

// executeTask executes a task based on its type
func (w *ECWorker) executeTask(task *ActiveTask) {
	defer func() {
		w.taskMutex.Lock()
		delete(w.currentTasks, task.ID)
		w.taskMutex.Unlock()
	}()

	glog.Infof("Starting execution of task %s (%s) for volume %d",
		task.ID, task.Type, task.VolumeID)

	var err error
	var success bool

	switch task.Type {
	case "ec_encode":
		success, err = w.executeECEncode(task)
	case "ec_rebuild":
		success, err = w.executeECRebuild(task)
	case "vacuum":
		success, err = w.executeVacuum(task)
	default:
		err = fmt.Errorf("unknown task type: %s", task.Type)
		success = false
	}

	// Send completion message
	w.sendTaskCompletion(task, success, err)

	if success {
		glog.Infof("Task %s completed successfully", task.ID)
	} else {
		glog.Errorf("Task %s failed: %v", task.ID, err)
	}
}

// executeECEncode performs actual EC encoding on a volume
func (w *ECWorker) executeECEncode(task *ActiveTask) (bool, error) {
	glog.Infof("Performing EC encoding on volume %d", task.VolumeID)

	// Update progress
	w.sendTaskUpdate(task, 0.1, "Initializing EC encoding")

	// Connect to volume server
	volumeServerAddress := task.Server
	if volumeServerAddress == "" {
		return false, fmt.Errorf("no volume server address provided")
	}

	conn, err := grpc.Dial(volumeServerAddress, grpc.WithInsecure())
	if err != nil {
		return false, fmt.Errorf("failed to connect to volume server %s: %v", volumeServerAddress, err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)

	// Step 1: Generate EC shards
	w.sendTaskUpdate(task, 0.2, "Generating EC shards")

	generateReq := &volume_server_pb.VolumeEcShardsGenerateRequest{
		VolumeId:   task.VolumeID,
		Collection: task.Parameters["collection"],
	}

	_, err = client.VolumeEcShardsGenerate(task.Context, generateReq)
	if err != nil {
		return false, fmt.Errorf("EC shard generation failed: %v", err)
	}

	w.sendTaskUpdate(task, 0.6, "EC shards generated successfully")

	// Step 2: Mount EC volume
	w.sendTaskUpdate(task, 0.8, "Mounting EC volume")

	mountReq := &volume_server_pb.VolumeEcShardsMountRequest{
		VolumeId:   task.VolumeID,
		Collection: task.Parameters["collection"],
		ShardIds:   []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, // All 14 EC shards (10 data + 4 parity)
	}

	_, err = client.VolumeEcShardsMount(task.Context, mountReq)
	if err != nil {
		return false, fmt.Errorf("EC shard mount failed: %v", err)
	}

	// Step 3: Mark original volume as read-only
	w.sendTaskUpdate(task, 0.9, "Marking volume read-only")

	readOnlyReq := &volume_server_pb.VolumeMarkReadonlyRequest{
		VolumeId: task.VolumeID,
	}

	_, err = client.VolumeMarkReadonly(task.Context, readOnlyReq)
	if err != nil {
		glog.Warningf("Failed to mark volume %d read-only: %v", task.VolumeID, err)
		// This is not a critical failure for EC encoding
	}

	w.sendTaskUpdate(task, 1.0, "EC encoding completed")

	return true, nil
}

// executeECRebuild performs EC shard rebuilding
func (w *ECWorker) executeECRebuild(task *ActiveTask) (bool, error) {
	glog.Infof("Performing EC rebuild on volume %d", task.VolumeID)

	w.sendTaskUpdate(task, 0.1, "Initializing EC rebuild")

	// Connect to volume server
	conn, err := grpc.Dial(task.Server, grpc.WithInsecure())
	if err != nil {
		return false, fmt.Errorf("failed to connect to volume server: %v", err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)

	// Rebuild missing/corrupted shards
	w.sendTaskUpdate(task, 0.5, "Rebuilding EC shards")

	rebuildReq := &volume_server_pb.VolumeEcShardsRebuildRequest{
		VolumeId:   task.VolumeID,
		Collection: task.Parameters["collection"],
	}

	_, err = client.VolumeEcShardsRebuild(task.Context, rebuildReq)
	if err != nil {
		return false, fmt.Errorf("EC rebuild failed: %v", err)
	}

	w.sendTaskUpdate(task, 1.0, "EC rebuild completed")

	return true, nil
}

// executeVacuum performs volume vacuum operation
func (w *ECWorker) executeVacuum(task *ActiveTask) (bool, error) {
	glog.Infof("Performing vacuum on volume %d", task.VolumeID)

	w.sendTaskUpdate(task, 0.1, "Initializing vacuum")

	// Parse garbage threshold
	thresholdStr := task.Parameters["garbage_threshold"]
	if thresholdStr == "" {
		thresholdStr = "0.3" // Default 30%
	}

	threshold, err := strconv.ParseFloat(thresholdStr, 32)
	if err != nil {
		return false, fmt.Errorf("invalid garbage threshold: %v", err)
	}

	// Connect to volume server
	conn, err := grpc.Dial(task.Server, grpc.WithInsecure())
	if err != nil {
		return false, fmt.Errorf("failed to connect to volume server: %v", err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)

	// Step 1: Check vacuum eligibility
	w.sendTaskUpdate(task, 0.2, "Checking vacuum eligibility")

	checkReq := &volume_server_pb.VacuumVolumeCheckRequest{
		VolumeId: task.VolumeID,
	}

	checkResp, err := client.VacuumVolumeCheck(task.Context, checkReq)
	if err != nil {
		return false, fmt.Errorf("vacuum check failed: %v", err)
	}

	if checkResp.GarbageRatio < threshold {
		// Not enough garbage to reclaim; treat as a successful no-op rather than a failure.
		glog.Infof("Volume %d garbage ratio %.2f%% below threshold %.2f%%, skipping vacuum",
			task.VolumeID, checkResp.GarbageRatio*100, threshold*100)
		w.sendTaskUpdate(task, 1.0, "Vacuum not needed")
		return true, nil
	}

	// Step 2: Compact volume
	w.sendTaskUpdate(task, 0.4, "Compacting volume")

	compactReq := &volume_server_pb.VacuumVolumeCompactRequest{
		VolumeId: task.VolumeID,
	}

	compactStream, err := client.VacuumVolumeCompact(task.Context, compactReq)
	if err != nil {
		return false, fmt.Errorf("vacuum compact failed: %v", err)
	}

	// Process compact stream
	for {
		resp, err := compactStream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			return false, fmt.Errorf("vacuum compact stream error: %v", err)
		}

		// Rough progress estimate; the response carries no total size, so clamp to the compaction phase (0.4-0.8)
		progress := 0.4
		if resp.LoadAvg_1M > 0 {
			progress = 0.4 + 0.4*(float64(resp.ProcessedBytes)/float64(resp.LoadAvg_1M))
			if progress > 0.8 {
				progress = 0.8
			}
		}
		w.sendTaskUpdate(task, float32(progress), "Compacting volume")
	}

	// Step 3: Commit vacuum
	w.sendTaskUpdate(task, 0.9, "Committing vacuum")

	commitReq := &volume_server_pb.VacuumVolumeCommitRequest{
		VolumeId: task.VolumeID,
	}

	_, err = client.VacuumVolumeCommit(task.Context, commitReq)
	if err != nil {
		return false, fmt.Errorf("vacuum commit failed: %v", err)
	}

	// Step 4: Cleanup
	w.sendTaskUpdate(task, 0.95, "Cleaning up")

	cleanupReq := &volume_server_pb.VacuumVolumeCleanupRequest{
		VolumeId: task.VolumeID,
	}

	_, err = client.VacuumVolumeCleanup(task.Context, cleanupReq)
	if err != nil {
		glog.Warningf("Vacuum cleanup warning: %v", err)
		// Non-critical error
	}

	w.sendTaskUpdate(task, 1.0, "Vacuum completed successfully")

	return true, nil
}

// sendTaskUpdate sends task progress update to admin
func (w *ECWorker) sendTaskUpdate(task *ActiveTask, progress float32, message string) {
	task.Progress = progress
	task.Status = message

	update := &worker_pb.WorkerMessage{
		WorkerId:  w.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskUpdate{
			TaskUpdate: &worker_pb.TaskUpdate{
				TaskId:   task.ID,
				WorkerId: w.workerID,
				Status:   task.Status,
				Progress: progress,
				Message:  message,
				Metadata: map[string]string{
					"updated_at": time.Now().Format(time.RFC3339),
				},
			},
		},
	}

	if err := w.adminStream.Send(update); err != nil {
		glog.Errorf("Failed to send task update: %v", err)
	}
}

// sendTaskCompletion sends task completion to admin
func (w *ECWorker) sendTaskCompletion(task *ActiveTask, success bool, taskErr error) {
	var errorMessage string
	if taskErr != nil {
		errorMessage = taskErr.Error()
	}

	completion := &worker_pb.WorkerMessage{
		WorkerId:  w.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskComplete{
			TaskComplete: &worker_pb.TaskComplete{
				TaskId:         task.ID,
				WorkerId:       w.workerID,
				Success:        success,
				ErrorMessage:   errorMessage,
				CompletionTime: time.Now().Unix(),
				ResultMetadata: map[string]string{
					"duration": time.Since(task.StartedAt).String(),
				},
			},
		},
	}

	if err := w.adminStream.Send(completion); err != nil {
		glog.Errorf("Failed to send task completion: %v", err)
	}
}