diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 0c4e8e148..70ec51be8 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -332,9 +332,11 @@ func emitErasureCodingDetectionDecisionTrace( } if metric.Age < quietThreshold { skippedQuietTime++ + continue } if metric.FullnessRatio < taskConfig.FullnessRatio { skippedFullness++ + continue } } @@ -482,6 +484,7 @@ func (h *ErasureCodingHandler) Execute( params.Sources[0].Node, params.VolumeId, params.Collection, + h.grpcDialOption, ) task.SetProgressCallback(func(progress float64, stage string) { message := fmt.Sprintf("erasure coding progress %.0f%%", progress) diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 4872e0538..ec0ebf056 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -403,6 +403,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ params.Sources[0].Node, params.VolumeId, params.Collection, + h.grpcDialOption, ) task.SetProgressCallback(func(progress float64, stage string) { message := fmt.Sprintf("vacuum progress %.0f%%", progress) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index b976b323d..0b892a30b 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -517,6 +517,7 @@ func (h *VolumeBalanceHandler) Execute( params.Sources[0].Node, params.VolumeId, params.Collection, + h.grpcDialOption, ) task.SetProgressCallback(func(progress float64, stage string) { message := fmt.Sprintf("balance progress %.0f%%", progress) diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go index e36885add..ae7af01f3 100644 --- a/weed/worker/tasks/balance/balance_task.go +++ b/weed/worker/tasks/balance/balance_task.go @@ -21,19 +21,21 @@ import ( // BalanceTask implements the Task interface type BalanceTask struct { *base.BaseTask - server string - volumeID uint32 - collection string - progress float64 + server string + volumeID uint32 + collection string + progress float64 + grpcDialOption grpc.DialOption } // NewBalanceTask creates a new balance task instance -func NewBalanceTask(id string, server string, volumeID uint32, collection string) *BalanceTask { +func NewBalanceTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *BalanceTask { return &BalanceTask{ - BaseTask: base.NewBaseTask(id, types.TaskTypeBalance), - server: server, - volumeID: volumeID, - collection: collection, + BaseTask: base.NewBaseTask(id, types.TaskTypeBalance), + server: server, + volumeID: volumeID, + collection: collection, + grpcDialOption: grpcDialOption, } } @@ -115,7 +117,7 @@ func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) t.ReportProgress(100.0) glog.Infof("Balance task completed successfully: volume %d moved from %s to %s", - t.volumeID, t.server, destNode) + t.volumeID, sourceNode, destNode) return nil } @@ -164,7 +166,7 @@ func (t *BalanceTask) GetProgress() float64 { // markVolumeReadonly marks the volume readonly func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, server, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), @@ -177,7 +179,7 @@ func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needl func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) { var lastAppendAtNs uint64 - err := operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(), + err := operation.WithVolumeServerClient(true, targetServer, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), @@ -213,7 +215,7 @@ func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, vo // mountVolume mounts the volume on the target server func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, server, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), @@ -224,7 +226,7 @@ func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.Volum // tailVolume syncs remaining updates from source to target func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error { - return operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(), + return operation.WithVolumeServerClient(true, targetServer, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), @@ -236,20 +238,9 @@ func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, vo }) } -// unmountVolume unmounts the volume from the server -func (t *BalanceTask) unmountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ - VolumeId: uint32(volumeId), - }) - return err - }) -} - // deleteVolume deletes the volume from the server func (t *BalanceTask) deleteVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, server, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index 76d56c7c5..68e3458e9 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -6,6 +6,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -27,6 +29,9 @@ func RegisterBalanceTask() { // Create configuration instance config := NewDefaultConfig() + // Create shared gRPC dial option using TLS configuration + dialOpt := security.LoadClientTLS(util.GetViper(), "grpc.worker") + // Create complete task definition taskDef := &base.TaskDefinition{ Type: types.TaskTypeBalance, @@ -50,6 +55,7 @@ func RegisterBalanceTask() { params.Sources[0].Node, // Use first source node params.VolumeId, params.Collection, + dialOpt, ), nil }, DetectionFunc: Detection, diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index df7fc94f9..3613930df 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -26,11 +26,12 @@ import ( // ErasureCodingTask implements the Task interface type ErasureCodingTask struct { *base.BaseTask - server string - volumeID uint32 - collection string - workDir string - progress float64 + server string + volumeID uint32 + collection string + workDir string + progress float64 + grpcDialOption grpc.DialOption // EC parameters dataShards int32 @@ -41,14 +42,15 @@ type ErasureCodingTask struct { } // NewErasureCodingTask creates a new unified EC task instance -func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask { +func NewErasureCodingTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *ErasureCodingTask { return &ErasureCodingTask{ - BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding), - server: server, - volumeID: volumeID, - collection: collection, - dataShards: erasure_coding.DataShardsCount, // Default values - parityShards: erasure_coding.ParityShardsCount, // Default values + BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding), + server: server, + volumeID: volumeID, + collection: collection, + dataShards: erasure_coding.DataShardsCount, // Default values + parityShards: erasure_coding.ParityShardsCount, // Default values + grpcDialOption: grpcDialOption, } } @@ -243,7 +245,7 @@ func (t *ErasureCodingTask) GetProgress() float64 { // markVolumeReadonly marks the volume as readonly on the source server func (t *ErasureCodingTask) markVolumeReadonly() error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: t.volumeID, @@ -301,7 +303,7 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] // copyFileFromSource copies a file from source server to local path using gRPC streaming func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: t.volumeID, @@ -533,7 +535,7 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err // sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // Open the local shard file file, err := os.Open(filePath) @@ -665,7 +667,7 @@ func (t *ErasureCodingTask) mountEcShards() error { continue } - err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(), + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: t.volumeID, @@ -722,7 +724,7 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error { "volume_id": t.volumeID, }).Info("Deleting volume from replica server") - err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(), + err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: t.volumeID, diff --git a/weed/worker/tasks/erasure_coding/register.go b/weed/worker/tasks/erasure_coding/register.go index e574e0033..882b6cda6 100644 --- a/weed/worker/tasks/erasure_coding/register.go +++ b/weed/worker/tasks/erasure_coding/register.go @@ -6,6 +6,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -27,6 +29,9 @@ func RegisterErasureCodingTask() { // Create configuration instance config := NewDefaultConfig() + // Create shared gRPC dial option using TLS configuration + dialOpt := security.LoadClientTLS(util.GetViper(), "grpc.worker") + // Create complete task definition taskDef := &base.TaskDefinition{ Type: types.TaskTypeErasureCoding, @@ -50,6 +55,7 @@ func RegisterErasureCodingTask() { params.Sources[0].Node, // Use first source node params.VolumeId, params.Collection, + dialOpt, ), nil }, DetectionFunc: Detection, diff --git a/weed/worker/tasks/vacuum/register.go b/weed/worker/tasks/vacuum/register.go index 2c1360b5b..d918ba469 100644 --- a/weed/worker/tasks/vacuum/register.go +++ b/weed/worker/tasks/vacuum/register.go @@ -6,6 +6,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -27,6 +29,9 @@ func RegisterVacuumTask() { // Create configuration instance config := NewDefaultConfig() + // Create shared gRPC dial option using TLS configuration + dialOpt := security.LoadClientTLS(util.GetViper(), "grpc.worker") + // Create complete task definition taskDef := &base.TaskDefinition{ Type: types.TaskTypeVacuum, @@ -50,6 +55,7 @@ func RegisterVacuumTask() { params.Sources[0].Node, // Use first source node params.VolumeId, params.Collection, + dialOpt, ), nil }, DetectionFunc: Detection, diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index ebb41564f..4b890fada 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -24,16 +24,18 @@ type VacuumTask struct { collection string garbageThreshold float64 progress float64 + grpcDialOption grpc.DialOption } // NewVacuumTask creates a new unified vacuum task instance -func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask { +func NewVacuumTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *VacuumTask { return &VacuumTask{ BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum), server: server, volumeID: volumeID, collection: collection, garbageThreshold: 0.3, // Default 30% threshold + grpcDialOption: grpcDialOption, } } @@ -150,7 +152,7 @@ func (t *VacuumTask) GetProgress() float64 { func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { var garbageRatio float64 - err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, @@ -177,7 +179,7 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { // performVacuum executes the actual vacuum operation func (t *VacuumTask) performVacuum() error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // Step 1: Compact the volume t.GetLogger().Info("Compacting volume") @@ -225,7 +227,7 @@ func (t *VacuumTask) performVacuum() error { // verifyVacuumResults checks the volume status after vacuum func (t *VacuumTask) verifyVacuumResults() error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID,