Browse Source

Remove min_interval_seconds from plugin workers; vacuum default to 17m (#8790)

remove min_interval_seconds from plugin workers and default vacuum interval to 17m

The worker-level min_interval_seconds was redundant with the admin-side
DetectionIntervalSeconds, complicating scheduling logic. Remove it from
vacuum, volume_balance, erasure_coding, and ec_balance handlers.

Also change the vacuum default DetectionIntervalSeconds from 2 hours to
17 minutes to match the previous default behavior.
pull/6224/merge
Chris Lu 1 day ago
committed by GitHub
parent
commit
2604ec7deb
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      weed/admin/dash/config_persistence.go
  2. 5
      weed/admin/maintenance/maintenance_config_proto.go
  3. 5
      weed/admin/maintenance/maintenance_manager.go
  4. 47
      weed/plugin/worker/ec_balance_handler.go
  5. 47
      weed/plugin/worker/erasure_coding_handler.go
  6. 36
      weed/plugin/worker/erasure_coding_handler_test.go
  7. 43
      weed/plugin/worker/vacuum_handler.go
  8. 29
      weed/plugin/worker/vacuum_handler_test.go
  9. 46
      weed/plugin/worker/volume_balance_handler.go
  10. 37
      weed/plugin/worker/volume_balance_handler_test.go
  11. 28
      weed/worker/tasks/vacuum/config.go
  12. 2
      weed/worker/tasks/vacuum/detection.go

4
weed/admin/dash/config_persistence.go

@ -291,7 +291,6 @@ func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
return &VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}, nil
}
@ -308,7 +307,6 @@ func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, erro
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
},
},
}, nil
@ -329,7 +327,6 @@ func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, erro
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
},
},
}, nil
@ -710,7 +707,6 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
},
},
}

5
weed/admin/maintenance/maintenance_config_proto.go

@ -51,9 +51,8 @@ func (mcm *MaintenanceConfigManager) GetVacuumConfig(taskType string) *worker_pb
}
// Return defaults if not configured
return &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
}
}

5
weed/admin/maintenance/maintenance_manager.go

@ -31,9 +31,8 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
},
},
}

47
weed/plugin/worker/ec_balance_handler.go

@ -36,8 +36,7 @@ func init() {
}
type ecBalanceWorkerConfig struct {
TaskConfig *ecbalancetask.Config
MinIntervalSeconds int
TaskConfig *ecbalancetask.Config
}
// ECBalanceHandler is the plugin job handler for EC shard balancing.
@ -141,15 +140,6 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: ecBalanceMinServerCount}},
},
{
Name: "min_interval_seconds",
Label: "Minimum Detection Interval (s)",
Description: "Skip detection if the last successful run is more recent than this interval.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
{
Name: "preferred_tags",
Label: "Preferred Tags",
@ -164,7 +154,6 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
DefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
},
@ -182,7 +171,6 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
}
@ -204,30 +192,6 @@ func (h *ECBalanceHandler) Detect(
}
workerConfig := deriveECBalanceWorkerConfig(request.GetWorkerConfigValues())
if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
_ = sender.SendActivity(BuildDetectorActivity(
"skipped_by_interval",
fmt.Sprintf("EC BALANCE: Detection skipped due to min interval (%s)", minInterval),
map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "ec_balance",
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "ec_balance",
Success: true,
TotalProposals: 0,
})
}
// Apply admin-side scope filters
collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
@ -440,20 +404,11 @@ func deriveECBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *ecBa
}
taskConfig.MinServerCount = int(minServerCountRaw)
minIntervalRaw := readInt64Config(values, "min_interval_seconds", 60*60)
if minIntervalRaw < 0 {
minIntervalRaw = 0
}
if minIntervalRaw > math.MaxInt32 {
minIntervalRaw = math.MaxInt32
}
minIntervalSeconds := int(minIntervalRaw)
taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags"))
return &ecBalanceWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
}
}

47
weed/plugin/worker/erasure_coding_handler.go

