From 9e17f6baffd5dd7cc404d831d18dd618b9fe5049 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 19 Feb 2026 20:45:41 -0800
Subject: [PATCH] mini clean shutdown

---
 .../table-buckets/s3tables_integration_test.go |  7 ++++---
 weed/command/filer.go                          |  7 ++++++-
 weed/command/master.go                         |  6 +++++-
 weed/command/mini.go                           |  7 +++++++
 weed/command/s3.go                             |  6 +++++-
 weed/command/volume.go                         | 11 ++++++++---
 weed/s3api/s3api_server.go                     | 10 +++++-----
 weed/server/filer_server.go                    |  4 ++--
 weed/server/volume_grpc_client_to_master.go    | 16 ++++++++--------
 weed/server/volume_server.go                   |  7 ++++---
 10 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/test/s3tables/table-buckets/s3tables_integration_test.go b/test/s3tables/table-buckets/s3tables_integration_test.go
index 1adbfd946..0f3f71279 100644
--- a/test/s3tables/table-buckets/s3tables_integration_test.go
+++ b/test/s3tables/table-buckets/s3tables_integration_test.go
@@ -685,9 +685,9 @@ func (c *TestCluster) Stop() {
 	if c.cancel != nil {
 		c.cancel()
 	}
-	// Give services time to shut down gracefully
+	// Give services time to shut down aggressively
 	if c.isRunning {
-		time.Sleep(500 * time.Millisecond)
+		time.Sleep(100 * time.Millisecond)
 	}
 	// Wait for the mini goroutine to finish
 	done := make(chan struct{})
@@ -695,11 +695,12 @@ func (c *TestCluster) Stop() {
 		c.wg.Wait()
 		close(done)
 	}()
-	timer := time.NewTimer(2 * time.Second)
+	timer := time.NewTimer(5 * time.Second)
 	defer timer.Stop()
 	select {
 	case <-done:
 		// Goroutine finished
+		time.Sleep(200 * time.Millisecond) // Extra buffer for port release
 	case <-timer.C:
 		// Timeout - goroutine doesn't respond to context cancel
 		// This may indicate the mini cluster didn't shut down cleanly
diff --git a/weed/command/filer.go b/weed/command/filer.go
index dfd08a9eb..ab6164460 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -341,7 +341,12 @@ func (fo *FilerOptions) startFiler() {
 		glog.V(0).Infof("Initialized credential manager: %s", credentialManager.GetStoreName())
 	}
 
-	fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
	// ... some code omitted for simplicity ...
+	fctx := MiniClusterCtx
+	if fctx == nil {
+		fctx = context.Background()
+	}
+	fs, nfs_err := weed_server.NewFilerServer(fctx, defaultMux, publicVolumeMux, &weed_server.FilerOption{
 		Masters:    fo.masters,
 		FilerGroup: *fo.filerGroup,
 		Collection: *fo.collection,
diff --git a/weed/command/master.go b/weed/command/master.go
index 565c9cc58..110be5d1f 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -268,7 +268,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
 		}()
 	}
 
-	go ms.MasterClient.KeepConnectedToMaster(context.Background())
+	masterCtx := MiniClusterCtx
+	if masterCtx == nil {
+		masterCtx = context.Background()
+	}
+	go ms.MasterClient.KeepConnectedToMaster(masterCtx)
 
 	// start http server
 	var (
diff --git a/weed/command/mini.go b/weed/command/mini.go
index 0c4e00666..b9b43af51 100644
--- a/weed/command/mini.go
+++ b/weed/command/mini.go
@@ -1107,6 +1107,13 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
 	// Wait for worker to be ready by polling its gRPC port
 	workerGrpcAddr := fmt.Sprintf("%s:%d", bindIp, *miniAdminOptions.grpcPort)
 	waitForWorkerReady(workerGrpcAddr)
+
+	if ctx != nil {
+		go func() {
+			<-ctx.Done()
+			glog.V(0).Infof("Cluster context cancelled, initiating aggressive shutdown of mini services...")
+		}()
+	}
 }
 
 // waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 115147b69..873ed9f00 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -288,7 +288,11 @@ func (s3opt *S3Options) startS3Server() bool {
 		*s3opt.bindIp = "0.0.0.0"
 	}
 
-	s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
+	s3ctx := MiniClusterCtx
+	if s3ctx == nil {
+		s3ctx = context.Background()
+	}
+	s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(s3ctx, router, &s3api.S3ApiServerOption{
 		Filers:  filerAddresses,
 		Masters: masterAddresses,
 		Port:    *s3opt.port,
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 601849f28..a82983ae6 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,7 @@
 package command
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	httppprof "net/http/pprof"
@@ -267,7 +268,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
 	// Determine volume server ID: if not specified, use ip:port
 	volumeServerId := util.GetVolumeServerId(*v.id, *v.ip, *v.port)
-	volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
+	vctx := MiniClusterCtx
+	if vctx == nil {
+		vctx = context.Background()
+	}
+	volumeServer := weed_server.NewVolumeServer(vctx, volumeMux, publicVolumeMux,
 		*v.ip, *v.port, *v.portGrpc, *v.publicUrl,
 		volumeServerId,
 		v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes,
 		*v.idxFolder,
@@ -349,8 +354,8 @@ func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server,
 		glog.Warningf("stop the cluster http server failed, %v", err)
 	}
 
-	glog.V(0).Infof("graceful stop gRPC ...")
-	grpcS.GracefulStop()
+	glog.V(0).Infof("stop gRPC ...")
+	grpcS.Stop()
 
 	volumeServer.Shutdown()
 
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 5193199b2..bbb37e317 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -82,11 +82,11 @@ type S3ApiServer struct {
 	cipher bool // encrypt data on volume servers
 }
 
-func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
-	return NewS3ApiServerWithStore(router, option, "")
+func NewS3ApiServer(ctx context.Context, router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
+	return NewS3ApiServerWithStore(ctx, router, option, "")
 }
 
-func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
+func NewS3ApiServerWithStore(ctx context.Context, router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
 	if len(option.Filers) == 0 {
 		return nil, fmt.Errorf("at least one filer address is required")
 	}
@@ -132,7 +132,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 	}
 	masterClient := wdclient.NewMasterClient(option.GrpcDialOption, option.FilerGroup, cluster.S3Type, pb.ServerAddress(util.JoinHostPort(clientHost, option.GrpcPort)), "", "", *pb.NewServiceDiscoveryFromMap(masterMap))
 	// Start the master client connection loop - required for GetMaster() to work
-	go masterClient.KeepConnectedToMaster(context.Background())
+	go masterClient.KeepConnectedToMaster(ctx)
 
 	filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter, &wdclient.FilerClientOption{
 		MasterClient: masterClient,
@@ -277,7 +277,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 	})
 
 	// Start bucket size metrics collection in background
-	go s3ApiServer.startBucketSizeMetricsLoop(context.Background())
+	go s3ApiServer.startBucketSizeMetricsLoop(ctx)
 
 	return s3ApiServer, nil
 }
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 757374ea1..20bc93337 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -119,7 +119,7 @@ type FilerServer struct {
 	CredentialManager *credential.CredentialManager
 }
 
-func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
+func NewFilerServer(ctx context.Context, defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
 	v := util.GetViper()
 	signingKey := v.GetString("jwt.filer_signing.key")
@@ -194,7 +194,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 	fs.checkWithMaster()
 
 	go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
-	go fs.filer.MasterClient.KeepConnectedToMaster(context.Background())
+	go fs.filer.MasterClient.KeepConnectedToMaster(ctx)
 
 	fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
 	v.SetDefault("filer.options.buckets_folder", "/buckets")
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index a4349771b..371d50bc8 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -26,11 +26,11 @@ func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
 	return vs.currentMaster
 }
 
-func (vs *VolumeServer) checkWithMaster() (err error) {
+func (vs *VolumeServer) checkWithMaster(ctx context.Context) (err error) {
 	for {
 		for _, master := range vs.SeedMasterNodes {
 			err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
-				resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+				resp, err := masterClient.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
 				if err != nil {
 					return fmt.Errorf("get master %s configuration: %v", master, err)
 				}
@@ -48,7 +48,7 @@ func (vs *VolumeServer) checkWithMaster() (err error) {
 	}
 }
 
-func (vs *VolumeServer) heartbeat() {
+func (vs *VolumeServer) heartbeat(ctx context.Context) {
 	glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
 	vs.store.SetDataCenter(vs.dataCenter)
@@ -68,7 +68,7 @@ func (vs *VolumeServer) heartbeat() {
 			master = newLeader
 		}
 		vs.store.MasterAddress = master
-		newLeader, err = vs.doHeartbeatWithRetry(master, grpcDialOption, vs.pulsePeriod, duplicateRetryCount)
+		newLeader, err = vs.doHeartbeatWithRetry(ctx, master, grpcDialOption, vs.pulsePeriod, duplicateRetryCount)
 		if err != nil {
 			glog.V(0).Infof("heartbeat to %s error: %v", master, err)
@@ -106,13 +106,13 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
 	return false
 }
 
-func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
-	return vs.doHeartbeatWithRetry(masterAddress, grpcDialOption, sleepInterval, 0)
+func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
+	return vs.doHeartbeatWithRetry(ctx, masterAddress, grpcDialOption, sleepInterval, 0)
 }
 
-func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration, duplicateRetryCount int) (newLeader pb.ServerAddress, err error) {
+func (vs *VolumeServer) doHeartbeatWithRetry(ctx context.Context, masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration, duplicateRetryCount int) (newLeader pb.ServerAddress, err error) {
 
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	grpcConnection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6bc3a6898..f5a8a33c5 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,6 +1,7 @@
 package weed_server
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"sync"
@@ -56,7 +57,7 @@ type VolumeServer struct {
 	stopChan chan bool
 }
 
-func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
+func NewVolumeServer(ctx context.Context, adminMux, publicMux *http.ServeMux, ip string,
 	port int, grpcPort int, publicUrl string, id string,
 	folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
 	idxFolder string,
@@ -116,7 +117,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
 	vs.SeedMasterNodes = masterNodes
 
-	vs.checkWithMaster()
+	vs.checkWithMaster(ctx)
 
 	vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, id, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
 	vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
@@ -143,7 +144,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	stats.VolumeServerConcurrentDownloadLimit.Set(float64(vs.concurrentDownloadLimit))
 	stats.VolumeServerConcurrentUploadLimit.Set(float64(vs.concurrentUploadLimit))
 
-	go vs.heartbeat()
+	go vs.heartbeat(ctx)
 	go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
 
 	return vs
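
The pattern applied across filer.go, master.go, s3.go, and volume.go is the same: consult the package-level MiniClusterCtx, fall back to context.Background() when it is nil, and thread the chosen context into the long-running loops (KeepConnectedToMaster, heartbeat, the metrics loops) so cancelling the mini cluster's context tears them down. The following is a minimal, self-contained sketch of how a caller might drive that shutdown; it is illustrative only, the MiniClusterCtx variable below is a stand-in for the one in weed/command (how it is actually assigned is not shown in this patch), and the bounded wait mirrors the changes to TestCluster.Stop above.

// Illustrative sketch only: MiniClusterCtx here is a stand-in for the
// package-level variable in weed/command, not the real wiring.
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

var MiniClusterCtx context.Context

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	MiniClusterCtx = ctx // services started after this point pick up the cluster context

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stand-in for the mini cluster services: each long-running loop
		// (heartbeat, KeepConnectedToMaster, metrics) exits when ctx is cancelled.
		<-ctx.Done()
		log.Println("cluster context cancelled, services shutting down")
	}()

	// ... run the workload ...

	// Shutdown mirrors TestCluster.Stop: cancel, then wait with a bounded timer.
	cancel()
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		time.Sleep(200 * time.Millisecond) // extra buffer for port release
	case <-time.After(5 * time.Second):
		log.Println("timeout: mini cluster did not shut down cleanly")
	}
}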