diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index d6a9161d6..1b5ce7a90 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -287,12 +287,12 @@ func (s *AdminServer) seedAdminScriptFromMaster() { return } - // Wait for master connection to be available - for i := 0; i < 30; i++ { - if s.masterClient.GetMaster(context.Background()) != "" { - break - } - time.Sleep(time.Second) + // Wait for master connection with a bounded timeout + waitCtx, waitCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer waitCancel() + if s.masterClient.GetMaster(waitCtx) == "" { + glog.V(1).Infof("Timed out waiting for master connection for admin_script seeding") + return } var maintenanceScripts string @@ -318,16 +318,6 @@ func (s *AdminServer) seedAdminScriptFromMaster() { return } - // Only seed if the admin_script plugin does not already have a saved config - existing, err := s.plugin.LoadJobTypeConfig("admin_script") - if err != nil { - glog.Warningf("Failed to check admin_script plugin config: %v", err) - return - } - if existing != nil { - return - } - interval := int64(sleepMinutes) if interval <= 0 { interval = clustermaintenance.DefaultMaintenanceSleepMinutes @@ -355,11 +345,14 @@ func (s *AdminServer) seedAdminScriptFromMaster() { UpdatedBy: "master_migration", } - if err := s.plugin.SaveJobTypeConfig(cfg); err != nil { + saved, err := s.plugin.SaveJobTypeConfigIfNotExists(cfg) + if err != nil { glog.Warningf("Failed to seed admin_script plugin config from master: %v", err) return } - glog.V(0).Infof("Seeded admin_script plugin config from master maintenance scripts (interval=%dm)", interval) + if saved { + glog.V(0).Infof("Seeded admin_script plugin config from master maintenance scripts (interval=%dm)", interval) + } } // cleanMaintenanceScript strips lock/unlock commands and normalizes a diff --git a/weed/admin/plugin/config_store.go b/weed/admin/plugin/config_store.go index 9a2b8484e..c36f7bd67 100644 --- a/weed/admin/plugin/config_store.go +++ b/weed/admin/plugin/config_store.go @@ -256,6 +256,53 @@ func (s *ConfigStore) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig return nil } +// SaveJobTypeConfigIfNotExists atomically checks whether a config for the +// given job type already exists and only persists config when none is found. +// Returns true if the config was saved, false if a config already existed. +func (s *ConfigStore) SaveJobTypeConfigIfNotExists(config *plugin_pb.PersistedJobTypeConfig) (bool, error) { + if config == nil { + return false, fmt.Errorf("job type config is nil") + } + if config.JobType == "" { + return false, fmt.Errorf("job type config has empty job_type") + } + sanitizedJobType, err := sanitizeJobType(config.JobType) + if err != nil { + return false, err + } + config.JobType = sanitizedJobType + + clone := proto.Clone(config).(*plugin_pb.PersistedJobTypeConfig) + + s.mu.Lock() + defer s.mu.Unlock() + + if !s.configured { + if _, exists := s.memConfigs[config.JobType]; exists { + return false, nil + } + s.memConfigs[config.JobType] = clone + return true, nil + } + + pbPath := filepath.Join(s.baseDir, jobTypesDirName, config.JobType, configPBFileName) + if _, statErr := os.Stat(pbPath); statErr == nil { + return false, nil + } + + jobTypeDir, err := s.ensureJobTypeDir(config.JobType) + if err != nil { + return false, err + } + + jsonPath := filepath.Join(jobTypeDir, configJSONFileName) + if err := writeProtoFiles(clone, filepath.Join(jobTypeDir, configPBFileName), jsonPath); err != nil { + return false, fmt.Errorf("save job type config for %s: %w", config.JobType, err) + } + + return true, nil +} + func (s *ConfigStore) LoadJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) { if _, err := sanitizeJobType(jobType); err != nil { return nil, err diff --git a/weed/admin/plugin/config_store_test.go b/weed/admin/plugin/config_store_test.go index 689ec4e0a..f53a2699d 100644 --- a/weed/admin/plugin/config_store_test.go +++ b/weed/admin/plugin/config_store_test.go @@ -208,6 +208,81 @@ func TestConfigStoreMonitorStateRoundTrip(t *testing.T) { } } +func TestConfigStoreSaveJobTypeConfigIfNotExists(t *testing.T) { + t.Parallel() + + t.Run("in-memory", func(t *testing.T) { + t.Parallel() + store, err := NewConfigStore("") + if err != nil { + t.Fatalf("NewConfigStore: %v", err) + } + testSaveJobTypeConfigIfNotExists(t, store) + }) + + t.Run("on-disk", func(t *testing.T) { + t.Parallel() + store, err := NewConfigStore(t.TempDir()) + if err != nil { + t.Fatalf("NewConfigStore: %v", err) + } + testSaveJobTypeConfigIfNotExists(t, store) + }) +} + +func testSaveJobTypeConfigIfNotExists(t *testing.T, store *ConfigStore) { + t.Helper() + + cfg := &plugin_pb.PersistedJobTypeConfig{ + JobType: "admin_script", + AdminRuntime: &plugin_pb.AdminRuntimeConfig{Enabled: true}, + } + + // First call should save. + saved, err := store.SaveJobTypeConfigIfNotExists(cfg) + if err != nil { + t.Fatalf("first SaveJobTypeConfigIfNotExists: %v", err) + } + if !saved { + t.Fatal("expected first call to save the config") + } + + // Second call with same job type should not save. + saved, err = store.SaveJobTypeConfigIfNotExists(&plugin_pb.PersistedJobTypeConfig{ + JobType: "admin_script", + AdminRuntime: &plugin_pb.AdminRuntimeConfig{Enabled: false}, + }) + if err != nil { + t.Fatalf("second SaveJobTypeConfigIfNotExists: %v", err) + } + if saved { + t.Fatal("expected second call to be a no-op") + } + + // Verify the original config was preserved. + loaded, err := store.LoadJobTypeConfig("admin_script") + if err != nil { + t.Fatalf("LoadJobTypeConfig: %v", err) + } + if loaded == nil { + t.Fatal("expected config to exist") + } + if !loaded.AdminRuntime.Enabled { + t.Fatal("expected original config (Enabled=true) to be preserved") + } + + // Different job type should still save. + saved, err = store.SaveJobTypeConfigIfNotExists(&plugin_pb.PersistedJobTypeConfig{ + JobType: "vacuum", + }) + if err != nil { + t.Fatalf("SaveJobTypeConfigIfNotExists for different type: %v", err) + } + if !saved { + t.Fatal("expected save for a different job type") + } +} + func TestConfigStoreJobDetailRoundTrip(t *testing.T) { t.Parallel() diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index aecf44757..c60362bf7 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -402,6 +402,19 @@ func (r *Plugin) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) err return nil } +// SaveJobTypeConfigIfNotExists atomically saves config only if no config +// exists yet for the job type. Returns true if saved, false if already exists. +func (r *Plugin) SaveJobTypeConfigIfNotExists(config *plugin_pb.PersistedJobTypeConfig) (bool, error) { + saved, err := r.store.SaveJobTypeConfigIfNotExists(config) + if err != nil { + return false, err + } + if saved { + r.wakeScheduler() + } + return saved, nil +} + func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) { return r.store.LoadDescriptor(jobType) }