Browse Source

Persist config values across restarts, parse namespace/table from path

pull/8177/head
Chris Lu 2 weeks ago
parent
commit
4e1901d0bd
  1. 1
      weed/pb/worker.proto
  2. 13
      weed/pb/worker_pb/worker.pb.go
  3. 33
      weed/worker/tasks/table_maintenance/config.go
  4. 16
      weed/worker/tasks/table_maintenance/register.go
  5. 13
      weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go

1
weed/pb/worker.proto

@ -289,6 +289,7 @@ message TaskPolicy {
int32 max_concurrent = 2; int32 max_concurrent = 2;
int32 repeat_interval_seconds = 3; // Seconds to wait before repeating int32 repeat_interval_seconds = 3; // Seconds to wait before repeating
int32 check_interval_seconds = 4; // Seconds between checks int32 check_interval_seconds = 4; // Seconds between checks
string description = 9; // Generic field for task-specific config (JSON format)
// Typed task-specific configuration (replaces generic map) // Typed task-specific configuration (replaces generic map)
oneof task_config { oneof task_config {

13
weed/pb/worker_pb/worker.pb.go

@ -2379,6 +2379,7 @@ type TaskPolicy struct {
MaxConcurrent int32 `protobuf:"varint,2,opt,name=max_concurrent,json=maxConcurrent,proto3" json:"max_concurrent,omitempty"` MaxConcurrent int32 `protobuf:"varint,2,opt,name=max_concurrent,json=maxConcurrent,proto3" json:"max_concurrent,omitempty"`
RepeatIntervalSeconds int32 `protobuf:"varint,3,opt,name=repeat_interval_seconds,json=repeatIntervalSeconds,proto3" json:"repeat_interval_seconds,omitempty"` // Seconds to wait before repeating RepeatIntervalSeconds int32 `protobuf:"varint,3,opt,name=repeat_interval_seconds,json=repeatIntervalSeconds,proto3" json:"repeat_interval_seconds,omitempty"` // Seconds to wait before repeating
CheckIntervalSeconds int32 `protobuf:"varint,4,opt,name=check_interval_seconds,json=checkIntervalSeconds,proto3" json:"check_interval_seconds,omitempty"` // Seconds between checks CheckIntervalSeconds int32 `protobuf:"varint,4,opt,name=check_interval_seconds,json=checkIntervalSeconds,proto3" json:"check_interval_seconds,omitempty"` // Seconds between checks
Description string `protobuf:"bytes,9,opt,name=description,proto3" json:"description,omitempty"` // Generic field for task-specific config (JSON format)
// Typed task-specific configuration (replaces generic map) // Typed task-specific configuration (replaces generic map)
// //
// Types that are valid to be assigned to TaskConfig: // Types that are valid to be assigned to TaskConfig:
@ -2450,6 +2451,13 @@ func (x *TaskPolicy) GetCheckIntervalSeconds() int32 {
return 0 return 0
} }
func (x *TaskPolicy) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *TaskPolicy) GetTaskConfig() isTaskPolicy_TaskConfig { func (x *TaskPolicy) GetTaskConfig() isTaskPolicy_TaskConfig {
if x != nil { if x != nil {
return x.TaskConfig return x.TaskConfig
@ -3544,13 +3552,14 @@ const file_worker_proto_rawDesc = "" +
"\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" + "\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" +
"\x11TaskPoliciesEntry\x12\x10\n" + "\x11TaskPoliciesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12+\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12+\n" +
"\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\x82\x04\n" +
"\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\xa4\x04\n" +
"\n" + "\n" +
"TaskPolicy\x12\x18\n" + "TaskPolicy\x12\x18\n" +
"\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" +
"\x0emax_concurrent\x18\x02 \x01(\x05R\rmaxConcurrent\x126\n" + "\x0emax_concurrent\x18\x02 \x01(\x05R\rmaxConcurrent\x126\n" +
"\x17repeat_interval_seconds\x18\x03 \x01(\x05R\x15repeatIntervalSeconds\x124\n" + "\x17repeat_interval_seconds\x18\x03 \x01(\x05R\x15repeatIntervalSeconds\x124\n" +
"\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12B\n" +
"\x16check_interval_seconds\x18\x04 \x01(\x05R\x14checkIntervalSeconds\x12 \n" +
"\vdescription\x18\t \x01(\tR\vdescription\x12B\n" +
"\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" + "\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" +
"\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" + "\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" +
"\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" + "\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" +

33
weed/worker/tasks/table_maintenance/config.go

@ -1,6 +1,7 @@
package table_maintenance package table_maintenance
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/admin/config" "github.com/seaweedfs/seaweedfs/weed/admin/config"
@ -36,14 +37,23 @@ func NewDefaultConfig() *Config {
// ToTaskPolicy converts configuration to a TaskPolicy protobuf message // ToTaskPolicy converts configuration to a TaskPolicy protobuf message
func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
return &worker_pb.TaskPolicy{
policy := &worker_pb.TaskPolicy{
Enabled: c.Enabled, Enabled: c.Enabled,
MaxConcurrent: int32(c.MaxConcurrent), MaxConcurrent: int32(c.MaxConcurrent),
RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
CheckIntervalSeconds: int32(c.ScanIntervalMinutes * 60), CheckIntervalSeconds: int32(c.ScanIntervalMinutes * 60),
// Table maintenance doesn't have a specific protobuf config yet
// Would need to add TableMaintenanceTaskConfig to worker_pb
} }
// Encode config-specific fields in Description as JSON
configData := map[string]interface{}{
"compaction_file_threshold": c.CompactionFileThreshold,
"snapshot_retention_days": c.SnapshotRetentionDays,
}
if data, err := json.Marshal(configData); err == nil {
policy.Description = string(data)
}
return policy
} }
// FromTaskPolicy loads configuration from a TaskPolicy protobuf message // FromTaskPolicy loads configuration from a TaskPolicy protobuf message
@ -57,6 +67,23 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds / 60) c.ScanIntervalMinutes = int(policy.CheckIntervalSeconds / 60)
// Decode config-specific fields from Description if present
if policy.Description != "" {
var configData map[string]interface{}
if err := json.Unmarshal([]byte(policy.Description), &configData); err == nil {
if val, ok := configData["compaction_file_threshold"]; ok {
if floatVal, ok := val.(float64); ok {
c.CompactionFileThreshold = int(floatVal)
}
}
if val, ok := configData["snapshot_retention_days"]; ok {
if floatVal, ok := val.(float64); ok {
c.SnapshotRetentionDays = int(floatVal)
}
}
}
}
return nil return nil
} }

16
weed/worker/tasks/table_maintenance/register.go

@ -74,6 +74,18 @@ func RegisterTableMaintenanceTask() {
} }
} }
// Parse namespace and tableName from tablePath
// Expected format: /table-buckets/bucketName/namespaceName/tableName
namespace := ""
tableName := ""
parts := strings.Split(tablePath, "/")
if len(parts) > 3 {
namespace = parts[3]
}
if len(parts) > 4 {
tableName = parts[4]
}
// Create the maintenance job // Create the maintenance job
job := &TableMaintenanceJob{ job := &TableMaintenanceJob{
JobType: jobType, JobType: jobType,
@ -86,8 +98,8 @@ func RegisterTableMaintenanceTask() {
return NewTableMaintenanceTask( return NewTableMaintenanceTask(
fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()), fmt.Sprintf("table-maintenance-%s-%d", tableBucket, time.Now().UnixNano()),
tableBucket, tableBucket,
"", // Namespace parsed from path if needed
"", // Table name parsed from path if needed
namespace,
tableName,
job, job,
), nil ), nil
}, },