@ -32,8 +32,7 @@ func init() {
}
type erasureCodingWorkerConfig struct {
TaskConfig *erasurecodingtask.Config
MinIntervalSeconds int
TaskConfig *erasurecodingtask.Config
}
// ErasureCodingHandler is the plugin job handler for erasure coding.
@ -131,15 +130,6 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
{
Name: "min_interval_seconds",
Label: "Minimum Detection Interval (s)",
Description: "Skip detection if the last successful run is more recent than this interval.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
{
Name: "preferred_tags",
Label: "Preferred Tags",
@ -161,9 +151,6 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60},
},
"preferred_tags": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
@ -190,9 +177,6 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60},
},
"preferred_tags": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
@ -216,30 +200,6 @@ func (h *ErasureCodingHandler) Detect(
}
workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())
if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
_ = sender.SendActivity(BuildDetectorActivity(
"skipped_by_interval",
fmt.Sprintf("ERASURE CODING: Detection skipped due to min interval (%s)", minInterval),
map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "erasure_coding",
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "erasure_coding",
Success: true,
TotalProposals: 0,
})
}
collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
if collectionFilter != "" {
@ -629,15 +589,10 @@ func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *
}
taskConfig.MinSizeMB = minSizeMB
minIntervalSeconds := int(readInt64Config(values, "min_interval_seconds", 60*60))
if minIntervalSeconds < 0 {
minIntervalSeconds = 0
}
taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags"))
return &erasureCodingWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
}
}

36
weed/plugin/worker/erasure_coding_handler_test.go

@ -12,7 +12,6 @@ import (
erasurecodingtask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
func TestDecodeErasureCodingTaskParamsFromPayload(t *testing.T) {
@ -118,9 +117,6 @@ func TestDeriveErasureCodingWorkerConfig(t *testing.T) {
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 128},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 55},
},
}
cfg := deriveErasureCodingWorkerConfig(values)
@ -133,9 +129,6 @@ func TestDeriveErasureCodingWorkerConfig(t *testing.T) {
if cfg.TaskConfig.MinSizeMB != 128 {
t.Fatalf("expected min_size_mb 128, got %d", cfg.TaskConfig.MinSizeMB)
}
if cfg.MinIntervalSeconds != 55 {
t.Fatalf("expected min_interval_seconds 55, got %d", cfg.MinIntervalSeconds)
}
}
func TestBuildErasureCodingProposal(t *testing.T) {
@ -211,35 +204,6 @@ func TestErasureCodingHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestErasureCodingHandlerDetectSkipsByMinInterval(t *testing.T) {
handler := NewErasureCodingHandler(nil, "")
sender := &recordingDetectionSender{}
err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
JobType: "erasure_coding",
LastSuccessfulRun: timestamppb.New(time.Now().Add(-3 * time.Second)),
WorkerConfigValues: map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}},
},
}, sender)
if err != nil {
t.Fatalf("detect returned err = %v", err)
}
if sender.proposals == nil {
t.Fatalf("expected proposals message")
}
if len(sender.proposals.Proposals) != 0 {
t.Fatalf("expected zero proposals, got %d", len(sender.proposals.Proposals))
}
if sender.complete == nil || !sender.complete.Success {
t.Fatalf("expected successful completion message")
}
if len(sender.events) == 0 {
t.Fatalf("expected detector activity events")
}
if !strings.Contains(sender.events[0].Message, "min interval") {
t.Fatalf("unexpected skip-by-interval message: %q", sender.events[0].Message)
}
}
func TestEmitErasureCodingDetectionDecisionTraceNoTasks(t *testing.T) {
sender := &recordingDetectionSender{}

43
weed/plugin/worker/vacuum_handler.go

@ -178,15 +178,6 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
{
Name: "min_interval_seconds",
Label: "Min Interval (s)",
Description: "Minimum interval between vacuum on the same volume.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
},
},
},
@ -197,14 +188,11 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_volume_age_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 24 * 60 * 60},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 7 * 24 * 60 * 60},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
Enabled: true,
DetectionIntervalSeconds: 2 * 60 * 60,
DetectionIntervalSeconds: 17 * 60,
DetectionTimeoutSeconds: 120,
MaxJobsPerDetection: 200,
GlobalExecutionConcurrency: 16,
@ -220,9 +208,6 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_volume_age_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 24 * 60 * 60},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 7 * 24 * 60 * 60},
},
},
}
}
@ -239,31 +224,6 @@ func (h *VacuumHandler) Detect(ctx context.Context, request *plugin_pb.RunDetect
}
workerConfig := deriveVacuumConfig(request.GetWorkerConfigValues())
if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
_ = sender.SendActivity(BuildDetectorActivity(
"skipped_by_interval",
fmt.Sprintf("VACUUM: Detection skipped due to min interval (%s)", minInterval),
map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "vacuum",
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "vacuum",
Success: true,
TotalProposals: 0,
})
}
collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
masters := make([]string, 0)
if request.ClusterContext != nil {
@ -572,7 +532,6 @@ func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *vacuumtask.Co
config := vacuumtask.NewDefaultConfig()
config.GarbageThreshold = readDoubleConfig(values, "garbage_threshold", config.GarbageThreshold)
config.MinVolumeAgeSeconds = int(readInt64Config(values, "min_volume_age_seconds", int64(config.MinVolumeAgeSeconds)))
config.MinIntervalSeconds = int(readInt64Config(values, "min_interval_seconds", int64(config.MinIntervalSeconds)))
return config
}

