diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
index 18f192bc9..4678ab534 100644
--- a/weed/worker/tasks/erasure_coding/ec_task.go
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -10,7 +10,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -70,37 +69,29 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 	t.sources = params.Sources // Get unified sources
 
 	// Log detailed task information
-	t.GetLogger().WithFields(map[string]interface{}{
-		"volume_id":     t.volumeID,
-		"server":        t.server,
-		"collection":    t.collection,
+	t.LogInfo("Starting erasure coding task", map[string]interface{}{
 		"data_shards":   t.dataShards,
 		"parity_shards": t.parityShards,
 		"total_shards":  t.dataShards + t.parityShards,
 		"targets":       len(t.targets),
 		"sources":       len(t.sources),
-	}).Info("Starting erasure coding task")
+	})
 
 	// Log detailed target server assignments
 	for i, target := range t.targets {
-		t.GetLogger().WithFields(map[string]interface{}{
+		t.LogInfo("Target server shard assignment", map[string]interface{}{
 			"target_index": i,
 			"server":       target.Node,
-			"shard_ids":    target.ShardIds,
 			"shard_count":  len(target.ShardIds),
-		}).Info("Target server shard assignment")
+		})
 	}
 
 	// Log source information
 	for i, source := range t.sources {
-		t.GetLogger().WithFields(map[string]interface{}{
+		t.LogInfo("Source server information", map[string]interface{}{
 			"source_index": i,
 			"server":       source.Node,
-			"volume_id":    source.VolumeId,
-			"disk_id":      source.DiskId,
-			"rack":         source.Rack,
-			"data_center":  source.DataCenter,
-		}).Info("Source server information")
+		})
 	}
 
 	// Use the working directory from task parameters, or fall back to a default
@@ -111,11 +102,12 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 	if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
 		return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
 	}
-	glog.V(1).Infof("Created working directory: %s", taskWorkDir)
+	t.LogInfo("Created working directory", map[string]interface{}{
+		"working_dir": taskWorkDir,
+	})
 
 	// Update the task's working directory to the specific instance directory
 	t.workDir = taskWorkDir
-	glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
 
 	// Ensure cleanup of working directory (but preserve logs)
 	defer func() {
@@ -128,23 +120,23 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 			}
 			for _, match := range matches {
 				if err := os.Remove(match); err != nil {
-					glog.V(2).Infof("Could not remove %s: %v", match, err)
+					t.LogWarning("Could not remove file during cleanup", map[string]interface{}{
+						"file": match,
+					})
 				}
 			}
 		}
-		glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
+		t.LogInfo("Cleaned up volume files from working directory")
 	}()
 
 	// Step 1: Mark volume readonly
 	t.ReportProgressWithStage(10.0, "Marking volume readonly")
-	t.GetLogger().Info("Marking volume readonly")
 	if err := t.markVolumeReadonly(); err != nil {
 		return fmt.Errorf("failed to mark volume readonly: %v", err)
 	}
 
 	// Step 2: Copy volume files to worker
 	t.ReportProgressWithStage(25.0, "Copying volume files to worker")
-	t.GetLogger().Info("Copying volume files to worker")
 	localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
 	if err != nil {
 		return fmt.Errorf("failed to copy volume files: %v", err)
@@ -152,7 +144,6 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 
 	// Step 3: Generate EC shards locally
 	t.ReportProgressWithStage(40.0, "Generating EC shards locally")
-	t.GetLogger().Info("Generating EC shards locally")
 	shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
 	if err != nil {
 		return fmt.Errorf("failed to generate EC shards: %v", err)
@@ -160,28 +151,26 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
 
 	// Step 4: Distribute shards to destinations
 	t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
-	t.GetLogger().Info("Distributing EC shards to destinations")
 	if err := t.distributeEcShards(shardFiles); err != nil {
 		return fmt.Errorf("failed to distribute EC shards: %v", err)
 	}
 
 	// Step 5: Mount EC shards
 	t.ReportProgressWithStage(80.0, "Mounting EC shards")
-	t.GetLogger().Info("Mounting EC shards")
 	if err := t.mountEcShards(); err != nil {
 		return fmt.Errorf("failed to mount EC shards: %v", err)
 	}
 
 	// Step 6: Delete original volume
 	t.ReportProgressWithStage(90.0, "Deleting original volume")
-	t.GetLogger().Info("Deleting original volume")
 	if err := t.deleteOriginalVolume(); err != nil {
 		return fmt.Errorf("failed to delete original volume: %v", err)
 	}
 
 	t.ReportProgressWithStage(100.0, "EC processing complete")
-	glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
-		t.volumeID, t.server, len(shardFiles))
+	t.LogInfo("EC task completed successfully", map[string]interface{}{
+		"shards_distributed": len(shardFiles),
+	})
 
 	return nil
 }
