Browse Source

command: implement graceful shutdown for mini cluster

- Introduce MiniClusterCtx to coordinate shutdown across mini services
- Update Master, Volume, Filer, S3, and WebDAV servers to respect context cancellation
- Ensure all resources are cleaned up properly during test teardown
- Integrate MiniClusterCtx in s3tables integration tests
pull/8147/head
Chris Lu 6 days ago
parent
commit
01c17478ae
  1. 2
      test/s3tables/s3tables_integration_test.go
  2. 24
      weed/command/filer.go
  3. 8
      weed/command/master.go
  4. 22
      weed/command/mini.go
  5. 21
      weed/command/s3.go
  6. 12
      weed/command/volume.go
  7. 11
      weed/command/webdav.go

2
test/s3tables/s3tables_integration_test.go

@@ -405,7 +405,9 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
if cmd.Name() == "mini" && cmd.Run != nil {
cmd.Flag.Parse(os.Args[1:])
args := cmd.Flag.Args()
command.MiniClusterCtx = ctx
cmd.Run(cmd, args)
command.MiniClusterCtx = nil
return
}
}

24
weed/command/filer.go

@@ -481,8 +481,16 @@ func (fo *FilerOptions) startFiler() {
}
}()
}
if err := newHttpServer(defaultMux, tlsConfig).ServeTLS(filerListener, "", ""); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
httpS := newHttpServer(defaultMux, tlsConfig)
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
httpS.Shutdown(context.Background())
grpcS.Stop()
}()
}
if err := httpS.ServeTLS(filerListener, "", ""); err != nil && err != http.ErrServerClosed {
glog.Fatalf("Filer Fail to serve: %v", err)
}
} else {
if filerLocalListener != nil {
@@ -492,8 +500,16 @@ func (fo *FilerOptions) startFiler() {
}
}()
}
if err := newHttpServer(defaultMux, nil).Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
httpS := newHttpServer(defaultMux, nil)
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
httpS.Shutdown(context.Background())
grpcS.Stop()
}()
}
if err := httpS.Serve(filerListener); err != nil && err != http.ErrServerClosed {
glog.Fatalf("Filer Fail to serve: %v", err)
}
}
}

8
weed/command/master.go

@@ -311,7 +311,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
ms.Topo.HashicorpRaft.LeadershipTransfer()
}
})
select {}
if MiniClusterCtx != nil {
<-MiniClusterCtx.Done()
ms.Shutdown()
grpcS.Stop()
} else {
select {}
}
}
func isSingleMasterMode(peers string) bool {

22
weed/command/mini.go

@@ -59,6 +59,8 @@ var (
miniEnableS3 *bool
miniEnableAdminUI *bool
miniS3IamReadOnly *bool
// MiniClusterCtx is the context for the mini cluster. If set, the mini cluster will stop when the context is cancelled.
MiniClusterCtx context.Context
)
func init() {
@@ -821,7 +823,12 @@ func runMini(cmd *Command, args []string) bool {
// Save configuration to file for persistence and documentation
saveMiniConfiguration(*miniDataFolders)
select {}
if MiniClusterCtx != nil {
<-MiniClusterCtx.Done()
} else {
select {}
}
return true
}
// startMiniServices starts all mini services with proper dependency coordination
@@ -928,7 +935,12 @@ func startS3Service() {
func startMiniAdminWithWorker(allServicesReady chan struct{}) {
defer close(allServicesReady) // Ensure channel is always closed on all paths
ctx := context.Background()
var ctx context.Context
if MiniClusterCtx != nil {
ctx = MiniClusterCtx
} else {
ctx = context.Background()
}
// Determine bind IP for health checks
bindIp := getBindIp()
@@ -1101,6 +1113,12 @@ func startMiniWorker() {
// Metrics server is already started in the main init function above, so no need to start it again here
// Start the worker
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
workerInstance.Stop()
}()
}
err = workerInstance.Start()
if err != nil {
glog.Fatalf("Failed to start worker: %v", err)

21
weed/command/s3.go

@@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"runtime"
"strings"
@@ -405,7 +406,15 @@ func (s3opt *S3Options) startS3Server() bool {
}
}()
}
if err = newHttpServer(router, tlsConfig).ServeTLS(s3ApiListener, "", ""); err != nil {
httpS := newHttpServer(router, tlsConfig)
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
httpS.Shutdown(context.Background())
grpcS.Stop()
}()
}
if err = httpS.ServeTLS(s3ApiListener, "", ""); err != nil && err != http.ErrServerClosed {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
} else {
@@ -438,7 +447,15 @@ func (s3opt *S3Options) startS3Server() bool {
}
}()
}
if err = newHttpServer(router, nil).Serve(s3ApiListener); err != nil {
httpS := newHttpServer(router, nil)
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
httpS.Shutdown(context.Background())
grpcS.Stop()
}()
}
if err = httpS.Serve(s3ApiListener); err != nil && err != http.ErrServerClosed {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
}

12
weed/command/volume.go

@@ -319,8 +319,16 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
stopChan <- true
})
select {
case <-stopChan:
if MiniClusterCtx != nil {
select {
case <-stopChan:
case <-MiniClusterCtx.Done():
shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer)
}
} else {
select {
case <-stopChan:
}
}
}

11
weed/command/webdav.go

@@ -137,14 +137,21 @@ func (wo *WebDavOption) startWebDav() bool {
glog.Fatalf("WebDav Server listener on %s error: %v", listenAddress, err)
}
if MiniClusterCtx != nil {
go func() {
<-MiniClusterCtx.Done()
httpS.Shutdown(context.Background())
}()
}
if *wo.tlsPrivateKey != "" {
glog.V(0).Infof("Start Seaweed WebDav Server %s at https %s", version.Version(), listenAddress)
if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil {
if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil && err != http.ErrServerClosed {
glog.Fatalf("WebDav Server Fail to serve: %v", err)
}
} else {
glog.V(0).Infof("Start Seaweed WebDav Server %s at http %s", version.Version(), listenAddress)
if err = httpS.Serve(webDavListener); err != nil {
if err = httpS.Serve(webDavListener); err != nil && err != http.ErrServerClosed {
glog.Fatalf("WebDav Server Fail to serve: %v", err)
}
}

Loading…
Cancel
Save