Browse Source

Healthz check for deadlocks (#4558)

pull/4560/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
5ee04d20fa
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      weed/server/raft_server_handlers.go
  2. 19
      weed/topology/node.go
  3. 22
      weed/topology/topology.go

21
weed/server/raft_server_handlers.go

@ -1,9 +1,12 @@
package weed_server package weed_server
import ( import (
"github.com/cenkalti/backoff/v4"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"net/http" "net/http"
"time"
) )
type ClusterStatusResult struct { type ClusterStatusResult struct {
@ -27,12 +30,24 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
} }
func (s *RaftServer) HealthzHandler(w http.ResponseWriter, r *http.Request) { func (s *RaftServer) HealthzHandler(w http.ResponseWriter, r *http.Request) {
_, err := s.topo.Leader()
leader, err := s.topo.Leader()
if err != nil { if err != nil {
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
} else {
w.WriteHeader(http.StatusOK)
return
}
if s.serverAddr == leader {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 20 * time.Millisecond
expBackoff.MaxInterval = 1 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Second
isLocked, err := backoff.RetryWithData(s.topo.IsChildLocked, expBackoff)
glog.Errorf("HealthzHandler: %+v", err)
if isLocked {
w.WriteHeader(http.StatusLocked)
return
}
} }
w.WriteHeader(http.StatusOK)
} }
func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {

19
weed/topology/node.go

@ -34,11 +34,13 @@ type Node interface {
IsDataNode() bool IsDataNode() bool
IsRack() bool IsRack() bool
IsDataCenter() bool IsDataCenter() bool
IsLocked() bool
Children() []Node Children() []Node
Parent() Node Parent() Node
GetValue() interface{} //get reference to the topology,dc,rack,datanode GetValue() interface{} //get reference to the topology,dc,rack,datanode
} }
type NodeImpl struct { type NodeImpl struct {
diskUsages *DiskUsages diskUsages *DiskUsages
id NodeId id NodeId
@ -122,24 +124,37 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption
func (n *NodeImpl) IsDataNode() bool { func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode" return n.nodeType == "DataNode"
} }
func (n *NodeImpl) IsRack() bool { func (n *NodeImpl) IsRack() bool {
return n.nodeType == "Rack" return n.nodeType == "Rack"
} }
func (n *NodeImpl) IsDataCenter() bool { func (n *NodeImpl) IsDataCenter() bool {
return n.nodeType == "DataCenter" return n.nodeType == "DataCenter"
} }
func (n *NodeImpl) IsLocked() (isTryLock bool) {
if isTryLock = n.TryRLock(); isTryLock {
n.RUnlock()
}
return !isTryLock
}
func (n *NodeImpl) String() string { func (n *NodeImpl) String() string {
if n.parent != nil { if n.parent != nil {
return n.parent.String() + ":" + string(n.id) return n.parent.String() + ":" + string(n.id)
} }
return string(n.id) return string(n.id)
} }
func (n *NodeImpl) Id() NodeId { func (n *NodeImpl) Id() NodeId {
return n.id return n.id
} }
func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts { func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
return n.diskUsages.getOrCreateDisk(diskType) return n.diskUsages.getOrCreateDisk(diskType)
} }
func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 { func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
t := n.getOrCreateDisk(option.DiskType) t := n.getOrCreateDisk(option.DiskType)
freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount) freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
@ -152,6 +167,7 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
func (n *NodeImpl) SetParent(node Node) { func (n *NodeImpl) SetParent(node Node) {
n.parent = node n.parent = node
} }
func (n *NodeImpl) Children() (ret []Node) { func (n *NodeImpl) Children() (ret []Node) {
n.RLock() n.RLock()
defer n.RUnlock() defer n.RUnlock()
@ -160,12 +176,15 @@ func (n *NodeImpl) Children() (ret []Node) {
} }
return ret return ret
} }
func (n *NodeImpl) Parent() Node { func (n *NodeImpl) Parent() Node {
return n.parent return n.parent
} }
func (n *NodeImpl) GetValue() interface{} { func (n *NodeImpl) GetValue() interface{} {
return n.value return n.value
} }
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) { func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
n.RLock() n.RLock()
defer n.RUnlock() defer n.RUnlock()

22
weed/topology/topology.go

@ -76,6 +76,28 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
return t return t
} }
func (t *Topology) IsChildLocked() (bool, error) {
if t.IsLocked() {
return true, errors.New("topology is locked")
}
for _, dcNode := range t.Children() {
if dcNode.IsLocked() {
return true, fmt.Errorf("topology child %s is locked", dcNode.String())
}
for _, rackNode := range dcNode.Children() {
if rackNode.IsLocked() {
return true, fmt.Errorf("dc %s child %s is locked", dcNode.String(), rackNode.String())
}
for _, dataNode := range rackNode.Children() {
if dataNode.IsLocked() {
return true, fmt.Errorf("rack %s child %s is locked", rackNode.String(), dataNode.Id())
}
}
}
}
return false, nil
}
func (t *Topology) IsLeader() bool { func (t *Topology) IsLeader() bool {
t.RaftServerAccessLock.RLock() t.RaftServerAccessLock.RLock()
defer t.RaftServerAccessLock.RUnlock() defer t.RaftServerAccessLock.RUnlock()

Loading…
Cancel
Save