Browse Source

detect shard sources

add-ec-vacuum
chrislu 5 months ago
parent
commit
5c93557314
  1. 73
      weed/worker/tasks/ec_vacuum/detection.go
  2. 47
      weed/worker/tasks/ec_vacuum/ec_vacuum_task.go
  3. 71
      weed/worker/tasks/ec_vacuum/register.go

73
weed/worker/tasks/ec_vacuum/detection.go

@ -88,20 +88,20 @@ func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo,
VacuumParams: &worker_pb.VacuumTaskParams{
GarbageThreshold: deletionRatio,
ForceVacuum: false,
BatchSize: 1000, // Default batch size
WorkingDir: "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead
VerifyChecksum: true, // Enable checksum verification for safety
BatchSize: 1000, // Default batch size
WorkingDir: "/data/ec_vacuum", // Default base directory - worker may use BaseWorkingDir/ec_vacuum instead
VerifyChecksum: true, // Enable checksum verification for safety
},
},
}
result := &wtypes.TaskDetectionResult{
TaskID: taskID,
TaskType: wtypes.TaskType("ec_vacuum"),
VolumeID: volumeID,
Server: ecInfo.PrimaryNode,
Collection: ecInfo.Collection,
Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent
TaskID: taskID,
TaskType: wtypes.TaskType("ec_vacuum"),
VolumeID: volumeID,
Server: ecInfo.PrimaryNode,
Collection: ecInfo.Collection,
Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent
Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%.1fh), size=%.1fMB (>%dMB)",
deletionRatio*100, ecVacuumConfig.DeletionThreshold*100,
ecInfo.Age.Hours(), (time.Duration(ecVacuumConfig.MinVolumeAgeSeconds) * time.Second).Hours(),
@ -255,13 +255,18 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol
ecVolumeInfo.ShardNodes[serverAddr] = erasure_coding.ShardBits(0)
}
// Add all shards on this disk for this volume
for i := 0; i < len(ecShardInfo.ShardSizes); i++ {
ecVolumeInfo.ShardNodes[serverAddr] = ecVolumeInfo.ShardNodes[serverAddr].AddShardId(erasure_coding.ShardId(i))
// Add shards based on actual EcIndexBits, not ShardSizes length
ecIndexBits := ecShardInfo.EcIndexBits
actualShards := make([]int, 0)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if (ecIndexBits & (1 << uint(i))) != 0 {
ecVolumeInfo.ShardNodes[serverAddr] = ecVolumeInfo.ShardNodes[serverAddr].AddShardId(erasure_coding.ShardId(i))
actualShards = append(actualShards, i)
}
}
glog.V(3).Infof("EC volume %d: found %d shards on server %s",
volumeID, len(ecShardInfo.ShardSizes), node.Id)
glog.V(2).Infof("EC volume %d: found shards %v on server %s (EcIndexBits=0x%x)",
volumeID, actualShards, node.Id, ecIndexBits)
}
}
}
@ -271,12 +276,19 @@ func populateShardInfo(ecVolumes map[uint32]*EcVolumeInfo, activeTopology *topol
// Log shard distribution summary
for volumeID, ecInfo := range ecVolumes {
totalShards := 0
for _, shardBits := range ecInfo.ShardNodes {
totalShards += shardBits.ShardIdCount()
shardDistribution := make(map[string][]int)
for serverAddr, shardBits := range ecInfo.ShardNodes {
shards := make([]int, 0)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if shardBits.HasShardId(erasure_coding.ShardId(i)) {
shards = append(shards, i)
}
}
if len(shards) > 0 {
shardDistribution[string(serverAddr)] = shards
}
}
glog.V(2).Infof("EC volume %d: total shards=%d across %d servers",
volumeID, totalShards, len(ecInfo.ShardNodes))
glog.V(1).Infof("EC volume %d shard distribution: %+v", volumeID, shardDistribution)
}
}
@ -305,15 +317,26 @@ func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) b
return false
}
// Check if we have enough shards for vacuum operation
totalShards := 0
// Check if we have all required data shards (0-9) for vacuum operation
availableDataShards := make(map[int]bool)
for _, shardBits := range ecInfo.ShardNodes {
totalShards += shardBits.ShardIdCount()
for i := 0; i < erasure_coding.DataShardsCount; i++ {
if shardBits.HasShardId(erasure_coding.ShardId(i)) {
availableDataShards[i] = true
}
}
}
missingDataShards := make([]int, 0)
for i := 0; i < erasure_coding.DataShardsCount; i++ {
if !availableDataShards[i] {
missingDataShards = append(missingDataShards, i)
}
}
if totalShards < erasure_coding.DataShardsCount {
glog.V(3).Infof("EC volume %d insufficient shards for vacuum: have=%d, need=%d",
ecInfo.VolumeID, totalShards, erasure_coding.DataShardsCount)
if len(missingDataShards) > 0 {
glog.V(1).Infof("EC volume %d incomplete for vacuum: missing data shards %v (need shards 0-%d)",
ecInfo.VolumeID, missingDataShards, erasure_coding.DataShardsCount-1)
return false
}

47
weed/worker/tasks/ec_vacuum/ec_vacuum_task.go

@ -136,10 +136,16 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
}
}
// Validate we found a target node
if targetNode == "" || maxShardCount == 0 {
return "", fmt.Errorf("no valid target node found: sourceNodes=%d, maxShardCount=%d", len(t.sourceNodes), maxShardCount)
}
t.LogInfo("Selected target node for shard collection", map[string]interface{}{
"target_node": targetNode,
"existing_bits": existingEcIndexBits,
"shard_count": maxShardCount,
"target_node": targetNode,
"existing_bits": existingEcIndexBits,
"shard_count": maxShardCount,
"existing_shards": existingEcIndexBits.ShardIds(),
})
// Copy missing shards to target node
@ -196,6 +202,41 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
existingEcIndexBits = existingEcIndexBits.Plus(needToCopyBits)
}
// Validate that we have all required data shards (0-9) for decoding
missingDataShards := make([]int, 0)
for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
if !existingEcIndexBits.HasShardId(erasure_coding.ShardId(shardId)) {
missingDataShards = append(missingDataShards, shardId)
}
}
if len(missingDataShards) > 0 {
// Log all available shards across all source nodes for debugging
allAvailableShards := make(map[int][]string)
for node, shardBits := range t.sourceNodes {
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if shardBits.HasShardId(erasure_coding.ShardId(shardId)) {
allAvailableShards[shardId] = append(allAvailableShards[shardId], string(node))
}
}
}
t.LogInfo("ERROR: Missing required data shards for decoding", map[string]interface{}{
"volume_id": t.volumeID,
"missing_shards": missingDataShards,
"collected_shards": existingEcIndexBits.ShardIds(),
"all_available_shards": allAvailableShards,
})
return "", fmt.Errorf("missing required data shards %v for EC volume %d decoding", missingDataShards, t.volumeID)
}
t.LogInfo("Successfully collected all required data shards", map[string]interface{}{
"volume_id": t.volumeID,
"target_node": targetNode,
"collected_shards": existingEcIndexBits.ShardIds(),
})
return targetNode, nil
}

