{{ range $entry := .Breadcrumbs }}
-
+
{{ $entry.Name }}
{{ end }}
@@ -69,11 +78,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`
{{if $entry.IsDirectory}}

-
+
{{ $entry.Name }}
{{else}}
-
+
{{ $entry.Name }}
{{end}}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 108892f92..93ecefb74 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -12,7 +12,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -73,9 +72,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 282c75679..168975fb6 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/raft"
@@ -184,6 +185,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
resp := &master_pb.GetMasterConfigurationResponse{
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}
return resp, nil
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 7595c0171..34235384f 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} else {
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
- http.Redirect(w, r, url, http.StatusMovedPermanently)
+ http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index f81ec649d..9296c63e9 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -196,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
}
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+ vs.StopHeartbeat()
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
resp := &volume_server_pb.VolumeNeedleStatusResponse{}
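The new VolumeServerLeave RPC only flips the heartbeat state on the volume server; the caller (e.g. the new volumeServer.leave shell command) is expected to invoke it over gRPC. A minimal sketch of such a call, assuming the volume_server_pb stubs have been regenerated with the VolumeServerLeave method and its request/response types; the helper name is illustrative:

```go
package shell

import (
	"context"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

// askVolumeServerToLeave asks one volume server to stop sending heartbeats,
// so the master eventually drops it from the topology. Assumes the
// regenerated volume_server_pb client exposes VolumeServerLeave.
func askVolumeServerToLeave(volumeServer string, grpcDialOption grpc.DialOption) error {
	return operation.WithVolumeServerClient(volumeServer, grpcDialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			_, err := client.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
			return err
		})
}
```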
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 3a1b95f26..0c0cc39c1 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"time"
"google.golang.org/grpc"
@@ -21,6 +22,27 @@ import (
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+ for _, master := range vs.SeedMasterNodes {
+ err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ vs.MetricsAddress, vs.MetricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ backend.LoadFromPbStorageBackends(resp.StorageBackends)
+ return nil
+ })
+ if err == nil {
+ return
+ } else {
+ glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+ }
+ }
+ return
+}
+
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
@@ -31,7 +53,7 @@ func (vs *VolumeServer) heartbeat() {
var err error
var newLeader string
- for {
+ for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
// the new leader may actually is the same master
@@ -52,20 +74,35 @@ func (vs *VolumeServer) heartbeat() {
newLeader = ""
vs.store.MasterAddress = ""
}
+ if !vs.isHeartbeating {
+ break
+ }
}
}
}
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+ if !vs.isHeartbeating {
+ return true
+ }
+ vs.isHeartbeating = false
+ vs.stopChan <- true
+ return false
+}
+
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(context.Background())
+ stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -96,13 +133,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
doneChan <- nil
return
}
- if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
- vs.MetricsAddress = in.GetMetricsAddress()
- vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
- }
- if len(in.StorageBackends) > 0 {
- backend.LoadFromPbStorageBackends(in.StorageBackends)
- }
}
}()
@@ -168,14 +198,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
return "", err
}
case <-volumeTickChan:
- if vs.SendHeartbeat {
- glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
- if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
- return "", err
- }
- } else {
- glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port)
+ glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
}
case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
@@ -185,6 +211,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
case err = <-doneChan:
return
+ case <-vs.stopChan:
+ return
}
}
}
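The shutdown path above uses a flag-plus-channel pattern: StopHeartbeat clears isHeartbeating so the outer master-retry loop stops iterating, and a send on stopChan unblocks the select that drives the active heartbeat stream. A stripped-down, self-contained sketch of the same pattern (field and function names here are illustrative, and the plain bool flag is unsynchronized, just like in the server):

```go
package main

import (
	"fmt"
	"time"
)

type heartbeater struct {
	running  bool      // mirrors vs.isHeartbeating (plain bool, no locking)
	stopChan chan bool // mirrors vs.stopChan
}

// loop keeps ticking until the flag is cleared or a stop signal arrives,
// like the select inside doHeartbeat.
func (h *heartbeater) loop() {
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for h.running {
		select {
		case <-tick.C:
			fmt.Println("send heartbeat")
		case <-h.stopChan:
			fmt.Println("heartbeat stopped")
			return
		}
	}
}

// stop mirrors StopHeartbeat: returns true if stopping was already requested.
func (h *heartbeater) stop() (alreadyStopping bool) {
	if !h.running {
		return true
	}
	h.running = false
	h.stopChan <- true
	return false
}

func main() {
	h := &heartbeater{running: true, stopChan: make(chan bool)}
	go h.loop()
	time.Sleep(250 * time.Millisecond)
	h.stop()
	time.Sleep(50 * time.Millisecond)
}
```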
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6612e9045..c600da21e 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -31,7 +31,8 @@ type VolumeServer struct {
MetricsAddress string
MetricsIntervalSec int
fileSizeLimitBytes int64
- SendHeartbeat bool
+ isHeartbeating bool
+ stopChan chan bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -67,9 +68,13 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
- SendHeartbeat: true,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
}
vs.SeedMasterNodes = masterNodes
+
+ vs.checkWithMaster()
+
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
@@ -93,10 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
go vs.heartbeat()
hostAddress := fmt.Sprintf("%s:%d", ip, port)
- go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
- func() (addr string, intervalSeconds int) {
- return vs.MetricsAddress, vs.MetricsIntervalSec
- })
+ go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, vs.MetricsAddress, vs.MetricsIntervalSec)
return vs
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 07289e880..bb04678d6 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -95,7 +95,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+ glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 57723ab0b..121c0d2bb 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
dir, name := util.FullPath(fullFilePath).DirAndName()
- return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature)
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
}
@@ -480,7 +480,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
}
@@ -552,9 +552,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
index 03c878e6a..02790b9e2 100644
--- a/weed/shell/command_bucket_delete.go
+++ b/weed/shell/command_bucket_delete.go
@@ -49,6 +49,6 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i
return fmt.Errorf("read buckets: %v", err)
}
- return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, 0)
+ return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
}
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 1ddb6a490..bb280b7d9 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -28,7 +28,7 @@ func (c *commandEcBalance) Help() string {
Algorithm:
- For each type of volume server (different max volume count limit){
+ func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 0db119d3c..a808335eb 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -173,6 +173,16 @@ type EcNode struct {
freeEcSlot int
}
+func (ecNode *EcNode) localShardIdCount(vid uint32) int {
+ for _, ecShardInfo := range ecNode.info.EcShardInfos {
+ if vid == ecShardInfo.Id {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ return shardBits.ShardIdCount()
+ }
+ }
+ return 0
+}
+
type EcRack struct {
ecNodes map[EcNodeId]*EcNode
freeEcSlot int
@@ -191,7 +201,15 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes
}
// find out all volume servers with one slot left.
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter)
+
+ sortEcNodesByFreeslotsDecending(ecNodes)
+
+ return
+}
+
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+ eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != dc {
return
}
@@ -205,9 +223,6 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes
})
totalFreeEcSlots += freeEcSlots
})
-
- sortEcNodesByFreeslotsDecending(ecNodes)
-
return
}
@@ -253,6 +268,10 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
})
}
+func divide(total, n int) float64 {
+ return float64(total) / float64(n)
+}
+
func ceilDivide(total, n int) int {
return int(math.Ceil(float64(total) / float64(n)))
}
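The new divide helper returns a float ratio, while the existing ceilDivide rounds up to a whole number; the balancing rewrite further down switches from absolute volume counts to such ratios so servers with different max volume counts can be compared fairly. A small standalone illustration of the difference:

```go
package main

import (
	"fmt"
	"math"
)

func divide(total, n int) float64 {
	return float64(total) / float64(n)
}

func ceilDivide(total, n int) int {
	return int(math.Ceil(float64(total) / float64(n)))
}

func main() {
	// 25 selected volumes across servers with 70 slots in total:
	// the ratio keeps the fractional fill level, the ceiling yields a whole-volume target.
	fmt.Println(divide(25, 70))     // ~0.3571
	fmt.Println(ceilDivide(25, 70)) // 1
}
```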
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index 8cba2d520..a097a3a4e 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -2,6 +2,7 @@ package shell
import (
"fmt"
+ "github.com/golang/protobuf/proto"
"io"
"sort"
@@ -69,6 +70,11 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, "%s\n", text)
+ bytes, _ := proto.Marshal(respLookupEntry.Entry)
+ gzippedBytes, _ := util.GzipData(bytes)
+ zstdBytes, _ := util.ZstdData(bytes)
+ fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+
return nil
})
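The added lines print the raw protobuf size of a filer entry alongside its gzip and zstd sizes, which helps judge how much metadata would shrink under compression (util.GzipData and util.ZstdData are the repo's helpers). A minimal standalone sketch of the same kind of measurement, using only the standard library gzip writer and a made-up byte slice in place of the marshaled entry:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipSize compresses data in memory and returns the compressed length.
func gzipSize(data []byte) (int, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return 0, err
	}
	if err := w.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

func main() {
	// stand-in for proto.Marshal(entry): a repetitive metadata-like blob
	meta := bytes.Repeat([]byte("chunk_fid:3,0123456789abcdef;"), 100)
	gz, err := gzipSize(meta)
	if err != nil {
		panic(err)
	}
	fmt.Printf("meta size: %d gzip:%d\n", len(meta), gz)
}
```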
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 69e3c7fd9..53222ca29 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
"sort"
@@ -39,14 +40,15 @@ func (c *commandVolumeBalance) Help() string {
}
func balanceWritableVolumes(){
- idealWritableVolumes = totalWritableVolumes / numVolumeServers
+ idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes
for hasMovedOneVolume {
- sort all volume servers ordered by the number of local writable volumes
- pick the volume server A with the lowest number of writable volumes x
- pick the volume server B with the highest number of writable volumes y
- if y > idealWritableVolumes and x +1 <= idealWritableVolumes {
- if B has a writable volume id v that A does not have {
- move writable volume v from A to B
+ sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes / localVolumeMax
+ pick the volume server B with the highest localWritableVolumeRatio y
+ for any volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax {
+ if y > idealWritableVolumeRatio {
+ if B has a writable volume id v that A does not have, and satisfies v's replication requirements {
+ move writable volume v from A to B
+ }
}
}
}
@@ -81,38 +83,33 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
- typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
+ volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
+ volumeReplicas, _ := collectVolumeReplicaLocations(resp)
- for maxVolumeCount, volumeServers := range typeToNodes {
- if len(volumeServers) < 2 {
- fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
- continue
+ if *collection == "EACH_COLLECTION" {
+ collections, err := ListCollectionNames(commandEnv, true, false)
+ if err != nil {
+ return err
}
- if *collection == "EACH_COLLECTION" {
- collections, err := ListCollectionNames(commandEnv, true, false)
- if err != nil {
- return err
- }
- for _, c := range collections {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
- return err
- }
- }
- } else if *collection == "ALL_COLLECTIONS" {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
- return err
- }
- } else {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ for _, c := range collections {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
-
+ } else if *collection == "ALL_COLLECTIONS" {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
+ return err
+ }
+ } else {
+ if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ return err
+ }
}
+
return nil
}
-func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
for _, n := range nodes {
@@ -125,7 +122,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
return !v.ReadOnly && v.Size < volumeSizeLimit
})
}
- if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
return err
}
@@ -140,22 +137,21 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit
return v.ReadOnly || v.Size >= volumeSizeLimit
})
}
- if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
return nil
}
-func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
- typeToNodes = make(map[uint64][]*Node)
+func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter string) (nodes []*Node) {
for _, dc := range t.DataCenterInfos {
if selectedDataCenter != "" && dc.Id != selectedDataCenter {
continue
}
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
- typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
+ nodes = append(nodes, &Node{
info: dn,
dc: dc.Id,
rack: r.Id,
@@ -173,6 +169,23 @@ type Node struct {
rack string
}
+func (n *Node) localVolumeRatio() float64 {
+ return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
+}
+
+func (n *Node) localVolumeNextRatio() float64 {
+ return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
+}
+
+func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
+ n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
+ for _, v := range n.info.VolumeInfos {
+ if fn(v) {
+ n.selectedVolumes[v.Id] = v
+ }
+ }
+}
+
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
sort.Slice(volumes, func(i, j int) bool {
return volumes[i].Size < volumes[j].Size
@@ -185,73 +198,146 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
-func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error {
- selectedVolumeCount := 0
+func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+ selectedVolumeCount, volumeMaxCount := 0, 0
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
+ volumeMaxCount += int(dn.info.MaxVolumeCount)
}
- idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))
+ idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
- hasMove := true
+ hasMoved := true
- for hasMove {
- hasMove = false
+ for hasMoved {
+ hasMoved = false
sort.Slice(nodes, func(i, j int) bool {
- // TODO sort by free volume slots???
- return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
+ return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
})
- emptyNode, fullNode := nodes[0], nodes[len(nodes)-1]
- if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes {
- // sort the volumes to move
- var candidateVolumes []*master_pb.VolumeInformationMessage
- for _, v := range fullNode.selectedVolumes {
- candidateVolumes = append(candidateVolumes, v)
+ fullNode := nodes[len(nodes)-1]
+ var candidateVolumes []*master_pb.VolumeInformationMessage
+ for _, v := range fullNode.selectedVolumes {
+ candidateVolumes = append(candidateVolumes, v)
+ }
+ sortCandidatesFn(candidateVolumes)
+
+ for i := 0; i < len(nodes)-1; i++ {
+ emptyNode := nodes[i]
+ if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
+ // no more volume servers with empty slots
+ break
}
- sortCandidatesFn(candidateVolumes)
-
- for _, v := range candidateVolumes {
- if v.ReplicaPlacement > 0 {
- if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
- // TODO this logic is too simple, but should work most of the time
- // Need a correct algorithm to handle all different cases
- continue
- }
- }
- if _, found := emptyNode.selectedVolumes[v.Id]; !found {
- if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
- delete(fullNode.selectedVolumes, v.Id)
- emptyNode.selectedVolumes[v.Id] = v
- hasMove = true
- break
- } else {
- return err
- }
- }
+ hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ // moved one volume
+ break
}
}
}
return nil
}
-func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
+func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
+
+ for _, v := range candidateVolumes {
+ hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, fullNode, v, emptyNode, applyBalancing)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ break
+ }
+ }
+ return
+}
+
+func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) {
+
+ if candidateVolume.ReplicaPlacement > 0 {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(candidateVolume.ReplicaPlacement))
+ if !isGoodMove(replicaPlacement, volumeReplicas[candidateVolume.Id], fullNode, emptyNode) {
+ return false, nil
+ }
+ }
+ if _, found := emptyNode.selectedVolumes[candidateVolume.Id]; !found {
+ if err = moveVolume(commandEnv, candidateVolume, fullNode, emptyNode, applyChange); err == nil {
+ adjustAfterMove(candidateVolume, volumeReplicas, fullNode, emptyNode)
+ return true, nil
+ } else {
+ return
+ }
+ }
+ return
+}
+
+func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyChange bool) error {
collectionPrefix := v.Collection + "_"
if v.Collection == "" {
collectionPrefix = ""
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
- if applyBalancing {
+ if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
return nil
}
-func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
- node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
- for _, v := range node.info.VolumeInfos {
- if fn(v) {
- node.selectedVolumes[v.Id] = v
+func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool {
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id == targetNode.info.Id &&
+ replica.location.rack == targetNode.rack &&
+ replica.location.dc == targetNode.dc {
+ // never move to existing nodes
+ return false
+ }
+ }
+ dcs, racks := make(map[string]bool), make(map[string]int)
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id != sourceNode.info.Id {
+ dcs[replica.location.DataCenter()] = true
+ racks[replica.location.Rack()]++
+ }
+ }
+
+ dcs[targetNode.dc] = true
+ racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++
+
+ if len(dcs) > placement.DiffDataCenterCount+1 {
+ return false
+ }
+
+ if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 {
+ return false
+ }
+
+ for _, sameRackCount := range racks {
+ if sameRackCount > placement.SameRackCount+1 {
+ return false
+ }
+ }
+
+ return true
+
+}
+
+func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, emptyNode *Node) {
+ delete(fullNode.selectedVolumes, v.Id)
+ if emptyNode.selectedVolumes != nil {
+ emptyNode.selectedVolumes[v.Id] = v
+ }
+ existingReplicas := volumeReplicas[v.Id]
+ for _, replica := range existingReplicas {
+ if replica.location.dataNode.Id == fullNode.info.Id &&
+ replica.location.rack == fullNode.rack &&
+ replica.location.dc == fullNode.dc {
+ replica.location.dc = emptyNode.dc
+ replica.location.rack = emptyNode.rack
+ replica.location.dataNode = emptyNode.info
+ return
}
}
}
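The rewritten balancing loop always takes candidates from the fullest server (highest local volume ratio) and offers them to every other server whose ratio would still be at or below the ideal after accepting one more volume. A condensed, self-contained sketch of that selection rule, with a plain struct standing in for the repo's Node type:

```go
package main

import (
	"fmt"
	"sort"
)

type node struct {
	id       string
	selected int // currently selected volumes
	max      int // max volume slots
}

func (n node) ratio() float64     { return float64(n.selected) / float64(n.max) }
func (n node) nextRatio() float64 { return float64(n.selected+1) / float64(n.max) }

func main() {
	nodes := []node{{"a", 8, 10}, {"b", 2, 10}, {"c", 15, 30}}

	total, max := 0, 0
	for _, n := range nodes {
		total += n.selected
		max += n.max
	}
	ideal := float64(total) / float64(max) // idealVolumeRatio

	sort.Slice(nodes, func(i, j int) bool { return nodes[i].ratio() < nodes[j].ratio() })
	full := nodes[len(nodes)-1]

	// candidate destinations: any node that stays at or below the ideal ratio
	// after receiving one more volume, while the source sits above the ideal.
	for _, n := range nodes[:len(nodes)-1] {
		if full.ratio() > ideal && n.nextRatio() <= ideal {
			fmt.Printf("would move one volume from %s (%.2f) to %s (next %.2f), ideal %.2f\n",
				full.id, full.ratio(), n.id, n.nextRatio(), ideal)
		}
	}
}
```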
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
new file mode 100644
index 000000000..9e154dc00
--- /dev/null
+++ b/weed/shell/command_volume_balance_test.go
@@ -0,0 +1,155 @@
+package shell
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+type testMoveCase struct {
+ name string
+ replication string
+ replicas []*VolumeReplica
+ sourceLocation location
+ targetLocation location
+ expected bool
+}
+
+func TestIsGoodMove(t *testing.T) {
+
+ var tests = []testMoveCase{
+
+ {
+ name: "test 100 move to spread into proper data centers",
+ replication: "100",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test move to the same node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+
+ {
+ name: "test move to the same rack, but existing node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ expected: false,
+ },
+
+ {
+ name: "test move to the same rack, a new node",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test 010 move all to the same rack",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+
+ {
+ name: "test 010 move to spread racks",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+
+ {
+ name: "test 010 move to spread racks",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ targetLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ }
+
+ for _, tt := range tests {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
+ println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
+ sourceNode := &Node{
+ info: tt.sourceLocation.dataNode,
+ dc: tt.sourceLocation.dc,
+ rack: tt.sourceLocation.rack,
+ }
+ targetNode := &Node{
+ info: tt.targetLocation.dataNode,
+ dc: tt.targetLocation.dc,
+ rack: tt.targetLocation.rack,
+ }
+ if isGoodMove(replicaPlacement, tt.replicas, sourceNode, targetNode) != tt.expected {
+ t.Errorf("%s: expect %v move from %v to %s, replication:%v",
+ tt.name, tt.expected, tt.sourceLocation, tt.targetLocation, tt.replication)
+ }
+ }
+
+}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index e17f35c67..b32ccaaab 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -3,8 +3,8 @@ package shell
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
- "math/rand"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -27,16 +27,18 @@ func (c *commandVolumeFixReplication) Name() string {
func (c *commandVolumeFixReplication) Help() string {
return `add replicas to volumes that are missing replicas
- This command finds all under-replicated volumes, and finds volume servers with free slots.
+ This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
+
+ This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action
- volume.fix.replication # actually copying the volume files and mount the volume
+ volume.fix.replication # actually deleting or copying the volume files and mounting the volume
Note:
* each time this will only add back one replica for one volume id. If multiple replicas
are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
- * do not run this too quick within seconds, since the new volume replica may take a few seconds
+ * do not run this too quickly within seconds, since the new volume replica may take a few seconds
to register itself to the master.
`
@@ -64,53 +66,89 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that needs replication
// collect all data nodes
- replicatedVolumeLocations := make(map[uint32][]location)
- replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
+ }
+
+ // find all under replicated volumes
+ var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[0]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ }
+ }
+
+ if len(overReplicatedVolumeIds) > 0 {
+ return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
+ }
+
+ if len(underReplicatedVolumeIds) == 0 {
+ return nil
+ }
+
+ // find the most under populated data nodes
+ keepDataNodesSorted(allLocations)
+
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+
+}
+
+func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
+ volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
- if v.ReplicaPlacement > 0 {
- replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
- replicatedVolumeInfo[v.Id] = v
- }
+ volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
+ location: &loc,
+ info: v,
+ })
}
allLocations = append(allLocations, loc)
})
+ return volumeReplicas, allLocations
+}
- // find all under replicated volumes
- underReplicatedVolumeLocations := make(map[uint32][]location)
- for vid, locations := range replicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(locations) {
- underReplicatedVolumeLocations[vid] = locations
+func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range overReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
+
+ replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+
+ fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
+
+ if !takeAction {
+ break
}
- }
- if len(underReplicatedVolumeLocations) == 0 {
- return fmt.Errorf("no under replicated volumes")
- }
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+ return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
+ }
- if len(allLocations) == 0 {
- return fmt.Errorf("no data nodes at all")
}
+ return nil
+}
- // find the most under populated data nodes
- keepDataNodesSorted(allLocations)
-
- for vid, locations := range underReplicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range underReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+ if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// ask the volume server to replicate the volume
- sourceNodes := underReplicatedVolumeLocations[vid]
- sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction {
break
@@ -118,11 +156,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: volumeInfo.Id,
- SourceDataNode: sourceNode.dataNode.Id,
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
@@ -138,11 +176,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
if !foundNewLocation {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
-
return nil
}
@@ -182,22 +219,15 @@ func keepDataNodesSorted(dataNodes []location) {
return false
}
*/
-func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
- existingDataNodes := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataNodes[loc.String()] += 1
- }
- sameDataNodeCount := existingDataNodes[possibleLocation.String()]
- // avoid duplicated volume on the same data node
- if sameDataNodeCount > 0 {
+ existingDataCenters, _, existingDataNodes := countReplicas(replicas)
+
+ if _, found := existingDataNodes[possibleLocation.String()]; found {
+ // avoid duplicated volume on the same data node
return false
}
- existingDataCenters := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] += 1
- }
primaryDataCenters, _ := findTopKeys(existingDataCenters)
// ensure data center count is within limit
@@ -218,20 +248,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
}
// now this is one of the primary dcs
- existingRacks := make(map[string]int)
- for _, loc := range existingLocations {
- if loc.DataCenter() != possibleLocation.DataCenter() {
+ primaryDcRacks := make(map[string]int)
+ for _, replica := range replicas {
+ if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
- existingRacks[loc.Rack()] += 1
+ primaryDcRacks[replica.location.Rack()] += 1
}
- primaryRacks, _ := findTopKeys(existingRacks)
- sameRackCount := existingRacks[possibleLocation.Rack()]
+ primaryRacks, _ := findTopKeys(primaryDcRacks)
+ sameRackCount := primaryDcRacks[possibleLocation.Rack()]
// ensure rack count is within limit
- if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
// different from existing racks
- if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
// lack on different racks
return true
} else {
@@ -280,6 +310,11 @@ func isAmong(key string, keys []string) bool {
return false
}
+type VolumeReplica struct {
+ location *location
+ info *master_pb.VolumeInformationMessage
+}
+
type location struct {
dc string
rack string
@@ -305,3 +340,43 @@ func (l location) Rack() string {
func (l location) DataCenter() string {
return l.dc
}
+
+func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
+ mostRecent := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
+ mostRecent = replica
+ }
+ }
+ return mostRecent
+}
+
+func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
+ diffDc = make(map[string]int)
+ diffRack = make(map[string]int)
+ diffNode = make(map[string]int)
+ for _, replica := range replicas {
+ diffDc[replica.location.DataCenter()] += 1
+ diffRack[replica.location.Rack()] += 1
+ diffNode[replica.location.String()] += 1
+ }
+ return
+}
+
+func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
+
+ allSame := true
+ oldest := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
+ oldest = replica
+ allSame = false
+ }
+ }
+ if !allSame {
+ return oldest
+ }
+
+ // TODO what if all the replicas have the same timestamp?
+ return oldest
+}
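The over/under classification above comes down to comparing the placement's expected copy count with the number of replicas actually collected per volume id. A compact standalone illustration of that check; the copyCount helper is a simplified stand-in for super_block.ReplicaPlacement.GetCopyCount():

```go
package main

import "fmt"

// copyCount mimics GetCopyCount() for an "xyz" placement string:
// 1 original plus x other data centers, y other racks, z other servers.
func copyCount(placement string) int {
	count := 1
	for _, c := range placement {
		count += int(c - '0')
	}
	return count
}

func main() {
	replicasFound := map[uint32]int{1: 3, 2: 1, 3: 2}
	placement := "010" // expects 2 copies

	for vid, n := range replicasFound {
		expected := copyCount(placement)
		switch {
		case n < expected:
			fmt.Printf("volume %d is under replicated: %d of %d\n", vid, n, expected)
		case n > expected:
			fmt.Printf("volume %d is over replicated: %d of %d\n", vid, n, expected)
		default:
			fmt.Printf("volume %d is fine: %d of %d\n", vid, n, expected)
		}
	}
}
```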
diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go
index 4cfbd96aa..bb61be1ef 100644
--- a/weed/shell/command_volume_fix_replication_test.go
+++ b/weed/shell/command_volume_fix_replication_test.go
@@ -8,11 +8,11 @@ import (
)
type testcase struct {
- name string
- replication string
- existingLocations []location
- possibleLocation location
- expected bool
+ name string
+ replication string
+ replicas []*VolumeReplica
+ possibleLocation location
+ expected bool
}
func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
@@ -21,8 +21,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 100 negative",
replication: "100",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: false,
@@ -30,8 +32,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 100 positive",
replication: "100",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: true,
@@ -39,10 +43,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 022 positive",
replication: "022",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: true,
@@ -50,10 +60,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 022 negative",
replication: "022",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -61,10 +77,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 positive",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: true,
@@ -72,10 +94,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 negative extra dc",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -83,10 +111,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
{
name: "test 210 moved from 200 negative extra data node",
replication: "210",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -103,9 +137,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 same existing rack",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -113,9 +151,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 negative",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: false,
@@ -123,9 +165,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 different existing racks",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -133,9 +179,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) {
{
name: "test 011 different existing racks negative",
replication: "011",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: false,
@@ -152,8 +202,10 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 001",
replication: "001",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: true,
@@ -161,9 +213,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 positive",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
expected: true,
@@ -171,9 +227,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 negative, repeat the same node",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
expected: false,
@@ -181,10 +241,16 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) {
{
name: "test 002 negative, enough node already",
replication: "002",
- existingLocations: []location{
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
- {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
},
possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
expected: false,
@@ -199,9 +265,9 @@ func runTests(tests []testcase, t *testing.T) {
for _, tt := range tests {
replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
- if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected {
+ if satisfyReplicaPlacement(replicaPlacement, tt.replicas, tt.possibleLocation) != tt.expected {
t.Errorf("%s: expect %v add %v to %s %+v",
- tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations)
+ tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.replicas)
}
}
}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
new file mode 100644
index 000000000..214783ee1
--- /dev/null
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -0,0 +1,208 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "io"
+ "sort"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeServerEvacuate{})
+}
+
+type commandVolumeServerEvacuate struct {
+}
+
+func (c *commandVolumeServerEvacuate) Name() string {
+ return "volumeServer.evacuate"
+}
+
+func (c *commandVolumeServerEvacuate) Help() string {
+ return `move out all data on a volume server
+
+ volumeServer.evacuate -node <host>:<port>
+
+ This command moves all data away from the volume server.
+ The volumes on this volume server will be redistributed.
+
+ Usually this is used to prepare to shutdown or upgrade the volume server.
+
+ Sometimes a volume can not be moved because there is no
+ good destination that meets the replication requirement.
+ E.g. a volume with replication 001 in a cluster with 2 volume servers can not be moved.
+ You can use "-skipNonMoveable" to skip such volumes and move the rest.
+
+`
+}
+
+func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
+ skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
+ applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
+ if err = vsEvacuateCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *volumeServer == "" {
+ return fmt.Errorf("need to specify volume server by -node=:")
+ }
+
+ return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
+
+}
+
+func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
+ // 1. confirm the volume server is part of the cluster
+ // 2. collect all other volume servers, sort by empty slots
+ // 3. move to any other volume server as long as it satisfy the replication requirements
+
+ // list all the volumes
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ return err
+ }
+
+ if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+ // find this volume server
+ volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "")
+ thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
+ if thisNode == nil {
+ return fmt.Errorf("%s is not found in this cluster", volumeServer)
+ }
+
+ // move away normal volumes
+ volumeReplicas, _ := collectVolumeReplicaLocations(resp)
+ for _, vol := range thisNode.info.VolumeInfos {
+ hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
+ if err != nil {
+ return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
+ fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
+ } else {
+ return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
+ }
+ }
+ }
+ return nil
+}
+
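+// evacuateEcVolumes moves every erasure-coded shard off the given volume server onto the remaining EC nodes.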
+func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+ // find this ec volume server
+ ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "")
+ thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
+ if thisNode == nil {
+ return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
+ }
+
+ // move away ec volumes
+ for _, ecShardInfo := range thisNode.info.EcShardInfos {
+ hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
+ if err != nil {
+ return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
+ } else {
+ return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
+ }
+ }
+ }
+ return nil
+}
+
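+// moveAwayOneEcVolume moves each shard of one EC volume off this node, trying candidate nodes starting with the one that currently holds the fewest shards of that volume.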
+func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
+
+ for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
+
+ sort.Slice(otherNodes, func(i, j int) bool {
+ return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id)
+ })
+
+ for i := 0; i < len(otherNodes); i++ {
+ emptyNode := otherNodes[i]
+ err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
+ if err != nil {
+ return
+ } else {
+ hasMoved = true
+ break
+ }
+ }
+ if !hasMoved {
+ return
+ }
+ }
+
+ return
+}
+
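+// moveAwayOneNormalVolume tries to move one volume to another node, preferring nodes with the lowest local volume ratio and delegating the replication placement check to maybeMoveOneVolume.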
+func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
+ sort.Slice(otherNodes, func(i, j int) bool {
+ return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio()
+ })
+
+ for i := 0; i < len(otherNodes); i++ {
+ emptyNode := otherNodes[i]
+ hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
+ if err != nil {
+ return
+ }
+ if hasMoved {
+ break
+ }
+ }
+ return
+}
+
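+// nodesOtherThan splits the volume servers into the node matching thisServer and all the other nodes.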
+func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
+ for _, node := range volumeServers {
+ if node.info.Id == thisServer {
+ thisNode = node
+ continue
+ }
+ otherNodes = append(otherNodes, node)
+ }
+ return
+}
+
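+// ecNodesOtherThan splits the EC nodes into the node matching thisServer and all the other nodes.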
+func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
+ for _, node := range volumeServers {
+ if node.info.Id == thisServer {
+ thisNode = node
+ continue
+ }
+ otherNodes = append(otherNodes, node)
+ }
+ return
+}
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
new file mode 100644
index 000000000..2a2e56e86
--- /dev/null
+++ b/weed/shell/command_volume_server_leave.go
@@ -0,0 +1,67 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "google.golang.org/grpc"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeServerLeave{})
+}
+
+type commandVolumeServerLeave struct {
+}
+
+func (c *commandVolumeServerLeave) Name() string {
+ return "volumeServer.leave"
+}
+
+func (c *commandVolumeServerLeave) Help() string {
+ return `stop a volume server from sending heartbeats to the master
+
+ volumeServer.leave -node <host>:<port>
+
+ This command gracefully prepares a volume server for shutdown.
+ The volume server will stop sending heartbeats to the master.
+ After draining the traffic for a few seconds, you can safely shut down the volume server.
+
+ This operation can not be reversed unless the volume server is restarted.
+`
+}
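+
+// Example, run inside `weed shell` (the address below is illustrative):
+//
+//   lock
+//   volumeServer.leave -node 192.168.1.5:8080
+//   unlock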
+
+func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeServer := vsLeaveCommand.String("node", "", "<host>:<port> of the volume server")
+ if err = vsLeaveCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *volumeServer == "" {
+ return fmt.Errorf("need to specify volume server by -node=:")
+ }
+
+ return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer)
+
+}
+
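+// volumeServerLeave asks the volume server over gRPC to stop sending heartbeats to the master so that traffic can drain before shutdown.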
+func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) {
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
+ if leaveErr != nil {
+ fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
+ } else {
+ fmt.Fprintf(writer, "stopped heartbeat in volume server %s. After a few seconds to drain traffic, it will be safe to stop the volume server.\n", volumeServer)
+ }
+ return leaveErr
+ })
+}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 4632a1fb0..2d5166acf 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -66,7 +66,7 @@ func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool
args[i] = strings.Trim(string(cmds[1+i]), "\"'")
}
- cmd := strings.ToLower(cmds[0])
+ cmd := cmds[0]
if cmd == "help" || cmd == "?" {
printHelp(cmds)
} else if cmd == "exit" || cmd == "quit" {
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index 7ff09a388..f3824728e 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -108,32 +108,23 @@ func init() {
}
-func LoopPushingMetric(name, instance string, gatherer *prometheus.Registry, fnGetMetricsDest func() (addr string, intervalSeconds int)) {
+func LoopPushingMetric(name, instance string, gatherer *prometheus.Registry, addr string, intervalSeconds int) {
- if fnGetMetricsDest == nil {
+ if addr == "" || intervalSeconds == 0 {
return
}
- addr, intervalSeconds := fnGetMetricsDest()
pusher := push.New(addr, name).Gatherer(gatherer).Grouping("instance", instance)
- currentAddr := addr
for {
- if currentAddr != "" {
- err := pusher.Push()
- if err != nil && !strings.HasPrefix(err.Error(), "unexpected status code 200") {
- glog.V(0).Infof("could not push metrics to prometheus push gateway %s: %v", addr, err)
- }
+ err := pusher.Push()
+ if err != nil && !strings.HasPrefix(err.Error(), "unexpected status code 200") {
+ glog.V(0).Infof("could not push metrics to prometheus push gateway %s: %v", addr, err)
}
if intervalSeconds <= 0 {
intervalSeconds = 15
}
time.Sleep(time.Duration(intervalSeconds) * time.Second)
- addr, intervalSeconds = fnGetMetricsDest()
- if currentAddr != addr {
- pusher = push.New(addr, name).Gatherer(gatherer).Grouping("instance", instance)
- currentAddr = addr
- }
}
}
diff --git a/weed/storage/needle/file_id.go b/weed/storage/needle/file_id.go
index 5dabb0f25..6055bdd1c 100644
--- a/weed/storage/needle/file_id.go
+++ b/weed/storage/needle/file_id.go
@@ -66,7 +66,7 @@ func formatNeedleIdCookie(key NeedleId, cookie Cookie) string {
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], cookie)
nonzero_index := 0
- for ; bytes[nonzero_index] == 0; nonzero_index++ {
+ for ; nonzero_index < NeedleIdSize && bytes[nonzero_index] == 0; nonzero_index++ {
}
return hex.EncodeToString(bytes[nonzero_index:])
}
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 7a5a423b4..e42fb238b 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -27,11 +27,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
if offset.IsZero() {
return 0, nil
}
- if size.IsDeleted() {
- size = 0
- }
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if size < 0 {
+ // read the deletion entry
+ if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ }
+ } else {
+ if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ }
}
return
}
@@ -65,3 +69,20 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
}
return n.AppendAtNs, err
}
+
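+// verifyDeletedNeedleIntegrity reads the needle record at the tail of the data file (the appended deletion entry) and checks that its id matches the index key.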
+func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) {
+ n := new(needle.Needle)
+ size := n.DiskSize(v)
+ var fileSize int64
+ fileSize, _, err = datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("GetStat: %v", err)
+ }
+ if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil {
+ return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
+ }
+ if n.Id != key {
+ return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+ }
+ return n.AppendAtNs, err
+}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index e77010dbd..e11bde2cb 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -381,10 +381,8 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err)
}
- if v.volumeInfo.Version == 0 {
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
- }
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
}
defer v.Close()
@@ -406,8 +404,9 @@ func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorag
for n != nil {
var needleBody []byte
if volumeFileScanner.ReadNeedleBody() {
+ // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle body: %v", err)
+ glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
// err = fmt.Errorf("cannot read needle body: %v", err)
// return
}
diff --git a/weed/util/compression.go b/weed/util/compression.go
index 2881a7bfd..cf3ac7c57 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -12,15 +12,44 @@ import (
"github.com/klauspost/compress/zstd"
)
+var (
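+ // UnsupportedCompression is returned by DecompressData when the input is not in a recognized compressed format.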
+ UnsupportedCompression = fmt.Errorf("unsupported compression")
+)
+
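+// MaybeGzipData gzips the input unless it is already gzipped or compressing does not shrink it by at least 10%.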
+func MaybeGzipData(input []byte) []byte {
+ if IsGzippedContent(input) {
+ return input
+ }
+ gzipped, err := GzipData(input)
+ if err != nil {
+ return input
+ }
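+ // keep the gzipped version only if it is at least 10% smaller than the original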
+ if len(gzipped)*10 > len(input)*9 {
+ return input
+ }
+ return gzipped
+}
+
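+// MaybeDecompressData returns the decompressed bytes, or the original input if it is not compressed or decompression fails.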
+func MaybeDecompressData(input []byte) []byte {
+ uncompressed, err := DecompressData(input)
+ if err != nil {
+ if err != UnsupportedCompression {
+ glog.Errorf("decompressed data: %v", err)
+ }
+ return input
+ }
+ return uncompressed
+}
+
func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
+ glog.V(2).Infof("error gzip data: %v", err)
return nil, err
}
if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
+ glog.V(2).Infof("error closing gzipped data: %v", err)
return nil, err
}
return buf.Bytes(), nil
@@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) {
if IsZstdContent(input) {
return unzstdData(input)
}
- return input, fmt.Errorf("unsupported compression")
+ return input, UnsupportedCompression
}
func ungzipData(input []byte) ([]byte, error) {
@@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) {
defer r.Close()
output, err := ioutil.ReadAll(r)
if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
+ glog.V(2).Infof("error ungzip data: %v", err)
}
return output, err
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index d48b9e32d..4782b5688 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 92)
+ VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 99)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 7cc64ea85..eef24b930 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -174,7 +174,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
return readFn(r.Body)
}
-func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) {
+func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) {
response, err := client.Get(fileUrl)
if err != nil {
return "", nil, nil, err
@@ -188,7 +188,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re
filename = strings.Trim(filename, "\"")
}
}
- rc = response.Body
+ resp = response
return
}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index f0486ac46..57f4b0115 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
+ "fmt"
"time"
"github.com/golang/protobuf/proto"
@@ -11,13 +12,17 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+var (
+ ResumeError = fmt.Errorf("resume")
+)
+
func (logBuffer *LogBuffer) LoopProcessLogData(
startTreadTime time.Time,
waitForDataFn func() bool,
- eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
- lastReadTime := startTreadTime
+ lastReadTime = startTreadTime
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
@@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
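+ // the entry is truncated at the end of the buffer; return ResumeError (with lastReadTime) so the caller can resume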
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
entryData := buf[pos+4 : pos+4+int(size)]
- // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
-
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 1ecfe6ce2..d477a6b2d 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -46,10 +46,13 @@ func (l *ExclusiveLocker) RequestLock() {
return
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
// retry to get the lease
for {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -71,9 +74,12 @@ func (l *ExclusiveLocker) RequestLock() {
// start a goroutine to renew the lease
go func() {
+ ctx2, cancel2 := context.WithCancel(context.Background())
+ defer cancel2()
+
for l.isLocking {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -98,8 +104,12 @@ func (l *ExclusiveLocker) RequestLock() {
func (l *ExclusiveLocker) ReleaseLock() {
l.isLocking = false
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 4c066d535..3d23d8f13 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -70,7 +70,10 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
- stream, err := client.KeepConnected(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
return err