@@ -256,11 +245,7 @@ func (t *ErasureCodingTask) markVolumeReadonly() error {
 func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
 	localFiles := make(map[string]string)
 
-	t.GetLogger().WithFields(map[string]interface{}{
-		"volume_id":   t.volumeID,
-		"source":      t.server,
-		"working_dir": workDir,
-	}).Info("Starting volume file copy from source server")
+	t.LogInfo("Starting volume file copy from source server")
 
 	// Copy .dat file
 	datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
@@ -269,16 +254,6 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
 	}
 	localFiles["dat"] = datFile
 
-	// Log .dat file size
-	if info, err := os.Stat(datFile); err == nil {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"file_type":  ".dat",
-			"file_path":  datFile,
-			"size_bytes": info.Size(),
-			"size_mb":    float64(info.Size()) / (1024 * 1024),
-		}).Info("Volume data file copied successfully")
-	}
-
 	// Copy .idx file
 	idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
 	if err := t.copyFileFromSource(".idx", idxFile); err != nil {
@@ -286,15 +261,7 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
 	}
 	localFiles["idx"] = idxFile
 
-	// Log .idx file size
-	if info, err := os.Stat(idxFile); err == nil {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"file_type":  ".idx",
-			"file_path":  idxFile,
-			"size_bytes": info.Size(),
-			"size_mb":    float64(info.Size()) / (1024 * 1024),
-		}).Info("Volume index file copied successfully")
-	}
+	t.LogInfo("Volume files copied successfully")
 
 	return localFiles, nil
 }
@@ -340,7 +307,7 @@ func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
 			}
 		}
 
-		glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
+		// File copying is already logged at higher level
 		return nil
 	})
 }
@@ -358,7 +325,7 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 	baseName := strings.TrimSuffix(datFile, ".dat")
 	shardFiles := make(map[string]string)
 
-	glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+	t.LogInfo("Generating EC shards from local files")
 
 	// Generate EC shard files (.ec00 ~ .ec13)
 	if err := erasure_coding.WriteEcFiles(baseName); err != nil {
@@ -381,27 +348,13 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 			shardFiles[shardKey] = shardFile
 			generatedShards = append(generatedShards, shardKey)
 			totalShardSize += info.Size()
-
-			// Log individual shard details
-			t.GetLogger().WithFields(map[string]interface{}{
-				"shard_id":   i,
-				"shard_type": shardKey,
-				"file_path":  shardFile,
-				"size_bytes": info.Size(),
-				"size_kb":    float64(info.Size()) / 1024,
-			}).Info("EC shard generated")
 		}
 	}
 
 	// Add metadata files
 	ecxFile := baseName + ".ecx"
-	if info, err := os.Stat(ecxFile); err == nil {
+	if _, err := os.Stat(ecxFile); err == nil {
 		shardFiles["ecx"] = ecxFile
-		t.GetLogger().WithFields(map[string]interface{}{
-			"file_type":  "ecx",
-			"file_path":  ecxFile,
-			"size_bytes": info.Size(),
-		}).Info("EC index file generated")
 	}
 
 	// Generate .vif file (volume info)
@@ -410,25 +363,16 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 		Version: uint32(needle.GetCurrentVersion()),
 	}
 	if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
-		glog.Warningf("Failed to create .vif file: %v", err)
+		t.LogWarning("Failed to create VIF file")
 	} else {
 		shardFiles["vif"] = vifFile
-		if info, err := os.Stat(vifFile); err == nil {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"file_type":  "vif",
-				"file_path":  vifFile,
-				"size_bytes": info.Size(),
-			}).Info("Volume info file generated")
-		}
 	}
 
 	// Log summary of generation
