diff --git a/weed/command/master.go b/weed/command/master.go index eee22810b..aed92fa33 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -43,12 +43,12 @@ var ( mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file") defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") - mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") - masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") - masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") + // mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") + masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") masterWhiteList []string ) @@ -87,7 +87,7 @@ func runMaster(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second) + listener, e := util.NewListener(listeningAddress, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } diff --git a/weed/command/server.go b/weed/command/server.go index 5bde22517..b4b98ba1e 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -215,7 +215,7 @@ func runServer(cmd *Command, args []string) bool { ) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) - masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second) + masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go index 54e2c2f75..ac3871c8c 100644 --- a/weed/server/volume_grpc_client.go +++ b/weed/server/volume_grpc_client.go @@ -53,10 +53,13 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { vs.store.Client = stream defer func() { vs.store.Client = nil }() + doneChan := make(chan error, 1) + go func() { for { in, err := stream.Recv() if err != nil { + doneChan <- err return } vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit() @@ -64,11 +67,22 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { } }() + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return err + } + + tickChan := time.NewTimer(sleepInterval).C + for { - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + select { + case <-tickChan: + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return err + } + case err := <-doneChan: return err } - time.Sleep(sleepInterval) } } diff --git a/weed/topology/node.go b/weed/topology/node.go index 7383f9576..206a9aff4 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -234,7 +234,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) - glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount) + glog.V(0).Infoln(n, "removes", node.Id()) } } diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 40019fdcd..e2dcfca06 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { } func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.GetVolumes() { - glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) + glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id()) vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl.SetVolumeUnavailable(dn, v.Id) } diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index f46776992..8acd50d42 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -38,9 +38,11 @@ type Conn struct { } func (c *Conn) Read(b []byte) (count int, e error) { - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) - if err != nil { - return 0, err + if c.ReadTimeout != 0 { + err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + if err != nil { + return 0, err + } } count, e = c.Conn.Read(b) if e == nil { @@ -50,9 +52,11 @@ func (c *Conn) Read(b []byte) (count int, e error) { } func (c *Conn) Write(b []byte) (count int, e error) { - err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) - if err != nil { - return 0, err + if c.WriteTimeout != 0 { + err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + if err != nil { + return 0, err + } } count, e = c.Conn.Write(b) if e == nil {