Browse Source

Merge pull request #1433 from kmlebedev/volumePreStop

stop send heartbeat before stop volume server
pull/1444/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
fdb59d5968
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      weed/command/server.go
  2. 8
      weed/command/volume.go
  3. 4
      weed/server/volume_grpc_client_to_master.go
  4. 2
      weed/server/volume_server.go

1
weed/command/server.go

@ -98,6 +98,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = &False serverOptions.v.pprof = &False
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")

8
weed/command/volume.go

@ -55,6 +55,7 @@ type VolumeServerOptions struct {
fileSizeLimitMB *int fileSizeLimitMB *int
minFreeSpacePercents []float32 minFreeSpacePercents []float32
pprof *bool pprof *bool
preStopSeconds *int
// pulseSeconds *int // pulseSeconds *int
} }
@ -66,6 +67,7 @@ func init() {
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond, *v.compactionMBPerSecond,
*v.fileSizeLimitMB, *v.fileSizeLimitMB,
) )
// starting grpc server // starting grpc server
grpcS := v.startGrpcService(volumeServer) grpcS := v.startGrpcService(volumeServer)
@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
fmt.Println("volume server has be killed") fmt.Println("volume server has be killed")
var startTime time.Time var startTime time.Time
// Stop heartbeats
fmt.Println("stop send heartbeat and sleep %d sec", v.preStopSeconds)
volumeServer.SendHeartbeat = false
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
fmt.Println("end sleep 20 sec")
// firstly, stop the public http service to prevent from receiving new user request // firstly, stop the public http service to prevent from receiving new user request
if nil != publicHttpDown { if nil != publicHttpDown {
startTime = time.Now() startTime = time.Now()

4
weed/server/volume_grpc_client_to_master.go

@ -168,11 +168,15 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
return "", err return "", err
} }
case <-volumeTickChan: case <-volumeTickChan:
if vs.SendHeartbeat {
glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err return "", err
} }
} else {
glog.V(5).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port)
}
case <-ecShardTickChan: case <-ecShardTickChan:
glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {

2
weed/server/volume_server.go

@ -31,6 +31,7 @@ type VolumeServer struct {
MetricsAddress string MetricsAddress string
MetricsIntervalSec int MetricsIntervalSec int
fileSizeLimitBytes int64 fileSizeLimitBytes int64
SendHeartbeat bool
} }
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@ -66,6 +67,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
SendHeartbeat: true,
} }
vs.SeedMasterNodes = masterNodes vs.SeedMasterNodes = masterNodes
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)

Loading…
Cancel
Save