13
weed/worker/tasks/table_maintenance/table_maintenance_integration_test.go

@ -407,6 +407,19 @@ func TestConfigPersistence(t *testing.T) {
if newConfig.MaxConcurrent != 4 { if newConfig.MaxConcurrent != 4 {
t.Errorf("Expected MaxConcurrent 4 after round-trip, got %d", newConfig.MaxConcurrent) t.Errorf("Expected MaxConcurrent 4 after round-trip, got %d", newConfig.MaxConcurrent)
} }
// Verify custom fields are preserved (the fix for the review comment)
if newConfig.CompactionFileThreshold != 200 {
t.Errorf("Expected CompactionFileThreshold 200 after round-trip, got %d", newConfig.CompactionFileThreshold)
}
if newConfig.SnapshotRetentionDays != 14 {
t.Errorf("Expected SnapshotRetentionDays 14 after round-trip, got %d", newConfig.SnapshotRetentionDays)
}
if newConfig.ScanIntervalMinutes != 60 {
t.Errorf("Expected ScanIntervalMinutes 60 after round-trip, got %d", newConfig.ScanIntervalMinutes)
}
} }
func TestIcebergOps_GetExpiredSnapshots(t *testing.T) { func TestIcebergOps_GetExpiredSnapshots(t *testing.T) {

Loading…
Cancel
Save