diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 550794e6c..a2f73f02d 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -111,6 +111,9 @@ type AdminServer struct { // Worker gRPC server workerGrpcServer *WorkerGrpcServer + // Background goroutine lifecycle + bgCancel context.CancelFunc + // Collection statistics caching collectionStatsCache map[string]collectionStats lastCollectionStatsUpdate time.Time @@ -137,8 +140,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, ) // Start master client connection process (like shell and filer do) - ctx := context.Background() - go masterClient.KeepConnectedToMaster(ctx) + bgCtx, bgCancel := context.WithCancel(context.Background()) + go masterClient.KeepConnectedToMaster(bgCtx) lockManager := NewAdminLockManager(masterClient, adminLockClientName) presenceLock := newAdminPresenceLock(masterClient) @@ -159,6 +162,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, icebergPort: icebergPort, pluginLock: lockManager, adminPresenceLock: presenceLock, + bgCancel: bgCancel, } // Initialize topic retention purger @@ -256,11 +260,93 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, } else { server.plugin = plugin glog.V(0).Infof("Plugin enabled") + go server.monitorVacuumWorker(bgCtx) } return server } +// vacuumToggler abstracts the master's vacuum enable/disable for testing. +type vacuumToggler interface { + disableVacuum() error + enableVacuum() error +} + +// masterVacuumToggler implements vacuumToggler via gRPC calls to the master. +type masterVacuumToggler struct { + server *AdminServer +} + +func (m *masterVacuumToggler) disableVacuum() error { + return m.server.WithMasterClient(func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := client.DisableVacuum(ctx, &master_pb.DisableVacuumRequest{ByPlugin: true}) + return err + }) +} + +func (m *masterVacuumToggler) enableVacuum() error { + return m.server.WithMasterClient(func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := client.EnableVacuum(ctx, &master_pb.EnableVacuumRequest{ByPlugin: true}) + return err + }) +} + +// syncVacuumState performs a single sync step: checks if a vacuum-capable worker +// is present and calls disable/enable accordingly. Returns the updated state +// and whether the call failed (for log dedup on retries). +func syncVacuumState(hasWorker bool, previouslyActive bool, toggler vacuumToggler, retrying bool) (active bool, failed bool) { + if hasWorker == previouslyActive { + return previouslyActive, false + } + if hasWorker { + if !retrying { + glog.V(0).Infof("Vacuum plugin worker connected, disabling master automatic vacuum") + } + if err := toggler.disableVacuum(); err != nil { + glog.Warningf("Failed to disable vacuum on master: %v", err) + return false, true // retry next tick + } + return true, false + } + if !retrying { + glog.V(0).Infof("Vacuum plugin worker disconnected, re-enabling master automatic vacuum") + } + if err := toggler.enableVacuum(); err != nil { + glog.Warningf("Failed to enable vacuum on master: %v", err) + return true, true // retry next tick + } + return false, false +} + +// monitorVacuumWorker polls the plugin registry for vacuum-capable workers and +// disables/enables the master's automatic scheduled vacuum accordingly. +func (s *AdminServer) monitorVacuumWorker(ctx context.Context) { + const pollInterval = 30 * time.Second + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + toggler := &masterVacuumToggler{server: s} + vacuumWorkerActive := false + retrying := false + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if s.plugin == nil { + continue + } + hasWorker := s.plugin.HasCapableWorker("vacuum") + vacuumWorkerActive, retrying = syncVacuumState(hasWorker, vacuumWorkerActive, toggler, retrying) + } + } +} + // loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files func (s *AdminServer) loadTaskConfigurationsFromPersistence() { if s.configPersistence == nil || !s.configPersistence.IsConfigured() { @@ -1500,6 +1586,11 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, func (s *AdminServer) Shutdown() { glog.V(1).Infof("Shutting down admin server...") + // Cancel background goroutines (vacuum monitor, etc.) + if s.bgCancel != nil { + s.bgCancel() + } + // Stop maintenance manager s.StopMaintenanceManager() if s.adminPresenceLock != nil { diff --git a/weed/admin/dash/vacuum_monitor_test.go b/weed/admin/dash/vacuum_monitor_test.go new file mode 100644 index 000000000..69b97f14c --- /dev/null +++ b/weed/admin/dash/vacuum_monitor_test.go @@ -0,0 +1,116 @@ +package dash + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + + adminplugin "github.com/seaweedfs/seaweedfs/weed/admin/plugin" +) + +// TestVacuumMonitorStateTransitions verifies the state transition logic that +// drives the vacuum monitor goroutine: the plugin registry correctly reports +// whether a vacuum-capable worker is present or absent. +func TestVacuumMonitorStateTransitions(t *testing.T) { + t.Parallel() + + opts := adminplugin.Options{ + DataDir: "", + } + p, err := adminplugin.New(opts) + if err != nil { + t.Fatalf("failed to create plugin: %v", err) + } + defer p.Shutdown() + + // Initially no workers => no capable worker. + if p.HasCapableWorker("vacuum") { + t.Fatalf("expected no capable worker initially") + } + + // Simulate state transition false -> true: worker connects. + p.WorkerConnectForTest(&plugin_pb.WorkerHello{ + WorkerId: "vacuum-worker-1", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: true, CanExecute: true}, + }, + }) + + if !p.HasCapableWorker("vacuum") { + t.Fatalf("expected capable worker after connect") + } + + // Simulate state transition true -> false: worker disconnects. + p.WorkerDisconnectForTest("vacuum-worker-1") + + // Give registry removal a moment (it's synchronous, but be safe). + time.Sleep(10 * time.Millisecond) + + if p.HasCapableWorker("vacuum") { + t.Fatalf("expected no capable worker after disconnect") + } +} + +// fakeToggler records disable/enable calls for testing. +type fakeToggler struct { + disableCalls int + enableCalls int +} + +func (f *fakeToggler) disableVacuum() error { + f.disableCalls++ + return nil +} + +func (f *fakeToggler) enableVacuum() error { + f.enableCalls++ + return nil +} + +func TestSyncVacuumState(t *testing.T) { + t.Parallel() + + t.Run("no change when state matches", func(t *testing.T) { + tog := &fakeToggler{} + result, failed := syncVacuumState(false, false, tog, false) + if result != false || failed { + t.Error("expected false, no failure") + } + result, failed = syncVacuumState(true, true, tog, false) + if result != true || failed { + t.Error("expected true, no failure") + } + if tog.disableCalls != 0 || tog.enableCalls != 0 { + t.Errorf("expected no calls, got disable=%d enable=%d", tog.disableCalls, tog.enableCalls) + } + }) + + t.Run("worker connects triggers disable", func(t *testing.T) { + tog := &fakeToggler{} + result, failed := syncVacuumState(true, false, tog, false) + if result != true || failed { + t.Error("expected true after disable, no failure") + } + if tog.disableCalls != 1 { + t.Errorf("expected 1 disable call, got %d", tog.disableCalls) + } + if tog.enableCalls != 0 { + t.Errorf("expected 0 enable calls, got %d", tog.enableCalls) + } + }) + + t.Run("worker disconnects triggers enable", func(t *testing.T) { + tog := &fakeToggler{} + result, failed := syncVacuumState(false, true, tog, false) + if result != false || failed { + t.Error("expected false after enable, no failure") + } + if tog.enableCalls != 1 { + t.Errorf("expected 1 enable call, got %d", tog.enableCalls) + } + if tog.disableCalls != 0 { + t.Errorf("expected 0 disable calls, got %d", tog.disableCalls) + } + }) +} diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index a4bb5e575..c04a66c78 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -688,6 +688,11 @@ func (r *Plugin) executeJobWithExecutor( } } +// HasCapableWorker checks if any non-stale worker has a capability for the given job type. +func (r *Plugin) HasCapableWorker(jobType string) bool { + return r.registry.HasCapableWorker(jobType) +} + func (r *Plugin) ListWorkers() []*WorkerSession { return r.registry.List() } @@ -1414,3 +1419,13 @@ func (s *streamSession) close() { close(s.outgoing) }) } + +// WorkerConnectForTest simulates a worker connecting (test helper). +func (r *Plugin) WorkerConnectForTest(hello *plugin_pb.WorkerHello) { + r.registry.UpsertFromHello(hello) +} + +// WorkerDisconnectForTest simulates a worker disconnecting (test helper). +func (r *Plugin) WorkerDisconnectForTest(workerID string) { + r.registry.Remove(workerID) +} diff --git a/weed/admin/plugin/registry.go b/weed/admin/plugin/registry.go index dafddc203..033a11fcd 100644 --- a/weed/admin/plugin/registry.go +++ b/weed/admin/plugin/registry.go @@ -120,6 +120,28 @@ func (r *Registry) List() []*WorkerSession { return out } +// HasCapableWorker checks if any non-stale worker session has a capability for the given job type. +// A worker is capable if its capabilities include the job type with CanDetect or CanExecute true. +func (r *Registry) HasCapableWorker(jobType string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + now := time.Now() + for _, session := range r.sessions { + if r.isSessionStaleLocked(session, now) { + continue + } + capability := session.Capabilities[jobType] + if capability == nil { + continue + } + if capability.CanDetect || capability.CanExecute { + return true + } + } + return false +} + // DetectableJobTypes returns sorted job types that currently have at least one detect-capable worker. func (r *Registry) DetectableJobTypes() []string { r.mu.RLock() diff --git a/weed/admin/plugin/registry_test.go b/weed/admin/plugin/registry_test.go index ff61b2b7a..1d331278c 100644 --- a/weed/admin/plugin/registry_test.go +++ b/weed/admin/plugin/registry_test.go @@ -319,3 +319,85 @@ func TestRegistryReturnsNoDetectorWhenAllWorkersStale(t *testing.T) { t.Fatalf("expected no detector when all workers are stale") } } + +func TestRegistryHasCapableWorkerWithDetectCapability(t *testing.T) { + t.Parallel() + + r := NewRegistry() + r.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-a", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: true, CanExecute: false}, + }, + }) + + if !r.HasCapableWorker("vacuum") { + t.Fatalf("expected HasCapableWorker to return true for detect-capable worker") + } + if r.HasCapableWorker("balance") { + t.Fatalf("expected HasCapableWorker to return false for unknown job type") + } +} + +func TestRegistryHasCapableWorkerWithExecuteCapability(t *testing.T) { + t.Parallel() + + r := NewRegistry() + r.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-a", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: false, CanExecute: true}, + }, + }) + + if !r.HasCapableWorker("vacuum") { + t.Fatalf("expected HasCapableWorker to return true for execute-capable worker") + } +} + +func TestRegistryHasCapableWorkerReturnsFalseWhenNoWorkers(t *testing.T) { + t.Parallel() + + r := NewRegistry() + if r.HasCapableWorker("vacuum") { + t.Fatalf("expected HasCapableWorker to return false with no workers") + } +} + +func TestRegistryHasCapableWorkerSkipsStaleWorkers(t *testing.T) { + t.Parallel() + + r := NewRegistry() + r.staleAfter = 2 * time.Second + + r.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-stale", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: true, CanExecute: true}, + }, + }) + + r.mu.Lock() + r.sessions["worker-stale"].LastSeenAt = time.Now().Add(-10 * time.Second) + r.mu.Unlock() + + if r.HasCapableWorker("vacuum") { + t.Fatalf("expected HasCapableWorker to return false when all workers are stale") + } +} + +func TestRegistryHasCapableWorkerIgnoresNilCapability(t *testing.T) { + t.Parallel() + + r := NewRegistry() + r.UpsertFromHello(&plugin_pb.WorkerHello{ + WorkerId: "worker-a", + Capabilities: []*plugin_pb.JobTypeCapability{ + {JobType: "vacuum", CanDetect: false, CanExecute: false}, + }, + }) + + if r.HasCapableWorker("vacuum") { + t.Fatalf("expected HasCapableWorker to return false when capability has no detect or execute") + } +} diff --git a/weed/pb/master.proto b/weed/pb/master.proto index 8289cd233..a11a31e99 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -350,11 +350,13 @@ message VacuumVolumeResponse { } message DisableVacuumRequest { + bool by_plugin = 1; } message DisableVacuumResponse { } message EnableVacuumRequest { + bool by_plugin = 1; } message EnableVacuumResponse { } diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 20a74f633..2ffb4d306 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -2650,6 +2650,7 @@ func (*VacuumVolumeResponse) Descriptor() ([]byte, []int) { type DisableVacuumRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + ByPlugin bool `protobuf:"varint,1,opt,name=by_plugin,json=byPlugin,proto3" json:"by_plugin,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2684,6 +2685,13 @@ func (*DisableVacuumRequest) Descriptor() ([]byte, []int) { return file_master_proto_rawDescGZIP(), []int{37} } +func (x *DisableVacuumRequest) GetByPlugin() bool { + if x != nil { + return x.ByPlugin + } + return false +} + type DisableVacuumResponse struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -2722,6 +2730,7 @@ func (*DisableVacuumResponse) Descriptor() ([]byte, []int) { type EnableVacuumRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + ByPlugin bool `protobuf:"varint,1,opt,name=by_plugin,json=byPlugin,proto3" json:"by_plugin,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2756,6 +2765,13 @@ func (*EnableVacuumRequest) Descriptor() ([]byte, []int) { return file_master_proto_rawDescGZIP(), []int{39} } +func (x *EnableVacuumRequest) GetByPlugin() bool { + if x != nil { + return x.ByPlugin + } + return false +} + type EnableVacuumResponse struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -2973,16 +2989,17 @@ func (*GetMasterConfigurationRequest) Descriptor() ([]byte, []int) { } type GetMasterConfigurationResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - MetricsAddress string `protobuf:"bytes,1,opt,name=metrics_address,json=metricsAddress,proto3" json:"metrics_address,omitempty"` - MetricsIntervalSeconds uint32 `protobuf:"varint,2,opt,name=metrics_interval_seconds,json=metricsIntervalSeconds,proto3" json:"metrics_interval_seconds,omitempty"` - StorageBackends []*StorageBackend `protobuf:"bytes,3,rep,name=storage_backends,json=storageBackends,proto3" json:"storage_backends,omitempty"` - DefaultReplication string `protobuf:"bytes,4,opt,name=default_replication,json=defaultReplication,proto3" json:"default_replication,omitempty"` - Leader string `protobuf:"bytes,5,opt,name=leader,proto3" json:"leader,omitempty"` - VolumeSizeLimitMB uint32 `protobuf:"varint,6,opt,name=volume_size_limit_m_b,json=volumeSizeLimitMB,proto3" json:"volume_size_limit_m_b,omitempty"` - VolumePreallocate bool `protobuf:"varint,7,opt,name=volume_preallocate,json=volumePreallocate,proto3" json:"volume_preallocate,omitempty"` - MaintenanceScripts string `protobuf:"bytes,8,opt,name=maintenance_scripts,json=maintenanceScripts,proto3" json:"maintenance_scripts,omitempty"` - MaintenanceSleepMinutes uint32 `protobuf:"varint,9,opt,name=maintenance_sleep_minutes,json=maintenanceSleepMinutes,proto3" json:"maintenance_sleep_minutes,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + MetricsAddress string `protobuf:"bytes,1,opt,name=metrics_address,json=metricsAddress,proto3" json:"metrics_address,omitempty"` + MetricsIntervalSeconds uint32 `protobuf:"varint,2,opt,name=metrics_interval_seconds,json=metricsIntervalSeconds,proto3" json:"metrics_interval_seconds,omitempty"` + StorageBackends []*StorageBackend `protobuf:"bytes,3,rep,name=storage_backends,json=storageBackends,proto3" json:"storage_backends,omitempty"` + DefaultReplication string `protobuf:"bytes,4,opt,name=default_replication,json=defaultReplication,proto3" json:"default_replication,omitempty"` + Leader string `protobuf:"bytes,5,opt,name=leader,proto3" json:"leader,omitempty"` + VolumeSizeLimitMB uint32 `protobuf:"varint,6,opt,name=volume_size_limit_m_b,json=volumeSizeLimitMB,proto3" json:"volume_size_limit_m_b,omitempty"` + VolumePreallocate bool `protobuf:"varint,7,opt,name=volume_preallocate,json=volumePreallocate,proto3" json:"volume_preallocate,omitempty"` + // MIGRATION: fields 8-9 help migrate master.toml [master.maintenance] to admin script plugin. Remove after March 2027. + MaintenanceScripts string `protobuf:"bytes,8,opt,name=maintenance_scripts,json=maintenanceScripts,proto3" json:"maintenance_scripts,omitempty"` + MaintenanceSleepMinutes uint32 `protobuf:"varint,9,opt,name=maintenance_sleep_minutes,json=maintenanceSleepMinutes,proto3" json:"maintenance_sleep_minutes,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -4525,10 +4542,12 @@ const file_master_proto_rawDesc = "" + "\n" + "collection\x18\x03 \x01(\tR\n" + "collection\"\x16\n" + - "\x14VacuumVolumeResponse\"\x16\n" + - "\x14DisableVacuumRequest\"\x17\n" + - "\x15DisableVacuumResponse\"\x15\n" + - "\x13EnableVacuumRequest\"\x16\n" + + "\x14VacuumVolumeResponse\"3\n" + + "\x14DisableVacuumRequest\x12\x1b\n" + + "\tby_plugin\x18\x01 \x01(\bR\bbyPlugin\"\x17\n" + + "\x15DisableVacuumResponse\"2\n" + + "\x13EnableVacuumRequest\x12\x1b\n" + + "\tby_plugin\x18\x01 \x01(\bR\bbyPlugin\"\x16\n" + "\x14EnableVacuumResponse\"\x93\x02\n" + "\x19VolumeMarkReadonlyRequest\x12\x0e\n" + "\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" + diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index daa250fa4..512dfd2aa 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -299,15 +299,24 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV } func (ms *MasterServer) DisableVacuum(ctx context.Context, req *master_pb.DisableVacuumRequest) (*master_pb.DisableVacuumResponse, error) { - - ms.Topo.DisableVacuum() + // The caller explicitly indicates whether this disable request comes + // from the vacuum plugin monitor. Track ownership so the safety net + // in the vacuum loop won't override an operator's intentional disable. + if req.GetByPlugin() { + ms.Topo.DisableVacuumByPlugin() + } else { + ms.Topo.DisableVacuum() + } resp := &master_pb.DisableVacuumResponse{} return resp, nil } func (ms *MasterServer) EnableVacuum(ctx context.Context, req *master_pb.EnableVacuumRequest) (*master_pb.EnableVacuumResponse, error) { - - ms.Topo.EnableVacuum() + if req.GetByPlugin() { + ms.Topo.EnableVacuumByPlugin() + } else { + ms.Topo.EnableVacuum() + } resp := &master_pb.EnableVacuumResponse{} return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d6dd51328..b65869107 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -185,6 +185,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se r.HandleFunc("/{fileId}", requestIDMiddleware(ms.redirectHandler)) } + ms.Topo.SetAdminServerConnectedFunc(ms.isAdminServerConnectedFunc) ms.Topo.StartRefreshWritableVolumes( ms.grpcDialOption, ms.option.GarbageThreshold, @@ -336,7 +337,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } } -func (ms *MasterServer) isAdminServerConnected() bool { +func (ms *MasterServer) isAdminServerConnectedFunc() bool { if ms == nil || ms.adminLocks == nil { return false } @@ -383,7 +384,7 @@ func (ms *MasterServer) startAdminScripts() { for { time.Sleep(time.Duration(sleepMinutes) * time.Minute) if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" { - if ms.isAdminServerConnected() { + if ms.isAdminServerConnectedFunc() { glog.V(1).Infof("Skipping master maintenance scripts because admin server is connected") continue } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 8b659fb03..2fe720327 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -7,6 +7,7 @@ import ( "math/rand/v2" "slices" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -45,7 +46,9 @@ type Topology struct { volumeSizeLimit uint64 replicationAsMin bool - isDisableVacuum bool + vacuumDisabledByOperator atomic.Bool // true when operator manually disables vacuum + vacuumDisabledByPlugin atomic.Bool // true when disabled by the vacuum plugin monitor + adminServerConnectedFunc func() bool // optional callback to check admin server presence Sequence sequence.Sequencer @@ -526,14 +529,49 @@ func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) { glog.Infof("[%s] reLink To topo ", dn.Id()) } +// IsVacuumDisabled returns true if vacuum is disabled by either the +// operator or the plugin monitor. +func (t *Topology) IsVacuumDisabled() bool { + return t.vacuumDisabledByOperator.Load() || t.vacuumDisabledByPlugin.Load() +} + +// DisableVacuum is called by the operator (shell command / manual RPC). +// Only sets the operator flag; does not affect the plugin flag. func (t *Topology) DisableVacuum() { - glog.V(0).Infof("DisableVacuum") - t.isDisableVacuum = true + glog.V(0).Infof("DisableVacuum (by operator)") + t.vacuumDisabledByOperator.Store(true) } +// EnableVacuum is called by the operator (shell command / manual RPC). +// Only clears the operator flag; does not affect the plugin flag. func (t *Topology) EnableVacuum() { - glog.V(0).Infof("EnableVacuum") - t.isDisableVacuum = false + glog.V(0).Infof("EnableVacuum (by operator)") + t.vacuumDisabledByOperator.Store(false) +} + +// DisableVacuumByPlugin is called by the admin server's vacuum monitor +// when a vacuum plugin worker connects. Only sets the plugin flag. +func (t *Topology) DisableVacuumByPlugin() { + glog.V(0).Infof("DisableVacuum (by plugin worker)") + t.vacuumDisabledByPlugin.Store(true) +} + +// EnableVacuumByPlugin is called by the admin server's vacuum monitor +// when a vacuum plugin worker disconnects. Only clears the plugin flag. +func (t *Topology) EnableVacuumByPlugin() { + glog.V(0).Infof("EnableVacuum (by plugin worker)") + t.vacuumDisabledByPlugin.Store(false) +} + +// IsVacuumDisabledByPlugin returns whether the plugin monitor has disabled vacuum. +func (t *Topology) IsVacuumDisabledByPlugin() bool { + return t.vacuumDisabledByPlugin.Load() +} + +// SetAdminServerConnectedFunc sets an optional callback used by the vacuum +// safety net to detect when the admin server has disconnected. +func (t *Topology) SetAdminServerConnectedFunc(f func() bool) { + t.adminServerConnectedFunc = f } func (t *Topology) GetTopologyId() string { diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index e3ad8f2dc..739136199 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -26,7 +26,15 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g go func(garbageThreshold float64) { for { if t.IsLeader() { - if !t.isDisableVacuum { + // Safety net: if vacuum was disabled by the plugin monitor but the + // admin server is no longer connected, automatically re-enable. + // This handles the case where the admin server crashes without + // cleanup. Does NOT override an operator's intentional disable. + if t.IsVacuumDisabledByPlugin() && t.adminServerConnectedFunc != nil && !t.adminServerConnectedFunc() { + glog.V(0).Infof("Admin server disconnected while vacuum was disabled by plugin, clearing plugin disable") + t.EnableVacuumByPlugin() + } + if !t.IsVacuumDisabled() { t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate, true) } } else { diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 83be65d7c..45901f777 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -250,11 +250,11 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate, automatic) } } - if automatic && t.isDisableVacuum { + if automatic && t.IsVacuumDisabled() { break } } - if automatic && t.isDisableVacuum { + if automatic && t.IsVacuumDisabled() { glog.V(0).Infof("Vacuum is disabled") break } @@ -321,11 +321,11 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL limiterLock.Unlock() } }) - if automatic && t.isDisableVacuum { + if automatic && t.IsVacuumDisabled() { break } } - if automatic && t.isDisableVacuum { + if automatic && t.IsVacuumDisabled() { break } if len(todoVolumeMap) == len(pendingVolumeMap) {