
simplify logging

add-ec-vacuum
chrislu 4 months ago
parent commit c7f291e4e4
1. weed/worker/tasks/erasure_coding/ec_task.go (221 lines changed)
2. weed/worker/types/base/task.go (29 lines changed)

weed/worker/tasks/erasure_coding/ec_task.go (221 lines changed)

@@ -10,7 +10,6 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -70,37 +69,29 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
t.sources = params.Sources // Get unified sources
// Log detailed task information
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"server": t.server,
"collection": t.collection,
t.LogInfo("Starting erasure coding task", map[string]interface{}{
"data_shards": t.dataShards,
"parity_shards": t.parityShards,
"total_shards": t.dataShards + t.parityShards,
"targets": len(t.targets),
"sources": len(t.sources),
}).Info("Starting erasure coding task")
})
// Log detailed target server assignments
for i, target := range t.targets {
t.GetLogger().WithFields(map[string]interface{}{
t.LogInfo("Target server shard assignment", map[string]interface{}{
"target_index": i,
"server": target.Node,
"shard_ids": target.ShardIds,
"shard_count": len(target.ShardIds),
}).Info("Target server shard assignment")
})
}
// Log source information
for i, source := range t.sources {
t.GetLogger().WithFields(map[string]interface{}{
t.LogInfo("Source server information", map[string]interface{}{
"source_index": i,
"server": source.Node,
"volume_id": source.VolumeId,
"disk_id": source.DiskId,
"rack": source.Rack,
"data_center": source.DataCenter,
}).Info("Source server information")
})
}
// Use the working directory from task parameters, or fall back to a default
@@ -111,11 +102,12 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
}
glog.V(1).Infof("Created working directory: %s", taskWorkDir)
t.LogInfo("Created working directory", map[string]interface{}{
"working_dir": taskWorkDir,
})
// Update the task's working directory to the specific instance directory
t.workDir = taskWorkDir
glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
// Ensure cleanup of working directory (but preserve logs)
defer func() {
@@ -128,23 +120,23 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
}
for _, match := range matches {
if err := os.Remove(match); err != nil {
glog.V(2).Infof("Could not remove %s: %v", match, err)
t.LogWarning("Could not remove file during cleanup", map[string]interface{}{
"file": match,
})
}
}
}
glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
t.LogInfo("Cleaned up volume files from working directory")
}()
// Step 1: Mark volume readonly
t.ReportProgressWithStage(10.0, "Marking volume readonly")
t.GetLogger().Info("Marking volume readonly")
if err := t.markVolumeReadonly(); err != nil {
return fmt.Errorf("failed to mark volume readonly: %v", err)
}
// Step 2: Copy volume files to worker
t.ReportProgressWithStage(25.0, "Copying volume files to worker")
t.GetLogger().Info("Copying volume files to worker")
localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
if err != nil {
return fmt.Errorf("failed to copy volume files: %v", err)
@@ -152,7 +144,6 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
// Step 3: Generate EC shards locally
t.ReportProgressWithStage(40.0, "Generating EC shards locally")
t.GetLogger().Info("Generating EC shards locally")
shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
if err != nil {
return fmt.Errorf("failed to generate EC shards: %v", err)
@@ -160,28 +151,26 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
// Step 4: Distribute shards to destinations
t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
t.GetLogger().Info("Distributing EC shards to destinations")
if err := t.distributeEcShards(shardFiles); err != nil {
return fmt.Errorf("failed to distribute EC shards: %v", err)
}
// Step 5: Mount EC shards
t.ReportProgressWithStage(80.0, "Mounting EC shards")
t.GetLogger().Info("Mounting EC shards")
if err := t.mountEcShards(); err != nil {
return fmt.Errorf("failed to mount EC shards: %v", err)
}
// Step 6: Delete original volume
t.ReportProgressWithStage(90.0, "Deleting original volume")
t.GetLogger().Info("Deleting original volume")
if err := t.deleteOriginalVolume(); err != nil {
return fmt.Errorf("failed to delete original volume: %v", err)
}
t.ReportProgressWithStage(100.0, "EC processing complete")
glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
t.volumeID, t.server, len(shardFiles))
t.LogInfo("EC task completed successfully", map[string]interface{}{
"shards_distributed": len(shardFiles),
})
return nil
}
@@ -256,11 +245,7 @@ func (t *ErasureCodingTask) markVolumeReadonly() error {
func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
localFiles := make(map[string]string)
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"source": t.server,
"working_dir": workDir,
}).Info("Starting volume file copy from source server")
t.LogInfo("Starting volume file copy from source server")
// Copy .dat file
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
@@ -269,16 +254,6 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
}
localFiles["dat"] = datFile
// Log .dat file size
if info, err := os.Stat(datFile); err == nil {
t.GetLogger().WithFields(map[string]interface{}{
"file_type": ".dat",
"file_path": datFile,
"size_bytes": info.Size(),
"size_mb": float64(info.Size()) / (1024 * 1024),
}).Info("Volume data file copied successfully")
}
// Copy .idx file
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
if err := t.copyFileFromSource(".idx", idxFile); err != nil {
@@ -286,15 +261,7 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
}
localFiles["idx"] = idxFile
// Log .idx file size
if info, err := os.Stat(idxFile); err == nil {
t.GetLogger().WithFields(map[string]interface{}{
"file_type": ".idx",
"file_path": idxFile,
"size_bytes": info.Size(),
"size_mb": float64(info.Size()) / (1024 * 1024),
}).Info("Volume index file copied successfully")
}
t.LogInfo("Volume files copied successfully")
return localFiles, nil
}
@@ -340,7 +307,7 @@ func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
}
}
glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
// File copying is already logged at higher level
return nil
})
}
@@ -358,7 +325,7 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
baseName := strings.TrimSuffix(datFile, ".dat")
shardFiles := make(map[string]string)
glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
t.LogInfo("Generating EC shards from local files")
// Generate EC shard files (.ec00 ~ .ec13)
if err := erasure_coding.WriteEcFiles(baseName); err != nil {
@@ -381,27 +348,13 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
shardFiles[shardKey] = shardFile
generatedShards = append(generatedShards, shardKey)
totalShardSize += info.Size()
// Log individual shard details
t.GetLogger().WithFields(map[string]interface{}{
"shard_id": i,
"shard_type": shardKey,
"file_path": shardFile,
"size_bytes": info.Size(),
"size_kb": float64(info.Size()) / 1024,
}).Info("EC shard generated")
}
}
// Add metadata files
ecxFile := baseName + ".ecx"
if info, err := os.Stat(ecxFile); err == nil {
if _, err := os.Stat(ecxFile); err == nil {
shardFiles["ecx"] = ecxFile
t.GetLogger().WithFields(map[string]interface{}{
"file_type": "ecx",
"file_path": ecxFile,
"size_bytes": info.Size(),
}).Info("EC index file generated")
}
// Generate .vif file (volume info)
@@ -410,25 +363,16 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
Version: uint32(needle.GetCurrentVersion()),
}
if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
glog.Warningf("Failed to create .vif file: %v", err)
t.LogWarning("Failed to create VIF file")
} else {
shardFiles["vif"] = vifFile
if info, err := os.Stat(vifFile); err == nil {
t.GetLogger().WithFields(map[string]interface{}{
"file_type": "vif",
"file_path": vifFile,
"size_bytes": info.Size(),
}).Info("Volume info file generated")
}
}
// Log summary of generation
t.GetLogger().WithFields(map[string]interface{}{
"total_files": len(shardFiles),
"ec_shards": len(generatedShards),
"generated_shards": generatedShards,
"total_shard_size_mb": float64(totalShardSize) / (1024 * 1024),
}).Info("EC shard generation completed")
t.LogInfo("EC shard generation completed", map[string]interface{}{
"total_shards": len(generatedShards),
"total_mb": float64(totalShardSize) / (1024 * 1024),
})
return shardFiles, nil
}
@@ -481,11 +425,10 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err
// Send assigned shards to each destination
for destNode, assignedShards := range shardAssignment {
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"assigned_shards": len(assignedShards),
"shard_types": assignedShards,
}).Info("Starting shard distribution to destination server")
t.LogInfo("Distributing shards to destination", map[string]interface{}{
"destination": destNode,
"shard_count": len(assignedShards),
})
// Send only the assigned shards to this destination
var transferredBytes int64
@@ -495,38 +438,25 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err
return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
}
// Log file size before transfer
if info, err := os.Stat(filePath); err == nil {
transferredBytes += info.Size()
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"shard_type": shardType,
"file_path": filePath,
"size_bytes": info.Size(),
"size_kb": float64(info.Size()) / 1024,
}).Info("Starting shard file transfer")
}
if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
}
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"shard_type": shardType,
}).Info("Shard file transfer completed")
}
// Log summary for this destination
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"shards_transferred": len(assignedShards),
"total_bytes": transferredBytes,
"total_mb": float64(transferredBytes) / (1024 * 1024),
}).Info("All shards distributed to destination server")
t.LogInfo("Shards distributed to destination", map[string]interface{}{
"destination": destNode,
"shard_count": len(assignedShards),
"total_mb": float64(transferredBytes) / (1024 * 1024),
})
}
glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
t.LogInfo("Successfully distributed EC shards", map[string]interface{}{
"destinations": len(shardAssignment),
})
return nil
}
@@ -619,7 +549,7 @@ func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, sha
return fmt.Errorf("server error: %s", resp.Error)
}
glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
// Individual shard transfers are logged at higher level
return nil
})
}
@@ -649,19 +579,8 @@ func (t *ErasureCodingTask) mountEcShards() error {
}
}
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"shard_ids": shardIds,
"shard_count": len(shardIds),
"metadata_files": metadataFiles,
}).Info("Starting EC shard mount operation")
if len(shardIds) == 0 {
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"metadata_files": metadataFiles,
}).Info("No EC shards to mount (only metadata files)")
continue
continue // No shards to mount, only metadata
}
err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
@@ -675,18 +594,10 @@ func (t *ErasureCodingTask) mountEcShards() error {
})
if err != nil {
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"shard_ids": shardIds,
"error": err.Error(),
}).Error("Failed to mount EC shards")
} else {
t.GetLogger().WithFields(map[string]interface{}{
t.LogWarning("Failed to mount EC shards", map[string]interface{}{
"destination": destNode,
"shard_ids": shardIds,
"volume_id": t.volumeID,
"collection": t.collection,
}).Info("Successfully mounted EC shards")
"shard_count": len(shardIds),
})
}
}
@@ -699,27 +610,18 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
replicas := t.getReplicas()
if len(replicas) == 0 {
glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
replicas = []string{t.server}
}
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"replica_count": len(replicas),
"replica_servers": replicas,
}).Info("Starting original volume deletion from replica servers")
t.LogInfo("Deleting original volume from replicas", map[string]interface{}{
"replica_count": len(replicas),
})
// Delete volume from all replica locations
var deleteErrors []string
successCount := 0
for i, replicaServer := range replicas {
t.GetLogger().WithFields(map[string]interface{}{
"replica_index": i + 1,
"total_replicas": len(replicas),
"server": replicaServer,
"volume_id": t.volumeID,
}).Info("Deleting volume from replica server")
for _, replicaServer := range replicas {
err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
@@ -732,37 +634,36 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
if err != nil {
deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
t.GetLogger().WithFields(map[string]interface{}{
"server": replicaServer,
"volume_id": t.volumeID,
"error": err.Error(),
}).Error("Failed to delete volume from replica server")
t.LogError("Failed to delete volume from replica server", map[string]interface{}{
"server": replicaServer,
"error": err.Error(),
})
} else {
successCount++
t.GetLogger().WithFields(map[string]interface{}{
"server": replicaServer,
"volume_id": t.volumeID,
}).Info("Successfully deleted volume from replica server")
// Only log individual successes for small replica sets
if len(replicas) <= 3 {
t.LogInfo("Successfully deleted volume from replica server", map[string]interface{}{
"server": replicaServer,
})
}
}
}
// Report results
if len(deleteErrors) > 0 {
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
t.LogWarning("Some volume deletions failed", map[string]interface{}{
"successful": successCount,
"failed": len(deleteErrors),
"total_replicas": len(replicas),
"success_rate": float64(successCount) / float64(len(replicas)) * 100,
"errors": deleteErrors,
}).Warning("Some volume deletions failed")
})
// Don't return error - EC task should still be considered successful if shards are mounted
} else {
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
t.LogInfo("Successfully deleted volume from all replica servers", map[string]interface{}{
"replica_count": len(replicas),
"replica_servers": replicas,
}).Info("Successfully deleted volume from all replica servers")
})
}
return nil
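The change above is mechanical throughout ec_task.go: each multi-line glog.V(n).Infof(...) or t.GetLogger().WithFields(...).Info(...) call collapses into a single t.LogInfo / t.LogWarning / t.LogError call that takes the message first and an optional fields map. A minimal before/after sketch of the pattern (the message and field values here are illustrative, not copied from the diff):

// Before: structured logging built directly on the task logger.
t.GetLogger().WithFields(map[string]interface{}{
	"volume_id": t.volumeID,
	"server":    t.server,
}).Info("Marking volume readonly")

// After: the BaseTask helper wraps the same WithFields(...).Info(...) chain.
t.LogInfo("Marking volume readonly", map[string]interface{}{
	"volume_id": t.volumeID,
	"server":    t.server,
})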

weed/worker/types/base/task.go (29 lines changed)

@@ -99,6 +99,35 @@ func (t *BaseTask) GetLogger() types.Logger {
return t.logger
}
// Simple logging helpers - these replace glog.V() calls with structured logging
// LogInfo is a simple wrapper for structured info logging
func (t *BaseTask) LogInfo(message string, fields ...map[string]interface{}) {
if len(fields) > 0 {
t.logger.WithFields(fields[0]).Info(message)
} else {
t.logger.Info(message)
}
}
// LogWarning is a simple wrapper for structured warning logging
func (t *BaseTask) LogWarning(message string, fields ...map[string]interface{}) {
if len(fields) > 0 {
t.logger.WithFields(fields[0]).Warning(message)
} else {
t.logger.Warning(message)
}
}
// LogError is a simple wrapper for structured error logging
func (t *BaseTask) LogError(message string, fields ...map[string]interface{}) {
if len(fields) > 0 {
t.logger.WithFields(fields[0]).Error(message)
} else {
t.logger.Error(message)
}
}
// Execute implements the Task interface
func (t *BaseTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
// Subclasses must implement this
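Because the fields parameter is variadic, callers can omit it entirely; when supplied, only the first map is used. A hedged usage sketch from a hypothetical task embedding BaseTask (the task type, step, and field names are made up for illustration):

func (t *exampleTask) doStep() error {
	// Plain message, no structured fields.
	t.LogInfo("Starting step")

	// With fields: equivalent to GetLogger().WithFields(fields).Info(message).
	t.LogInfo("Step finished", map[string]interface{}{
		"duration_ms": 42,
	})

	// Warnings and errors follow the same shape.
	t.LogWarning("Step retried", map[string]interface{}{"attempt": 2})
	return nil
}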
