Browse Source

healthz check to avoid drain pod with last replicas

pull/2680/head
Konstantin Lebedev 3 years ago
parent
commit
9ea09cc41c
  1. 1
      weed/server/volume_server.go
  2. 19
      weed/server/volume_server_handlers_admin.go
  3. 6
      weed/topology/store_replicate.go

1
weed/server/volume_server.go

@ -98,6 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
handleStaticResources(adminMux) handleStaticResources(adminMux)
adminMux.HandleFunc("/status", vs.statusHandler) adminMux.HandleFunc("/status", vs.statusHandler)
adminMux.HandleFunc("/healthz", vs.healthzHandler)
if signingKey == "" || enableUiAccess { if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments // only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)

19
weed/server/volume_server_handlers_admin.go

@ -1,6 +1,7 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/seaweedfs/weed/topology"
"net/http" "net/http"
"path/filepath" "path/filepath"
@ -9,6 +10,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
volumeInfos := vs.store.VolumeInfos()
for _, vinfo := range volumeInfos {
if len(vinfo.Collection) == 0 {
continue
}
if vinfo.ReplicaPlacement.GetCopyCount() > 1 {
_, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster)
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
}
}
w.WriteHeader(http.StatusOK)
}
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{}) m := make(map[string]interface{})

6
weed/topology/store_replicate.go

@ -29,7 +29,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
// this is the initial request // this is the initial request
remoteLocations, err = getWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
if err != nil { if err != nil {
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -114,7 +114,7 @@ func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOp
var remoteLocations []operation.Location var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
remoteLocations, err = getWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
remoteLocations, err = GetWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn)
if err != nil { if err != nil {
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -174,7 +174,7 @@ func DistributedOperation(locations []operation.Location, op func(location opera
return ret.Error() return ret.Error()
} }
func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) {
v := s.GetVolume(volumeId) v := s.GetVolume(volumeId)
if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 { if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {

Loading…
Cancel
Save