71
weed/worker/tasks/ec_vacuum/register.go

@ -49,11 +49,76 @@ func RegisterEcVacuumTask() {
}
// Parse source nodes from task parameters
glog.Infof("Creating EC vacuum task for volume %d with %d sources", params.VolumeId, len(params.Sources))
// Log raw source data for debugging
for i, source := range params.Sources {
glog.Infof("Raw source %d: node=%s, shardIds=%v", i, source.Node, source.ShardIds)
}
sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits)
// For now, we'll collect source nodes during execution since the exact
// EC shard distribution is determined dynamically during detection
// The task will discover and collect all shards during execution
// Populate source nodes from the task parameters
for _, source := range params.Sources {
if source.Node == "" {
continue
}
serverAddr := pb.ServerAddress(source.Node)
var shardBits erasure_coding.ShardBits
// Convert shard IDs to ShardBits
for _, shardId := range source.ShardIds {
if shardId < erasure_coding.TotalShardsCount {
shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId))
}
}
if shardBits.ShardIdCount() > 0 {
sourceNodes[serverAddr] = shardBits
}
}
// Verify we have source nodes
if len(sourceNodes) == 0 {
return nil, fmt.Errorf("no valid source nodes found for EC vacuum task: sources=%d", len(params.Sources))
}
// Log detailed shard distribution for debugging
shardDistribution := make(map[string][]int)
for serverAddr, shardBits := range sourceNodes {
shardDistribution[string(serverAddr)] = make([]int, 0)
for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
if shardBits.HasShardId(erasure_coding.ShardId(shardId)) {
shardDistribution[string(serverAddr)] = append(shardDistribution[string(serverAddr)], shardId)
}
}
}
// Validate that we have all required data shards
allShards := make(map[int]bool)
for _, shardBits := range sourceNodes {
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if shardBits.HasShardId(erasure_coding.ShardId(i)) {
allShards[i] = true
}
}
}
missingShards := make([]int, 0)
for i := 0; i < erasure_coding.DataShardsCount; i++ {
if !allShards[i] {
missingShards = append(missingShards, i)
}
}
if len(missingShards) > 0 {
glog.Warningf("EC vacuum task for volume %d has missing data shards %v - this should not happen! Distribution: %+v",
params.VolumeId, missingShards, shardDistribution)
} else {
glog.Infof("EC vacuum task created for volume %d with complete data shards. Distribution: %+v",
params.VolumeId, shardDistribution)
}
return NewEcVacuumTask(
fmt.Sprintf("ec_vacuum-%d", params.VolumeId),

Loading…
Cancel
Save