diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index b9e3d61d0..41de0676e 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -289,6 +289,7 @@ message TaskPolicy { int32 max_concurrent = 2; int32 repeat_interval_seconds = 3; // Seconds to wait before repeating int32 check_interval_seconds = 4; // Seconds between checks + string description = 9; // Generic field for task-specific config (JSON format) // Typed task-specific configuration (replaces generic map) oneof task_config { diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index be2e877fc..001fd1663 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -2379,6 +2379,7 @@ type TaskPolicy struct { MaxConcurrent int32 `protobuf:"varint,2,opt,name=max_concurrent,json=maxConcurrent,proto3" json:"max_concurrent,omitempty"` RepeatIntervalSeconds int32 `protobuf:"varint,3,opt,name=repeat_interval_seconds,json=repeatIntervalSeconds,proto3" json:"repeat_interval_seconds,omitempty"` // Seconds to wait before repeating CheckIntervalSeconds int32 `protobuf:"varint,4,opt,name=check_interval_seconds,json=checkIntervalSeconds,proto3" json:"check_interval_seconds,omitempty"` // Seconds between checks + Description string `protobuf:"bytes,9,opt,name=description,proto3" json:"description,omitempty"` // Generic field for task-specific config (JSON format) // Typed task-specific configuration (replaces generic map) // // Types that are valid to be assigned to TaskConfig: @@ -2450,6 +2451,13 @@ func (x *TaskPolicy) GetCheckIntervalSeconds() int32 { return 0 } +func (x *TaskPolicy) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + func (x *TaskPolicy) GetTaskConfig() isTaskPolicy_TaskConfig { if x != nil { return x.TaskConfig @@ -3544,13 +3552,14 @@ const file_worker_proto_rawDesc = "" + "\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" + "\x11TaskPoliciesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12+\n" + - "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\x82\x04\n" + + "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\xa4\x04\n" + "\n" + "TaskPolicy\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" + "\x0emax_concurrent\x18\x02 \x01(\x05R\rmaxConcurrent\x126\n" + "\x17repeat_interval_seconds\x18\x03 \x01(\x05R\x15repeatIntervalSeconds\x124\n" + - "\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12B\n" + + "\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12 \n" + + "\vdescription\x18\t \x01(\tR\vdescription\x12B\n" + "\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" + "\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" + "\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" + diff --git a/weed/worker/tasks/table_maintenance/config.go b/weed/worker/tasks/table_maintenance/config.go index 2c7e171e4..f2f455df5 100644 --- a/weed/worker/tasks/table_maintenance/config.go +++ b/weed/worker/tasks/table_maintenance/config.go @@ -1,6 +1,7 @@ package table_maintenance import ( + "encoding/json" "fmt" "github.com/seaweedfs/seaweedfs/weed/admin/config" @@ -36,14 +37,23 @@ func NewDefaultConfig() *Config { // ToTaskPolicy converts configuration to a TaskPolicy protobuf message func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { - return &worker_pb.TaskPolicy{ + policy := &worker_pb.TaskPolicy{ Enabled: c.Enabled, MaxConcurrent: int32(c.MaxConcurrent), RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), CheckIntervalSeconds: int32(c.ScanIntervalMinutes * 60), - // Table maintenance doesn't have a specific protobuf config yet - // Would need to add TableMaintenanceTaskConfig to worker_pb } + + // Encode config-specific fields in Description as JSON + configData := map[string]interface{}{ + "compaction_file_threshold": c.CompactionFileThreshold, + "snapshot_retention_days": c.SnapshotRetentionDays, + } + if data, err := json.Marshal(configData); err == nil { + policy.Description = string(data) + } + + return policy } // FromTaskPolicy loads configuration from a TaskPolicy protobuf message @@ -57,6 +67,23 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds / 60) + // Decode config-specific fields from Description if present + if policy.Description != "" { + var configData map[string]interface{} + if err := json.Unmarshal([]byte(policy.Description), &configData); err == nil { + if val, ok := configData["compaction_file_threshold"]; ok { + if floatVal, ok := val.(float64); ok { + c.CompactionFileThreshold = int(floatVal) + } + } + if val, ok := configData["snapshot_retention_days"]; ok { + if floatVal, ok := val.(float64); ok { + c.SnapshotRetentionDays = int(floatVal) + } + } + } + } + return nil } diff --git a/weed/worker/tasks/table_maintenance/register.go b/weed/worker/tasks/table_maintenance/register.go index 3cf178b3e..3ecbc7b45 100644 --- a/weed/worker/tasks/table_maintenance/register.go +++ b/weed/worker/tasks/table_maintenance/register.go @@ -74,6 +74,18 @@ func RegisterTableMaintenanceTask() { } } + // Parse namespace and tableName from tablePath + // Expected format: /table-buckets/bucketName/namespaceName/tableName + namespace := "" + tableName := "" + parts := strings.Split(tablePath, "/") + if len(parts) > 3 { + namespace = parts[3] + } + if len(parts) > 4 { + tableName = parts[4] + } + // Create the maintenance job job := &TableMaintenanceJob{ JobType: jobType, @@ -86,8 +98,8 @@ func RegisterTableMaintenanceTask() { return NewTableMaintenanceTask( fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()), tableBucket, - "", // Namespace parsed from path if needed - "", // Table name parsed from path if needed + namespace, + tableName, job, ), nil }, diff --git a/weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go b/weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go index 20c774180..00c0c3b52 100644 --- a/weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go +++ b/weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go @@ -407,6 +407,19 @@ func TestConfigPersistence(t *testing.T) { if newConfig.MaxConcurrent != 4 { t.Errorf("Expected MaxConcurrent 4 after round-trip, got %d", newConfig.MaxConcurrent) } + + // Verify custom fields are preserved (the fix for the review comment) + if newConfig.CompactionFileThreshold != 200 { + t.Errorf("Expected CompactionFileThreshold 200 after round-trip, got %d", newConfig.CompactionFileThreshold) + } + + if newConfig.SnapshotRetentionDays != 14 { + t.Errorf("Expected SnapshotRetentionDays 14 after round-trip, got %d", newConfig.SnapshotRetentionDays) + } + + if newConfig.ScanIntervalMinutes != 60 { + t.Errorf("Expected ScanIntervalMinutes 60 after round-trip, got %d", newConfig.ScanIntervalMinutes) + } } func TestIcebergOps_GetExpiredSnapshots(t *testing.T) {