29
weed/plugin/worker/vacuum_handler_test.go

@ -87,9 +87,6 @@ func TestDeriveVacuumConfigAllowsZeroValues(t *testing.T) {
"min_volume_age_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0},
},
}
cfg := deriveVacuumConfig(values)
@ -99,9 +96,6 @@ func TestDeriveVacuumConfigAllowsZeroValues(t *testing.T) {
if cfg.MinVolumeAgeSeconds != 0 {
t.Fatalf("expected min_volume_age_seconds 0, got %d", cfg.MinVolumeAgeSeconds)
}
if cfg.MinIntervalSeconds != 0 {
t.Fatalf("expected min_interval_seconds 0, got %d", cfg.MinIntervalSeconds)
}
}
func TestMasterAddressCandidates(t *testing.T) {
@ -157,29 +151,6 @@ func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestVacuumHandlerDetectSkipsByMinInterval(t *testing.T) {
handler := NewVacuumHandler(nil, 0)
sender := &recordingDetectionSender{}
err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
JobType: "vacuum",
LastSuccessfulRun: timestamppb.New(time.Now().Add(-3 * time.Second)),
WorkerConfigValues: map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}},
},
}, sender)
if err != nil {
t.Fatalf("detect returned err = %v", err)
}
if sender.proposals == nil {
t.Fatalf("expected proposals message")
}
if len(sender.proposals.Proposals) != 0 {
t.Fatalf("expected zero proposals, got %d", len(sender.proposals.Proposals))
}
if sender.complete == nil || !sender.complete.Success {
t.Fatalf("expected successful completion message")
}
}
func TestBuildExecutorActivity(t *testing.T) {
activity := BuildExecutorActivity("running", "vacuum in progress")

46
weed/plugin/worker/volume_balance_handler.go

@ -56,7 +56,6 @@ func init() {
type volumeBalanceWorkerConfig struct {
TaskConfig *balancetask.Config
MinIntervalSeconds int
MaxConcurrentMoves int
BatchSize int
}
@ -194,15 +193,6 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
},
{
Name: "min_interval_seconds",
Label: "Minimum Detection Interval (s)",
Description: "Skip detection if the last successful run is more recent than this interval.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
},
},
{
@ -238,9 +228,6 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_server_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
@ -267,9 +254,6 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"min_server_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
},
"max_concurrent_moves": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
},
@ -296,31 +280,6 @@ func (h *VolumeBalanceHandler) Detect(
}
workerConfig := deriveBalanceWorkerConfig(request.GetWorkerConfigValues())
if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
_ = sender.SendActivity(BuildDetectorActivity(
"skipped_by_interval",
fmt.Sprintf("VOLUME BALANCE: Detection skipped due to min interval (%s)", minInterval),
map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
},
},
))
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "volume_balance",
Proposals: []*plugin_pb.JobProposal{},
HasMore: false,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "volume_balance",
Success: true,
TotalProposals: 0,
})
}
collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
masters := make([]string, 0)
if request.ClusterContext != nil {
@ -1110,10 +1069,6 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
}
taskConfig.MinServerCount = minServerCount
minIntervalSeconds := int(readInt64Config(values, "min_interval_seconds", 0))
if minIntervalSeconds < 0 {
minIntervalSeconds = 0
}
maxConcurrentMoves64 := readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))
if maxConcurrentMoves64 < 1 {
@ -1135,7 +1090,6 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
return &volumeBalanceWorkerConfig{
TaskConfig: taskConfig,
MinIntervalSeconds: minIntervalSeconds,
MaxConcurrentMoves: maxConcurrentMoves,
BatchSize: batchSize,
}

37
weed/plugin/worker/volume_balance_handler_test.go

@ -6,14 +6,12 @@ import (
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
func TestDecodeVolumeBalanceTaskParamsFromPayload(t *testing.T) {
@ -98,9 +96,6 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) {
"min_server_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 5},
},
"min_interval_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 33},
},
}
cfg := deriveBalanceWorkerConfig(values)
@ -110,9 +105,6 @@ func TestDeriveBalanceWorkerConfig(t *testing.T) {
if cfg.TaskConfig.MinServerCount != 5 {
t.Fatalf("expected min_server_count 5, got %d", cfg.TaskConfig.MinServerCount)
}
if cfg.MinIntervalSeconds != 33 {
t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds)
}
// Defaults for batch config when not specified
if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves {
t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves)
@ -348,35 +340,6 @@ func TestVolumeBalanceHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestVolumeBalanceHandlerDetectSkipsByMinInterval(t *testing.T) {
handler := NewVolumeBalanceHandler(nil)
sender := &recordingDetectionSender{}
err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
JobType: "volume_balance",
LastSuccessfulRun: timestamppb.New(time.Now().Add(-3 * time.Second)),
WorkerConfigValues: map[string]*plugin_pb.ConfigValue{
"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}},
},
}, sender)
if err != nil {
t.Fatalf("detect returned err = %v", err)
}
if sender.proposals == nil {
t.Fatalf("expected proposals message")
}
if len(sender.proposals.Proposals) != 0 {
t.Fatalf("expected zero proposals, got %d", len(sender.proposals.Proposals))
}
if sender.complete == nil || !sender.complete.Success {
t.Fatalf("expected successful completion message")
}
if len(sender.events) == 0 {
t.Fatalf("expected detector activity events")
}
if !strings.Contains(sender.events[0].Message, "min interval") {
t.Fatalf("unexpected skip-by-interval message: %q", sender.events[0].Message)
}
}
func TestEmitVolumeBalanceDetectionDecisionTraceNoTasks(t *testing.T) {
sender := &recordingDetectionSender{}

28
weed/worker/tasks/vacuum/config.go

@ -14,7 +14,6 @@ type Config struct {
base.BaseConfig
GarbageThreshold float64 `json:"garbage_threshold"`
MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
MinIntervalSeconds int `json:"min_interval_seconds"`
}
// NewDefaultConfig creates a new default vacuum configuration
@ -25,9 +24,8 @@ func NewDefaultConfig() *Config {
ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
MaxConcurrent: 2,
},
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
GarbageThreshold: 0.3, // 30%
MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
}
}
@ -40,9 +38,8 @@ func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(c.GarbageThreshold),
MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(c.MinIntervalSeconds),
GarbageThreshold: float64(c.GarbageThreshold),
MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
},
},
}
@ -63,7 +60,6 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold)
c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds
c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds)
}
return nil
@ -169,22 +165,6 @@ func GetConfigSpec() base.ConfigSpec {
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "min_interval_seconds",
JSONName: "min_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60,
MinValue: 1 * 24 * 60 * 60,
MaxValue: 30 * 24 * 60 * 60,
Required: true,
DisplayName: "Minimum Interval",
Description: "Minimum time between vacuum operations on the same volume",
HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
Placeholder: "7",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
},
}
}

2
weed/worker/tasks/vacuum/detection.go

@ -105,7 +105,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
if vacuumConfig != nil {
garbageThreshold = vacuumConfig.GarbageThreshold
// Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
// Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours
// Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
// to the protobuf definition if they should be configurable
}

Loading…
Cancel
Save