Browse Source

fix: address review comments on admin_script seeding

- Replace TOCTOU race (separate Load+Save) with atomic
  SaveJobTypeConfigIfNotExists on ConfigStore and Plugin
- Replace ineffective polling loop with single GetMaster call using
  30s context timeout, since GetMaster respects context cancellation
- Add unit tests for SaveJobTypeConfigIfNotExists (in-memory + on-disk)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
pull/8509/head
Chris Lu 5 days ago
parent
commit
4b2a231913
  1. 29
      weed/admin/dash/admin_server.go
  2. 47
      weed/admin/plugin/config_store.go
  3. 75
      weed/admin/plugin/config_store_test.go
  4. 13
      weed/admin/plugin/plugin.go

29
weed/admin/dash/admin_server.go

@ -287,12 +287,12 @@ func (s *AdminServer) seedAdminScriptFromMaster() {
return return
} }
// Wait for master connection to be available
for i := 0; i < 30; i++ {
if s.masterClient.GetMaster(context.Background()) != "" {
break
}
time.Sleep(time.Second)
// Wait for master connection with a bounded timeout
waitCtx, waitCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer waitCancel()
if s.masterClient.GetMaster(waitCtx) == "" {
glog.V(1).Infof("Timed out waiting for master connection for admin_script seeding")
return
} }
var maintenanceScripts string var maintenanceScripts string
@ -318,16 +318,6 @@ func (s *AdminServer) seedAdminScriptFromMaster() {
return return
} }
// Only seed if the admin_script plugin does not already have a saved config
existing, err := s.plugin.LoadJobTypeConfig("admin_script")
if err != nil {
glog.Warningf("Failed to check admin_script plugin config: %v", err)
return
}
if existing != nil {
return
}
interval := int64(sleepMinutes) interval := int64(sleepMinutes)
if interval <= 0 { if interval <= 0 {
interval = clustermaintenance.DefaultMaintenanceSleepMinutes interval = clustermaintenance.DefaultMaintenanceSleepMinutes
@ -355,11 +345,14 @@ func (s *AdminServer) seedAdminScriptFromMaster() {
UpdatedBy: "master_migration", UpdatedBy: "master_migration",
} }
if err := s.plugin.SaveJobTypeConfig(cfg); err != nil {
saved, err := s.plugin.SaveJobTypeConfigIfNotExists(cfg)
if err != nil {
glog.Warningf("Failed to seed admin_script plugin config from master: %v", err) glog.Warningf("Failed to seed admin_script plugin config from master: %v", err)
return return
} }
glog.V(0).Infof("Seeded admin_script plugin config from master maintenance scripts (interval=%dm)", interval)
if saved {
glog.V(0).Infof("Seeded admin_script plugin config from master maintenance scripts (interval=%dm)", interval)
}
} }
// cleanMaintenanceScript strips lock/unlock commands and normalizes a // cleanMaintenanceScript strips lock/unlock commands and normalizes a

47
weed/admin/plugin/config_store.go