-	t.GetLogger().WithFields(map[string]interface{}{
-		"total_files":         len(shardFiles),
-		"ec_shards":           len(generatedShards),
-		"generated_shards":    generatedShards,
-		"total_shard_size_mb": float64(totalShardSize) / (1024 * 1024),
-	}).Info("EC shard generation completed")
+	t.LogInfo("EC shard generation completed", map[string]interface{}{
+		"total_shards": len(generatedShards),
+		"total_mb":     float64(totalShardSize) / (1024 * 1024),
+	})
 
 	return shardFiles, nil
 }
@@ -481,11 +425,10 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err
 
 	// Send assigned shards to each destination
 	for destNode, assignedShards := range shardAssignment {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":     destNode,
-			"assigned_shards": len(assignedShards),
-			"shard_types":     assignedShards,
-		}).Info("Starting shard distribution to destination server")
+		t.LogInfo("Distributing shards to destination", map[string]interface{}{
+			"destination": destNode,
+			"shard_count": len(assignedShards),
+		})
 
 		// Send only the assigned shards to this destination
 		var transferredBytes int64
@@ -495,38 +438,25 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err
 				return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
 			}
 
-			// Log file size before transfer
 			if info, err := os.Stat(filePath); err == nil {
 				transferredBytes += info.Size()
-				t.GetLogger().WithFields(map[string]interface{}{
-					"destination": destNode,
-					"shard_type":  shardType,
-					"file_path":   filePath,
-					"size_bytes":  info.Size(),
-					"size_kb":     float64(info.Size()) / 1024,
-				}).Info("Starting shard file transfer")
 			}
 
 			if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
 				return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
 			}
-
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_type":  shardType,
-			}).Info("Shard file transfer completed")
 		}
 
-		// Log summary for this destination
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":        destNode,
-			"shards_transferred": len(assignedShards),
-			"total_bytes":        transferredBytes,
-			"total_mb":           float64(transferredBytes) / (1024 * 1024),
-		}).Info("All shards distributed to destination server")
+		t.LogInfo("Shards distributed to destination", map[string]interface{}{
+			"destination": destNode,
+			"shard_count": len(assignedShards),
+			"total_mb":    float64(transferredBytes) / (1024 * 1024),
+		})
 	}
 
-	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+	t.LogInfo("Successfully distributed EC shards", map[string]interface{}{
+		"destinations": len(shardAssignment),
+	})
 	return nil
 }
 
@@ -619,7 +549,7 @@ func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, sha
 			return fmt.Errorf("server error: %s", resp.Error)
 		}
 
-		glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
+		// Individual shard transfers are logged at higher level
 		return nil
 	})
 }
@@ -649,19 +579,8 @@ func (t *ErasureCodingTask) mountEcShards() error {
 			}
 		}
 
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":    destNode,
-			"shard_ids":      shardIds,
-			"shard_count":    len(shardIds),
-			"metadata_files": metadataFiles,
-		}).Info("Starting EC shard mount operation")
-
 		if len(shardIds) == 0 {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination":    destNode,
-				"metadata_files": metadataFiles,
-			}).Info("No EC shards to mount (only metadata files)")
-			continue
+			continue // No shards to mount, only metadata
 		}
 
 		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
@@ -675,18 +594,10 @@ func (t *ErasureCodingTask) mountEcShards() error {
 			})
 
 		if err != nil {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_ids":   shardIds,
-				"error":       err.Error(),
-			}).Error("Failed to mount EC shards")
-		} else {
-			t.GetLogger().WithFields(map[string]interface{}{
+			t.LogWarning("Failed to mount EC shards", map[string]interface{}{
 				"destination": destNode,
-				"shard_ids":   shardIds,
-				"volume_id":   t.volumeID,
-				"collection":  t.collection,
-			}).Info("Successfully mounted EC shards")
+				"shard_count": len(shardIds),
+			})
 		}
 	}
 
