Browse Source

refactor code

pull/843/head
bingoohuang 7 years ago
parent
commit
13da2b84e0
  1. 6
      weed/command/master.go
  2. 2
      weed/server/master_grpc_server.go
  3. 12
      weed/server/master_server.go
  4. 14
      weed/server/master_server_handlers_admin.go
  5. 9
      weed/util/str.go

6
weed/command/master.go

@ -36,7 +36,7 @@ var (
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
volumeSizeLimitMiB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes. (MiB)")
volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
@ -64,13 +64,13 @@ func runMaster(cmd *Command, args []string) bool {
if *masterWhiteListOption != "" {
masterWhiteList = strings.Split(*masterWhiteListOption, ",")
}
if *volumeSizeLimitMB > 30*1000 {
if *volumeSizeLimitMiB > 30*1000 {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
*volumeSizeLimitMB, *volumePreallocate,
*volumeSizeLimitMiB, *volumePreallocate,
*mpulse, *defaultReplicaPlacement, *garbageThreshold,
masterWhiteList, *masterSecureKey,
)

2
weed/server/master_grpc_server.go

@ -64,7 +64,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
VolumeSizeLimit: uint64(ms.volumeSizeLimitMiB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
}); err != nil {
return err

12
weed/server/master_server.go

@ -20,7 +20,7 @@ import (
type MasterServer struct {
port int
metaFolder string
volumeSizeLimitMB uint
volumeSizeLimitMiB uint
preallocate int64
pulseSeconds int
defaultReplicaPlacement string
@ -39,7 +39,7 @@ type MasterServer struct {
}
func NewMasterServer(r *mux.Router, port int, metaFolder string,
volumeSizeLimitMB uint,
volumeSizeLimitMiB uint,
preallocate bool,
pulseSeconds int,
defaultReplicaPlacement string,
@ -50,11 +50,11 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
var preallocateSize int64
if preallocate {
preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
preallocateSize = int64(volumeSizeLimitMiB) * (1 << 20)
}
ms := &MasterServer{
port: port,
volumeSizeLimitMB: volumeSizeLimitMB,
volumeSizeLimitMiB: volumeSizeLimitMiB,
preallocate: preallocateSize,
pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement,
@ -63,9 +63,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMiB)*1024*1024, pulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMiB, "MiB")
ms.guard = security.NewGuard(whiteList, secureKey)

14
weed/server/master_server_handlers_admin.go

@ -74,7 +74,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " +
strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
} else {
count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
}
@ -106,11 +107,9 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
collection := r.FormValue("collection")
machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil && len(machines) > 0 {
var url string
url := util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
if r.URL.RawQuery != "" {
url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
} else {
url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
url += "?" + r.URL.RawQuery
}
http.Redirect(w, r, url, http.StatusMovedPermanently)
} else {
@ -143,10 +142,7 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo
}
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
replicationString = ms.defaultReplicaPlacement
}
replicationString := util.EmptyThen(r.FormValue("replication"), ms.defaultReplicaPlacement)
replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err

9
weed/util/str.go

@ -0,0 +1,9 @@
package util
func EmptyThen(s1, s2 string) string {
if s1 == "" {
return s2
}
return s1
}
Loading…
Cancel
Save