@ -256,6 +256,53 @@ func (s *ConfigStore) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig
return nil return nil
} }
// SaveJobTypeConfigIfNotExists atomically checks whether a config for the
// given job type already exists and only persists config when none is found.
// Returns true if the config was saved, false if a config already existed.
func (s *ConfigStore) SaveJobTypeConfigIfNotExists(config *plugin_pb.PersistedJobTypeConfig) (bool, error) {
if config == nil {
return false, fmt.Errorf("job type config is nil")
}
if config.JobType == "" {
return false, fmt.Errorf("job type config has empty job_type")
}
sanitizedJobType, err := sanitizeJobType(config.JobType)
if err != nil {
return false, err
}
config.JobType = sanitizedJobType
clone := proto.Clone(config).(*plugin_pb.PersistedJobTypeConfig)
s.mu.Lock()
defer s.mu.Unlock()
if !s.configured {
if _, exists := s.memConfigs[config.JobType]; exists {
return false, nil
}
s.memConfigs[config.JobType] = clone
return true, nil
}
pbPath := filepath.Join(s.baseDir, jobTypesDirName, config.JobType, configPBFileName)
if _, statErr := os.Stat(pbPath); statErr == nil {
return false, nil
}
jobTypeDir, err := s.ensureJobTypeDir(config.JobType)
if err != nil {
return false, err
}
jsonPath := filepath.Join(jobTypeDir, configJSONFileName)
if err := writeProtoFiles(clone, filepath.Join(jobTypeDir, configPBFileName), jsonPath); err != nil {
return false, fmt.Errorf("save job type config for %s: %w", config.JobType, err)
}
return true, nil
}
func (s *ConfigStore) LoadJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) { func (s *ConfigStore) LoadJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) {
if _, err := sanitizeJobType(jobType); err != nil { if _, err := sanitizeJobType(jobType); err != nil {
return nil, err return nil, err

75
weed/admin/plugin/config_store_test.go

@ -208,6 +208,81 @@ func TestConfigStoreMonitorStateRoundTrip(t *testing.T) {
} }
} }
func TestConfigStoreSaveJobTypeConfigIfNotExists(t *testing.T) {
t.Parallel()
t.Run("in-memory", func(t *testing.T) {
t.Parallel()
store, err := NewConfigStore("")
if err != nil {
t.Fatalf("NewConfigStore: %v", err)
}
testSaveJobTypeConfigIfNotExists(t, store)
})
t.Run("on-disk", func(t *testing.T) {
t.Parallel()
store, err := NewConfigStore(t.TempDir())
if err != nil {
t.Fatalf("NewConfigStore: %v", err)
}
testSaveJobTypeConfigIfNotExists(t, store)
})
}
func testSaveJobTypeConfigIfNotExists(t *testing.T, store *ConfigStore) {
t.Helper()
cfg := &plugin_pb.PersistedJobTypeConfig{
JobType: "admin_script",
AdminRuntime: &plugin_pb.AdminRuntimeConfig{Enabled: true},
}
// First call should save.
saved, err := store.SaveJobTypeConfigIfNotExists(cfg)
if err != nil {
t.Fatalf("first SaveJobTypeConfigIfNotExists: %v", err)
}
if !saved {
t.Fatal("expected first call to save the config")
}
// Second call with same job type should not save.
saved, err = store.SaveJobTypeConfigIfNotExists(&plugin_pb.PersistedJobTypeConfig{
JobType: "admin_script",
AdminRuntime: &plugin_pb.AdminRuntimeConfig{Enabled: false},
})
if err != nil {
t.Fatalf("second SaveJobTypeConfigIfNotExists: %v", err)
}
if saved {
t.Fatal("expected second call to be a no-op")
}
// Verify the original config was preserved.
loaded, err := store.LoadJobTypeConfig("admin_script")
if err != nil {
t.Fatalf("LoadJobTypeConfig: %v", err)
}
if loaded == nil {
t.Fatal("expected config to exist")
}
if !loaded.AdminRuntime.Enabled {
t.Fatal("expected original config (Enabled=true) to be preserved")
}
// Different job type should still save.
saved, err = store.SaveJobTypeConfigIfNotExists(&plugin_pb.PersistedJobTypeConfig{
JobType: "vacuum",
})
if err != nil {
t.Fatalf("SaveJobTypeConfigIfNotExists for different type: %v", err)
}
if !saved {
t.Fatal("expected save for a different job type")
}
}
func TestConfigStoreJobDetailRoundTrip(t *testing.T) { func TestConfigStoreJobDetailRoundTrip(t *testing.T) {
t.Parallel() t.Parallel()

13
weed/admin/plugin/plugin.go

@ -402,6 +402,19 @@ func (r *Plugin) SaveJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) err
return nil return nil
} }
// SaveJobTypeConfigIfNotExists atomically saves config only if no config
// exists yet for the job type. Returns true if saved, false if already exists.
func (r *Plugin) SaveJobTypeConfigIfNotExists(config *plugin_pb.PersistedJobTypeConfig) (bool, error) {
saved, err := r.store.SaveJobTypeConfigIfNotExists(config)
if err != nil {
return false, err
}
if saved {
r.wakeScheduler()
}
return saved, nil
}
func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) { func (r *Plugin) LoadDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) {
return r.store.LoadDescriptor(jobType) return r.store.LoadDescriptor(jobType)
} }

Loading…
Cancel
Save