
refactor ec shard distribution (#8465)

* refactor ec shard distribution

* fix shard assignment merge and mount errors

* fix mount error aggregation scope

* make WithFields compatible and wrap errors
Branch: pull/8473/head
Chris Lu committed 3 days ago (committed by GitHub)
Commit: 7354fa87f1
No known key found for this signature in database. GPG Key ID: B5690EEEBB952194
Changed files:
  1. weed/storage/erasure_coding/shard_distribution.go  (367)
  2. weed/worker/tasks/erasure_coding/ec_task.go  (265)

weed/storage/erasure_coding/shard_distribution.go  (new file, 367 added lines)

@@ -0,0 +1,367 @@
package erasure_coding

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"reflect"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"

	"google.golang.org/grpc"
)

// ensureLogger returns the given logger, falling back to a glog-backed
// implementation when nil so callers never need a nil check.
func ensureLogger(logger logger) logger {
	if logger == nil {
		return &glogFallbackLogger{}
	}
	return logger
}

// logger is the minimal structured-logging interface this package requires.
type logger interface {
	Info(string, ...interface{})
	Warning(string, ...interface{})
	Error(string, ...interface{})
}

// withFieldLogger is optionally implemented by loggers that support
// attaching structured fields.
type withFieldLogger interface {
	WithFields(map[string]interface{}) logger
}

// withFields attaches structured fields to log when the logger supports it.
// It first checks for the withFieldLogger interface, then falls back to
// reflection so that loggers whose WithFields method has a different but
// compatible signature still work; otherwise log is returned unchanged.
func withFields(log logger, fields map[string]interface{}) logger {
	if fields == nil {
		return log
	}
	if wf, ok := log.(withFieldLogger); ok {
		return wf.WithFields(fields)
	}
	val := reflect.ValueOf(log)
	method := val.MethodByName("WithFields")
	if !method.IsValid() {
		return log
	}
	methodType := method.Type()
	if methodType.NumIn() != 1 || methodType.NumOut() != 1 {
		return log
	}
	argType := methodType.In(0)
	mapType := reflect.TypeOf(map[string]interface{}{})
	var arg reflect.Value
	if mapType.AssignableTo(argType) {
		arg = reflect.ValueOf(fields)
	} else if mapType.ConvertibleTo(argType) {
		arg = reflect.ValueOf(fields).Convert(argType)
	} else {
		return log
	}
	result := method.Call([]reflect.Value{arg})[0]
	if result.IsValid() && result.CanInterface() {
		if enhanced, ok := result.Interface().(logger); ok {
			return enhanced
		}
	}
	return log
}
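
// Illustrative note (not part of the original commit): a caller-side logger
// whose WithFields returns its own concrete type, e.g.
//
//	func (l *taskLogger) WithFields(fields map[string]interface{}) *taskLogger
//
// cannot satisfy withFieldLogger directly, but is still handled by the
// reflection path above: NumIn/NumOut match, the map argument is assignable,
// and the returned *taskLogger passes the .(logger) assertion as long as it
// implements Info, Warning, and Error.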

// glogFallbackLogger adapts glog to the logger interface. Messages with
// arguments are treated as format strings.
type glogFallbackLogger struct{}

func (g *glogFallbackLogger) Info(msg string, args ...interface{}) {
	if len(args) > 0 {
		glog.Infof(msg, args...)
		return
	}
	glog.Info(msg)
}

func (g *glogFallbackLogger) Warning(msg string, args ...interface{}) {
	if len(args) > 0 {
		glog.Warningf(msg, args...)
		return
	}
	glog.Warning(msg)
}

func (g *glogFallbackLogger) Error(msg string, args ...interface{}) {
	if len(args) > 0 {
		glog.Errorf(msg, args...)
		return
	}
	glog.Error(msg)
}

// DistributeEcShards distributes locally generated EC shards to destination servers.
// Returns the shard assignment map used for mounting.
func DistributeEcShards(volumeID uint32, collection string, targets []*worker_pb.TaskTarget, shardFiles map[string]string, dialOption grpc.DialOption, logger logger) (map[string][]string, error) {
	if len(targets) == 0 {
		return nil, fmt.Errorf("no targets specified for EC shard distribution")
	}
	if len(shardFiles) == 0 {
		return nil, fmt.Errorf("no shard files available for distribution")
	}

	log := ensureLogger(logger)

	// Build the assignment map from the shard IDs chosen in the planning phase.
	shardAssignment := make(map[string][]string)
	for _, target := range targets {
		if len(target.ShardIds) == 0 {
			continue
		}
		var assignedShards []string
		for _, shardId := range target.ShardIds {
			shardType := fmt.Sprintf("ec%02d", shardId)
			assignedShards = append(assignedShards, shardType)
		}
		// Every target that holds shards also receives the metadata files.
		if len(assignedShards) > 0 {
			if _, hasEcx := shardFiles["ecx"]; hasEcx {
				assignedShards = append(assignedShards, "ecx")
			}
			if _, hasEcj := shardFiles["ecj"]; hasEcj {
				assignedShards = append(assignedShards, "ecj")
			}
			if _, hasVif := shardFiles["vif"]; hasVif {
				assignedShards = append(assignedShards, "vif")
			}
		}
		// Merge with any earlier assignment for the same node, de-duplicating entries.
		existing := shardAssignment[target.Node]
		if len(existing) == 0 {
			shardAssignment[target.Node] = assignedShards
			continue
		}
		seen := make(map[string]struct{}, len(existing))
		for _, shard := range existing {
			seen[shard] = struct{}{}
		}
		for _, shard := range assignedShards {
			if _, ok := seen[shard]; ok {
				continue
			}
			seen[shard] = struct{}{}
			existing = append(existing, shard)
		}
		shardAssignment[target.Node] = existing
	}

	if len(shardAssignment) == 0 {
		return nil, fmt.Errorf("no shard assignments found from planning phase")
	}

	// Send each destination exactly the shards assigned to it.
	for destNode, assignedShards := range shardAssignment {
		withFields(log, map[string]interface{}{
			"destination":     destNode,
			"assigned_shards": len(assignedShards),
			"shard_types":     assignedShards,
		}).Info("Starting shard distribution to destination server")

		var transferredBytes int64
		for _, shardType := range assignedShards {
			filePath, exists := shardFiles[shardType]
			if !exists {
				return nil, fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
			}
			if info, err := os.Stat(filePath); err == nil {
				transferredBytes += info.Size()
				withFields(log, map[string]interface{}{
					"destination": destNode,
					"shard_type":  shardType,
					"file_path":   filePath,
					"size_bytes":  info.Size(),
					"size_kb":     float64(info.Size()) / 1024,
				}).Info("Starting shard file transfer")
			}
			if err := sendShardFileToDestination(volumeID, collection, dialOption, destNode, filePath, shardType); err != nil {
				return nil, fmt.Errorf("failed to send %s to %s: %w", shardType, destNode, err)
			}
			withFields(log, map[string]interface{}{
				"destination": destNode,
				"shard_type":  shardType,
			}).Info("Shard file transfer completed")
		}

		withFields(log, map[string]interface{}{
			"destination":        destNode,
			"shards_transferred": len(assignedShards),
			"total_bytes":        transferredBytes,
			"total_mb":           float64(transferredBytes) / (1024 * 1024),
		}).Info("All shards distributed to destination server")
	}

	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
	return shardAssignment, nil
}
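
// Example of the returned assignment for two targets (illustrative values,
// not from the original commit), after the metadata files are appended:
//
//	map[string][]string{
//		"10.0.0.1:8080": {"ec00", "ec01", "ecx", "ecj", "vif"},
//		"10.0.0.2:8080": {"ec02", "ec03", "ecx", "ecj", "vif"},
//	}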

// MountEcShards mounts EC shards on destination servers using an assignment map.
func MountEcShards(volumeID uint32, collection string, shardAssignment map[string][]string, dialOption grpc.DialOption, logger logger) error {
	if shardAssignment == nil {
		return fmt.Errorf("shard assignment not available for mounting")
	}

	log := ensureLogger(logger)
	var mountErrors []error
	for destNode, assignedShards := range shardAssignment {
		// Separate numeric shard files ("ec00".."ec13") from metadata files (ecx/ecj/vif).
		var shardIds []uint32
		var metadataFiles []string
		for _, shardType := range assignedShards {
			if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
				var shardId uint32
				if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
					shardIds = append(shardIds, shardId)
				}
			} else {
				metadataFiles = append(metadataFiles, shardType)
			}
		}

		withFields(log, map[string]interface{}{
			"destination":    destNode,
			"shard_ids":      shardIds,
			"shard_count":    len(shardIds),
			"metadata_files": metadataFiles,
		}).Info("Starting EC shard mount operation")

		if len(shardIds) == 0 {
			withFields(log, map[string]interface{}{
				"destination":    destNode,
				"metadata_files": metadataFiles,
			}).Info("No EC shards to mount (only metadata files)")
			continue
		}

		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), dialOption,
			func(client volume_server_pb.VolumeServerClient) error {
				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
					VolumeId:   volumeID,
					Collection: collection,
					ShardIds:   shardIds,
				})
				return mountErr
			})
		if err != nil {
			// Collect per-node failures so one bad destination does not hide the others.
			mountErrors = append(mountErrors, fmt.Errorf("mount %s shards %v: %w", destNode, shardIds, err))
			withFields(log, map[string]interface{}{
				"destination": destNode,
				"shard_ids":   shardIds,
				"error":       err.Error(),
			}).Error("Failed to mount EC shards")
		} else {
			withFields(log, map[string]interface{}{
				"destination": destNode,
				"shard_ids":   shardIds,
				"volume_id":   volumeID,
				"collection":  collection,
			}).Info("Successfully mounted EC shards")
		}
	}

	if len(mountErrors) > 0 {
		return errors.Join(mountErrors...)
	}
	return nil
}

// sendShardFileToDestination sends a single shard file to a destination server using the ReceiveFile API.
func sendShardFileToDestination(volumeID uint32, collection string, dialOption grpc.DialOption, destServer, filePath, shardType string) error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), dialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			file, err := os.Open(filePath)
			if err != nil {
				return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
			}
			defer file.Close()

			fileInfo, err := file.Stat()
			if err != nil {
				return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
			}

			// Map the shard type to a file extension and shard ID.
			var ext string
			var shardId uint32
			if shardType == "ecx" {
				ext = ".ecx"
				shardId = 0 // metadata file, no specific shard ID
			} else if shardType == "ecj" {
				ext = ".ecj"
				shardId = 0 // metadata file, no specific shard ID
			} else if shardType == "vif" {
				ext = ".vif"
				shardId = 0 // metadata file, no specific shard ID
			} else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
				ext = "." + shardType // EC shard file like "ec00", "ec01", ...
				fmt.Sscanf(shardType[2:], "%d", &shardId)
			} else {
				return fmt.Errorf("unknown shard type: %s", shardType)
			}

			stream, err := client.ReceiveFile(context.Background())
			if err != nil {
				return fmt.Errorf("failed to create receive stream: %v", err)
			}

			// Send the file metadata first, then stream the content.
			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
				Data: &volume_server_pb.ReceiveFileRequest_Info{
					Info: &volume_server_pb.ReceiveFileInfo{
						VolumeId:   volumeID,
						Ext:        ext,
						Collection: collection,
						IsEcVolume: true,
						ShardId:    shardId,
						FileSize:   uint64(fileInfo.Size()),
					},
				},
			})
			if err != nil {
				return fmt.Errorf("failed to send file info: %v", err)
			}

			buffer := make([]byte, 64*1024) // 64KB chunks
			for {
				n, readErr := file.Read(buffer)
				if n > 0 {
					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
							FileContent: buffer[:n],
						},
					})
					if err != nil {
						return fmt.Errorf("failed to send file content: %v", err)
					}
				}
				if readErr == io.EOF {
					break
				}
				if readErr != nil {
					return fmt.Errorf("failed to read file: %v", readErr)
				}
			}

			resp, err := stream.CloseAndRecv()
			if err != nil {
				return fmt.Errorf("failed to close stream: %v", err)
			}
			if resp.Error != "" {
				return fmt.Errorf("server error: %s", resp.Error)
			}

			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
			return nil
		})
}
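
For orientation, here is a hedged sketch of how a worker-side caller could drive the two exported helpers end to end; the volume ID, collection, file paths, and the nil logger (which triggers the glog fallback) are illustrative assumptions, not code from this commit:

	func exampleDistributeAndMount(dialOption grpc.DialOption, targets []*worker_pb.TaskTarget) error {
		// Shard files as produced locally by EC encoding (paths are made up).
		shardFiles := map[string]string{
			"ec00": "/data/v42.ec00",
			"ec01": "/data/v42.ec01",
			"ecx":  "/data/v42.ecx",
			"vif":  "/data/v42.vif",
		}
		// DistributeEcShards returns the node -> shard-name assignment it used...
		assignment, err := DistributeEcShards(42, "pictures", targets, shardFiles, dialOption, nil)
		if err != nil {
			return err
		}
		// ...and MountEcShards replays that assignment verbatim, aggregating
		// per-node mount failures with errors.Join instead of failing fast.
		return MountEcShards(42, "pictures", assignment, dialOption, nil)
	}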

weed/worker/tasks/erasure_coding/ec_task.go  (265 changed lines)

@@ -413,6 +413,16 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
 		}).Info("EC index file generated")
 	}
 
+	ecjFile := baseName + ".ecj"
+	if info, err := os.Stat(ecjFile); err == nil {
+		shardFiles["ecj"] = ecjFile
+		t.GetLogger().WithFields(map[string]interface{}{
+			"file_type":  "ecj",
+			"file_path":  ecjFile,
+			"size_bytes": info.Size(),
+		}).Info("EC journal file generated")
+	}
+
 	// Generate .vif file (volume info)
 	vifFile := baseName + ".vif"
 	volumeInfo := &volume_server_pb.VolumeInfo{
@@ -444,262 +454,17 @@
 // distributeEcShards distributes locally generated EC shards to destination servers
 // using pre-assigned shard IDs from planning phase
 func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
-	if len(t.targets) == 0 {
-		return fmt.Errorf("no targets specified for EC shard distribution")
-	}
-
-	if len(shardFiles) == 0 {
-		return fmt.Errorf("no shard files available for distribution")
-	}
-
-	// Build shard assignment from pre-assigned target shard IDs (from planning phase)
-	shardAssignment := make(map[string][]string)
-	for _, target := range t.targets {
-		if len(target.ShardIds) == 0 {
-			continue // Skip targets with no assigned shards
-		}
-
-		var assignedShards []string
-		// Convert shard IDs to shard file names (e.g., 0 → "ec00", 1 → "ec01")
-		for _, shardId := range target.ShardIds {
-			shardType := fmt.Sprintf("ec%02d", shardId)
-			assignedShards = append(assignedShards, shardType)
-		}
-
-		// Add metadata files (.ecx, .vif) to targets that have shards
-		if len(assignedShards) > 0 {
-			if _, hasEcx := shardFiles["ecx"]; hasEcx {
-				assignedShards = append(assignedShards, "ecx")
-			}
-			if _, hasVif := shardFiles["vif"]; hasVif {
-				assignedShards = append(assignedShards, "vif")
-			}
-		}
-
-		shardAssignment[target.Node] = assignedShards
-	}
-
-	if len(shardAssignment) == 0 {
-		return fmt.Errorf("no shard assignments found from planning phase")
-	}
-
-	// Store assignment for use during mounting
-	t.shardAssignment = shardAssignment
-
-	// Send assigned shards to each destination
-	for destNode, assignedShards := range shardAssignment {
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":     destNode,
-			"assigned_shards": len(assignedShards),
-			"shard_types":     assignedShards,
-		}).Info("Starting shard distribution to destination server")
-
-		// Send only the assigned shards to this destination
-		var transferredBytes int64
-		for _, shardType := range assignedShards {
-			filePath, exists := shardFiles[shardType]
-			if !exists {
-				return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
-			}
-
-			// Log file size before transfer
-			if info, err := os.Stat(filePath); err == nil {
-				transferredBytes += info.Size()
-				t.GetLogger().WithFields(map[string]interface{}{
-					"destination": destNode,
-					"shard_type":  shardType,
-					"file_path":   filePath,
-					"size_bytes":  info.Size(),
-					"size_kb":     float64(info.Size()) / 1024,
-				}).Info("Starting shard file transfer")
-			}
-
-			if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
-				return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
-			}
-
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_type":  shardType,
-			}).Info("Shard file transfer completed")
-		}
-
-		// Log summary for this destination
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":        destNode,
-			"shards_transferred": len(assignedShards),
-			"total_bytes":        transferredBytes,
-			"total_mb":           float64(transferredBytes) / (1024 * 1024),
-		}).Info("All shards distributed to destination server")
+	assignment, err := erasure_coding.DistributeEcShards(t.volumeID, t.collection, t.targets, shardFiles, t.grpcDialOption, t.GetLogger())
+	if err != nil {
+		return err
 	}
-
-	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+	t.shardAssignment = assignment
 	return nil
 }
-
-// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
-func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
-	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), t.grpcDialOption,
-		func(client volume_server_pb.VolumeServerClient) error {
-			// Open the local shard file
-			file, err := os.Open(filePath)
-			if err != nil {
-				return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
-			}
-			defer file.Close()
-
-			// Get file size
-			fileInfo, err := file.Stat()
-			if err != nil {
-				return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
-			}
-
-			// Determine file extension and shard ID
-			var ext string
-			var shardId uint32
-			if shardType == "ecx" {
-				ext = ".ecx"
-				shardId = 0 // ecx file doesn't have a specific shard ID
-			} else if shardType == "vif" {
-				ext = ".vif"
-				shardId = 0 // vif file doesn't have a specific shard ID
-			} else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
-				// EC shard file like "ec00", "ec01", etc.
-				ext = "." + shardType
-				fmt.Sscanf(shardType[2:], "%d", &shardId)
-			} else {
-				return fmt.Errorf("unknown shard type: %s", shardType)
-			}
-
-			// Create streaming client
-			stream, err := client.ReceiveFile(context.Background())
-			if err != nil {
-				return fmt.Errorf("failed to create receive stream: %v", err)
-			}
-
-			// Send file info first
-			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
-				Data: &volume_server_pb.ReceiveFileRequest_Info{
-					Info: &volume_server_pb.ReceiveFileInfo{
-						VolumeId:   t.volumeID,
-						Ext:        ext,
-						Collection: t.collection,
-						IsEcVolume: true,
-						ShardId:    shardId,
-						FileSize:   uint64(fileInfo.Size()),
-					},
-				},
-			})
-			if err != nil {
-				return fmt.Errorf("failed to send file info: %v", err)
-			}
-
-			// Send file content in chunks
-			buffer := make([]byte, 64*1024) // 64KB chunks
-			for {
-				n, readErr := file.Read(buffer)
-				if n > 0 {
-					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
-						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
-							FileContent: buffer[:n],
-						},
-					})
-					if err != nil {
-						return fmt.Errorf("failed to send file content: %v", err)
-					}
-				}
-				if readErr == io.EOF {
-					break
-				}
-				if readErr != nil {
-					return fmt.Errorf("failed to read file: %v", readErr)
-				}
-			}
-
-			// Close stream and get response
-			resp, err := stream.CloseAndRecv()
-			if err != nil {
-				return fmt.Errorf("failed to close stream: %v", err)
-			}
-
-			if resp.Error != "" {
-				return fmt.Errorf("server error: %s", resp.Error)
-			}
-
-			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
-			return nil
-		})
-}
 
 // mountEcShards mounts EC shards on destination servers
 func (t *ErasureCodingTask) mountEcShards() error {
-	if t.shardAssignment == nil {
-		return fmt.Errorf("shard assignment not available for mounting")
-	}
-
-	// Mount only assigned shards on each destination
-	for destNode, assignedShards := range t.shardAssignment {
-		// Convert shard names to shard IDs for mounting
-		var shardIds []uint32
-		var metadataFiles []string
-		for _, shardType := range assignedShards {
-			// Skip metadata files (.ecx, .vif) - only mount EC shards
-			if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
-				// Parse shard ID from "ec00", "ec01", etc.
-				var shardId uint32
-				if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
-					shardIds = append(shardIds, shardId)
-				}
-			} else {
-				metadataFiles = append(metadataFiles, shardType)
-			}
-		}
-
-		t.GetLogger().WithFields(map[string]interface{}{
-			"destination":    destNode,
-			"shard_ids":      shardIds,
-			"shard_count":    len(shardIds),
-			"metadata_files": metadataFiles,
-		}).Info("Starting EC shard mount operation")
-
-		if len(shardIds) == 0 {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination":    destNode,
-				"metadata_files": metadataFiles,
-			}).Info("No EC shards to mount (only metadata files)")
-			continue
-		}
-
-		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), t.grpcDialOption,
-			func(client volume_server_pb.VolumeServerClient) error {
-				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
-					VolumeId:   t.volumeID,
-					Collection: t.collection,
-					ShardIds:   shardIds,
-				})
-				return mountErr
-			})
-
-		if err != nil {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_ids":   shardIds,
-				"error":       err.Error(),
-			}).Error("Failed to mount EC shards")
-		} else {
-			t.GetLogger().WithFields(map[string]interface{}{
-				"destination": destNode,
-				"shard_ids":   shardIds,
-				"volume_id":   t.volumeID,
-				"collection":  t.collection,
-			}).Info("Successfully mounted EC shards")
-		}
-	}
-
-	return nil
+	return erasure_coding.MountEcShards(t.volumeID, t.collection, t.shardAssignment, t.grpcDialOption, t.GetLogger())
 }
 
 // deleteOriginalVolume deletes the original volume and all its replicas from all servers
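
Because each mount failure is now wrapped with %w and the set is aggregated with errors.Join, callers can still match individual causes after the refactor. A minimal, self-contained sketch of that behavior (the sentinel errors here are stand-ins, not SeaweedFS errors):

	package main

	import (
		"errors"
		"fmt"
	)

	var (
		errUnavailable = errors.New("transport unavailable")
		errTimeout     = errors.New("deadline exceeded")
	)

	func main() {
		// Wrapped per-node failures, shaped like the ones MountEcShards builds.
		errA := fmt.Errorf("mount node-a shards [0 1]: %w", errUnavailable)
		errB := fmt.Errorf("mount node-b shards [2 3]: %w", errTimeout)
		joined := errors.Join(errA, errB)

		fmt.Println(errors.Is(joined, errUnavailable)) // true
		fmt.Println(errors.Is(joined, errTimeout))     // true
		fmt.Println(joined)                            // both messages, newline-separated
	}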