@@ -699,27 +610,18 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
 
 	replicas := t.getReplicas()
 
 	if len(replicas) == 0 {
-		glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
 		replicas = []string{t.server}
 	}
 
-	t.GetLogger().WithFields(map[string]interface{}{
-		"volume_id":       t.volumeID,
-		"replica_count":   len(replicas),
-		"replica_servers": replicas,
-	}).Info("Starting original volume deletion from replica servers")
+	t.LogInfo("Deleting original volume from replicas", map[string]interface{}{
+		"replica_count": len(replicas),
+	})
 
 	// Delete volume from all replica locations
 	var deleteErrors []string
 	successCount := 0
 
-	for i, replicaServer := range replicas {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"replica_index":  i + 1,
-			"total_replicas": len(replicas),
-			"server":         replicaServer,
-			"volume_id":      t.volumeID,
-		}).Info("Deleting volume from replica server")
+	for _, replicaServer := range replicas {
 		err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
 			func(client volume_server_pb.VolumeServerClient) error {
@@ -732,37 +634,36 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
 
 		if err != nil {
 			deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
-			t.GetLogger().WithFields(map[string]interface{}{
-				"server":    replicaServer,
-				"volume_id": t.volumeID,
-				"error":     err.Error(),
-			}).Error("Failed to delete volume from replica server")
+			t.LogError("Failed to delete volume from replica server", map[string]interface{}{
+				"server": replicaServer,
+				"error":  err.Error(),
+			})
 		} else {
 			successCount++
-			t.GetLogger().WithFields(map[string]interface{}{
-				"server":    replicaServer,
-				"volume_id": t.volumeID,
-			}).Info("Successfully deleted volume from replica server")
+			// Only log individual successes for small replica sets
+			if len(replicas) <= 3 {
+				t.LogInfo("Successfully deleted volume from replica server", map[string]interface{}{
+					"server": replicaServer,
+				})
+			}
 		}
 	}
 
 	// Report results
 	if len(deleteErrors) > 0 {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"volume_id":      t.volumeID,
+		t.LogWarning("Some volume deletions failed", map[string]interface{}{
 			"successful":     successCount,
 			"failed":         len(deleteErrors),
 			"total_replicas": len(replicas),
 			"success_rate":   float64(successCount) / float64(len(replicas)) * 100,
 			"errors":         deleteErrors,
-		}).Warning("Some volume deletions failed")
+		})
 		// Don't return error - EC task should still be considered successful if shards are mounted
 	} else {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"volume_id":       t.volumeID,
+		t.LogInfo("Successfully deleted volume from all replica servers", map[string]interface{}{
 			"replica_count":   len(replicas),
 			"replica_servers": replicas,
-		}).Info("Successfully deleted volume from all replica servers")
+		})
 	}
 
 	return nil
diff --git a/weed/worker/types/base/task.go b/weed/worker/types/base/task.go
index 243df5630..41f6c60d3 100644
--- a/weed/worker/types/base/task.go
+++ b/weed/worker/types/base/task.go
@@ -99,6 +99,35 @@ func (t *BaseTask) GetLogger() types.Logger {
 	return t.logger
 }
 
+// Simple logging helpers - these replace glog.V() calls with structured logging
+
+// LogInfo is a simple wrapper for structured info logging
+func (t *BaseTask) LogInfo(message string, fields ...map[string]interface{}) {
+	if len(fields) > 0 {
+		t.logger.WithFields(fields[0]).Info(message)
+	} else {
+		t.logger.Info(message)
+	}
+}
+
+// LogWarning is a simple wrapper for structured warning logging
+func (t *BaseTask) LogWarning(message string, fields ...map[string]interface{}) {
+	if len(fields) > 0 {
+		t.logger.WithFields(fields[0]).Warning(message)
+	} else {
+		t.logger.Warning(message)
+	}
+}
+
+// LogError is a simple wrapper for structured error logging
+func (t *BaseTask) LogError(message string, fields ...map[string]interface{}) {
+	if len(fields) > 0 {
+		t.logger.WithFields(fields[0]).Error(message)
+	} else {
+		t.logger.Error(message)
+	}
+}
+
 // Execute implements the Task interface
 func (t *BaseTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
 	// Subclasses must implement this