
add new files

pull/933/head
stlpmo-jn, 7 years ago
commit 0b6221b9b1

1. weed/shell/replication_health_checker.go (+539)
2. weed/shell/replication_health_checker_test.go (+178)

weed/shell/replication_health_checker.go (+539)

@@ -0,0 +1,539 @@
package shell
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
"io"
"math/rand"
"sort"
"strings"
"sync"
)
func init() {
//commands = append(commands, &commandReplicationHealthChecker{})
//commands = append(commands, &commandReplicationHealthRepair{})
}
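// commandReplicationHealthChecker implements the "volume.check.replication" shell command.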
type commandReplicationHealthChecker struct {
args []string
commandEnv *commandEnv
writer io.Writer
checker *ReplicationHealthChecker
}
func (c *commandReplicationHealthChecker) Name() string {
return "volume.check.replication"
}
func (c *commandReplicationHealthChecker) Help() string {
return `check whether all the replicas of each volume are consistent
`
}
func (c *commandReplicationHealthChecker) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
c.writer = writer
c.commandEnv = commandEnv
c.args = args
c.checker = NewReplicationHealthChecker(context.Background(), commandEnv.option.GrpcDialOption)
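// note: the check itself is not wired up yet, and the command is left unregistered in init() above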
return nil
}
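// commandReplicationHealthRepair implements the "volume.repair.replication" shell command:
// it checks replica health first and then repairs the inconsistent volumes.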
type commandReplicationHealthRepair struct {
args []string
commandEnv *commandEnv
writer io.Writer
repair *ReplicationHealthRepair
}
func (c *commandReplicationHealthRepair) Name() string {
return "volume.repair.replication"
}
func (c *commandReplicationHealthRepair) Help() string {
return `find the volumes with inconsistent replicas and repair them
`
}
func (c *commandReplicationHealthRepair) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
c.args = args
c.commandEnv = commandEnv
c.writer = writer
ctx := context.Background()
c.repair = NewReplicationHealthRepair(ctx, commandEnv.option.GrpcDialOption)
var resp *master_pb.VolumeListResponse
if err := c.commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
var err error
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
}); err != nil {
return err
}
// run the health checker first to find the volumes with inconsistent replicas
checker := NewReplicationHealthChecker(ctx, c.commandEnv.option.GrpcDialOption)
eVids, err := checker.Check(resp.TopologyInfo)
if err != nil {
writer.Write([]byte(err.Error()))
return err
}
// repair them
successVids, failedVids, err := c.repair.Repair(resp.TopologyInfo, eVids)
if err != nil {
str := fmt.Sprintf("repair volume:%v replication failed.\n", failedVids)
writer.Write([]byte(str))
} else {
str := fmt.Sprintf("repair volue:%v replication success.\n", successVids)
writer.Write([]byte(str))
}
return nil
}
/////////////////////////////////////////////////////////////////////////
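// ReplicationHealthChecker finds the volumes whose replicas disagree with each other.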
type ReplicationHealthChecker struct {
grpcDialOption grpc.DialOption
context context.Context
}
func NewReplicationHealthChecker(ctx context.Context, grpcOption grpc.DialOption) *ReplicationHealthChecker {
return &ReplicationHealthChecker{grpcDialOption: grpcOption, context: ctx}
}
// Check double-checks replica consistency:
// 1st, compare the volume information reported in the topology;
// 2nd, fetch the latest volume file status from every data node and compare again.
func (r *ReplicationHealthChecker) Check(topologyInfo *master_pb.TopologyInfo) ([]uint32, error) {
volInfoMap, vol2LocsMap := getVolumeInfo(topologyInfo)
if len(volInfoMap) == 0 || len(vol2LocsMap) == 0 {
return nil, fmt.Errorf("get volume info from topology failed")
}
errVids := getUnhealthyVolumeIds(volInfoMap, vol2LocsMap, topologyInfo.VolumeSizeLimitBytes)
if len(errVids) == 0 {
glog.V(4).Infof("no error replications")
return nil, nil
}
// get the latest volume file status from every data node
newErrVids := make([]uint32, 0, len(errVids))
for _, eVid := range errVids {
eVidUrls := getVolumeUrls(vol2LocsMap[eVid])
fileStats, err := getVolumeFileStatus(r.context, r.grpcDialOption, eVid, eVidUrls)
if err != nil {
glog.Error(err)
return nil, err
}
vInfos := make([]*ReplicaInformation, 0, len(fileStats))
for _, i := range fileStats {
vInfos = append(vInfos, &ReplicaInformation{
Size: i.fileStat.Size,
FileCount: i.fileStat.FileCount,
ReadOnly: i.fileStat.ReadOnly,
CompactRevision: i.fileStat.CompactRevision,
LastCompactIndexOffset: i.fileStat.LastCompactIndexOffset,
})
}
if isHealthyVolumeReplications(vInfos, topologyInfo.VolumeSizeLimitBytes) {
continue
}
newErrVids = append(newErrVids, eVid)
}
return newErrVids, nil
}
/////////////////////////////////////////////////////////////////////////
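// ReplicationHealthRepair compacts the replicas of unhealthy volumes and re-syncs the inconsistent copies.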
type ReplicationHealthRepair struct {
grpcDialOption grpc.DialOption
context context.Context
}
func NewReplicationHealthRepair(ctx context.Context, grpcOption grpc.DialOption) *ReplicationHealthRepair {
return &ReplicationHealthRepair{grpcDialOption: grpcOption, context: ctx,}
}
// Repair repairs the unhealthy replicas: it compacts every replica first,
// then copies the volume from a consistent replica to each inconsistent one.
func (r *ReplicationHealthRepair) Repair(topologyInfo *master_pb.TopologyInfo, errVids []uint32) (success, failed []uint32, err error) {
volInfoMap, vol2LocsMap := getVolumeInfo(topologyInfo)
if len(volInfoMap) == 0 || len(vol2LocsMap) == 0 {
return nil, errVids, fmt.Errorf("get volume info from topology failed")
}
for _, eVid := range errVids {
if isReadOnlyVolume(volInfoMap[eVid]) {
continue // skip the read-only volume compacting
}
glog.V(4).Infof("begin compact all the replications of volume:%v", eVid)
eVidUrls := getVolumeUrls(vol2LocsMap[eVid])
if !tryBatchCompactVolume(r.context, r.grpcDialOption, needle.VolumeId(eVid), eVidUrls) {
err := fmt.Errorf("compact all the replicas of volume %v failed", eVid)
glog.Error(err)
return nil, errVids, err
}
glog.V(4).Infof("success compact all the replications of volume:%v", eVid)
}
for _, eVid := range errVids {
eVidUrls := getVolumeUrls(vol2LocsMap[eVid])
fileStats, err := getVolumeFileStatus(r.context, r.grpcDialOption, eVid, eVidUrls)
if err != nil {
glog.Error(err)
failed = append(failed, eVid)
continue
}
okUrls, errUrls := filterErrorReplication(fileStats)
if len(errUrls) == 0 {
success = append(success, eVid) // no need repair
continue
}
info := volInfoMap[eVid][0]
ttl := needle.LoadTTLFromUint32(info.Ttl).String()
rp, err := storage.NewReplicaPlacementFromByte(byte(info.ReplicaPlacement))
if err != nil {
failed = append(failed, eVid)
glog.Errorf("vid:%v, parse replicaPlacement failed, %d", eVid, info.ReplicaPlacement)
continue
}
syncSuccess := true
for _, errUrl := range errUrls {
okUrl := okUrls[rand.Intn(len(okUrls))]
req := &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(info.Id),
Collection: info.Collection,
Replication: rp.String(),
Ttl: ttl,
SourceDataNode: okUrl,
}
err = syncReplication(r.grpcDialOption, errUrl, req)
if nil != err {
syncSuccess = false
glog.Errorf("sync replication from %s to %s failed, %v", okUrl, errUrl, err)
}
}
if syncSuccess {
success = append(success, eVid)
} else {
failed = append(failed, eVid)
}
}
if len(failed) > 0 {
err = fmt.Errorf("there are some volumes health repair failed")
}
return
}
type ReplicaFileStatus struct {
url string
fileStat *ReplicaInformation
}
// getVolumeFileStatus fetches the file status of a volume from every replica location concurrently.
func getVolumeFileStatus(ctx context.Context, grpcDialOption grpc.DialOption, vid uint32, volumeUrls []string) (fileStatuses []*ReplicaFileStatus, err error) {
type ResponsePair struct {
url string
status *volume_server_pb.ReadVolumeFileStatusResponse
err error
}
var wg sync.WaitGroup
resultChan := make(chan ResponsePair, len(volumeUrls))
wg.Add(len(volumeUrls))
getFileStatFunc := func(url string, volumeId uint32) {
defer wg.Done()
glog.V(4).Infof("volumeId:%v, location:%v", volumeId, url)
err := operation.WithVolumeServerClient(url, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: uint32(volumeId),
}
respTmp, err := client.ReadVolumeFileStatus(ctx, req)
resultChan <- ResponsePair{
url: url,
status: respTmp,
err: err,
}
return nil
})
if err != nil {
glog.Error(err)
// the dial failed, so the closure above never sent a result; report the error here
resultChan <- ResponsePair{url: url, err: err}
}
}
for _, url := range volumeUrls {
go getFileStatFunc(url, vid)
}
go func() { // close channel
wg.Wait()
close(resultChan)
}()
var errs []string
for result := range resultChan {
if result.err == nil {
fileStatuses = append(fileStatuses, &ReplicaFileStatus{
url: result.url,
fileStat: &ReplicaInformation{
Size: result.status.DatFileSize,
FileCount: result.status.FileCount,
ReadOnly: false,
CompactRevision: 0,
LastCompactIndexOffset: 0,
}})
continue
}
tmp := fmt.Sprintf("url : %s, error : %v", result.url, result.err)
errs = append(errs, tmp)
}
if len(fileStatuses) == len(volumeUrls) {
return fileStatuses, nil
}
err = fmt.Errorf("get volume[%v] replication status failed, err : %s", vid, strings.Join(errs, "; "))
return nil, err
}
// filterErrorReplication splits the replica locations into consistent and inconsistent ones by file count.
// See mapMetric and needleMap: the file count is the total number of files the volume has received from
// user clients, so the replicas with the highest file count are treated as the good copies.
func filterErrorReplication(vInfo []*ReplicaFileStatus) (okUrls, errUrls []string) {
sort.Slice(vInfo, func(i, j int) bool {
return vInfo[i].fileStat.FileCount > vInfo[j].fileStat.FileCount
})
if vInfo[0].fileStat.FileCount == vInfo[len(vInfo)-1].fileStat.FileCount {
return // all the replicas agree, nothing to filter
}
okFileCounter := vInfo[0].fileStat.FileCount
for _, v := range vInfo {
if okFileCounter == v.fileStat.FileCount {
okUrls = append(okUrls, v.url)
} else {
errUrls = append(errUrls, v.url)
}
}
return
}
// execute the compact transaction
func compactVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid needle.VolumeId) bool {
glog.V(0).Infoln("Start vacuuming", vid, "on", volumeUrl)
err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when vacuuming %d on %s: %v", vid, volumeUrl, err)
return false
}
glog.V(0).Infof("Complete vacuuming volume:%v on %s", vid, volumeUrl)
return true
}
// commit the compact transaction when compactVolume() returns true
func commitCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid needle.VolumeId) bool {
err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when committing vacuum %d on %s: %v", vid, volumeUrl, err)
return false
}
glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, volumeUrl)
return true
}
// roll back the compact transaction when compactVolume() returns false
func cleanupCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid needle.VolumeId) bool {
glog.V(0).Infoln("Start cleaning up", vid, "on", volumeUrl)
err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, volumeUrl, err)
return false
}
glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, volumeUrl)
return false
}
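// tryCompactVolume runs the compact transaction on one replica: commit it on success, clean it up on failure.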
func tryCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, volumeUrl string) bool {
if !compactVolume(ctx, grpcDialOption, volumeUrl, vid) {
return cleanupCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
}
return commitCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
}
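// tryBatchCompactVolume compacts all the replicas of a volume concurrently and reports whether every replica succeeded.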
func tryBatchCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, urls []string) bool {
resultChan := make(chan error)
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
go func(volumeUrl string) {
defer wg.Done()
if !tryCompactVolume(ctx, grpcDialOption, vid, volumeUrl) {
resultChan <- fmt.Errorf("url:%s", volumeUrl)
}
}(url)
}
go func() {
wg.Wait()
close(resultChan)
}()
var errs []string
for result := range resultChan {
if result != nil {
errs = append(errs, result.Error())
}
}
if len(errs) > 0 {
glog.Errorf("consist volume:%v compact reversion failed, %s", vid, strings.Join(errs, "; "))
return false
}
return true
}
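// getVolumeUrls collects the url of every data node that holds a replica.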
func getVolumeUrls(locs []*master_pb.DataNodeInfo) []string {
eVidUrls := make([]string, 0, len(locs))
for _, loc := range locs {
eVidUrls = append(eVidUrls, loc.Url)
}
return eVidUrls
}
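// ReplicaInformation is the subset of a replica's status that the consistency checks compare.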
type ReplicaInformation struct {
Size uint64
FileCount uint64
ReadOnly bool
CompactRevision uint32
LastCompactIndexOffset uint64
}
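// getUnhealthyVolumeIds returns the ids of the volumes whose replicas, as reported in the topology, disagree.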
func getUnhealthyVolumeIds(volInfoMap map[uint32][]*master_pb.VolumeInformationMessage,
vol2LocsMap map[uint32][]*master_pb.DataNodeInfo, volumeSizeLimitBytes uint64) []uint32 {
errVids := make([]uint32, 0, len(vol2LocsMap))
for vid, info := range volInfoMap {
vInfos := make([]*ReplicaInformation, 0, len(info))
for _, i := range info {
vInfos = append(vInfos, &ReplicaInformation{
Size: i.Size,
FileCount: i.FileCount,
ReadOnly: i.ReadOnly,
CompactRevision: i.CompactRevision,
})
}
if isHealthyVolumeReplications(vInfos, volumeSizeLimitBytes) {
glog.V(4).Infof("all %d replicas of volume %v are consistent, no need to repair", len(info), vid)
continue
}
errVids = append(errVids, vid)
}
return errVids
}
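// isHealthyVolumeReplications reports whether all the replicas agree on file count, size, and compact
// revision; a volume that has already reached the size limit only needs matching sizes and file counts.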
func isHealthyVolumeReplications(volInfo []*ReplicaInformation, volumeSizeLimit uint64) bool {
fileSizeSet := make(map[uint64]bool)
fileCountSet := make(map[uint64]bool)
compactVersionSet := make(map[uint32]bool)
compactOffsetSet := make(map[uint64]bool)
//lastModifiedSet := make(map[uint64]bool)
var oneFileSize uint64 = 0
for _, v := range volInfo {
fileCountSet[v.FileCount] = true
//lastModifiedSet[v.] = true
fileSizeSet[v.Size] = true
oneFileSize = v.Size
compactVersionSet[v.CompactRevision] = true
compactOffsetSet[v.LastCompactIndexOffset] = true
}
if (len(fileSizeSet) == 1) && (oneFileSize >= volumeSizeLimit) && (len(fileCountSet) == 1) {
return true
}
if len(fileCountSet) != 1 {
return false
}
if len(fileSizeSet) != 1 {
return false
}
if len(compactVersionSet) != 1 {
return false
}
//if len(compactOffsetSet) != 1 {
// return false
//}
return true
}
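// isReadOnlyVolume reports whether every replica of the volume is read-only.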
func isReadOnlyVolume(replicaInfo []*master_pb.VolumeInformationMessage) bool {
readOnlySet := make(map[bool]bool)
for _, info := range replicaInfo {
readOnlySet[info.ReadOnly] = true
}
if _, exist := readOnlySet[true]; exist {
return len(readOnlySet) == 1
}
return false
}
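// syncReplication asks the destination volume server to copy the whole volume from a healthy source replica.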
func syncReplication(grpcDialOption grpc.DialOption, destUrl string, req *volume_server_pb.VolumeCopyRequest) error {
ctx := context.Background()
err := operation.WithVolumeServerClient(destUrl, grpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
if _, err := client.VolumeCopy(ctx, req); err != nil {
glog.Errorf("sync replication failed, %v", err)
return err
}
return nil
})
return err
}
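// getVolumeInfo indexes the topology by volume id, collecting each volume's replicas and their data node locations.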
func getVolumeInfo(topo *master_pb.TopologyInfo) (map[uint32][]*master_pb.VolumeInformationMessage, map[uint32][]*master_pb.DataNodeInfo) {
volInfoMap := make(map[uint32][]*master_pb.VolumeInformationMessage)
vol2LocsMap := make(map[uint32][]*master_pb.DataNodeInfo)
IterateVolumes(topo, func(dc *master_pb.DataCenterInfo, rack *master_pb.RackInfo, dataNode *master_pb.DataNodeInfo, vol *master_pb.VolumeInformationMessage) {
volInfoMap[vol.Id] = append(volInfoMap[vol.Id], vol)
vol2LocsMap[vol.Id] = append(vol2LocsMap[vol.Id], dataNode)
})
return volInfoMap, vol2LocsMap
}
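// IterateVolumes walks every volume in the topology, invoking callBack with its data center, rack, and data node.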
func IterateVolumes(topo *master_pb.TopologyInfo,
callBack func(dc *master_pb.DataCenterInfo, rack *master_pb.RackInfo, dataNode *master_pb.DataNodeInfo, vol *master_pb.VolumeInformationMessage)) {
for _, dc := range topo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dn := range rack.DataNodeInfos {
for _, vol := range dn.VolumeInfos {
callBack(dc, rack, dn, vol)
}
}
}
}
}

weed/shell/replication_health_checker_test.go (+178)

@@ -0,0 +1,178 @@
package shell
import (
"context"
"encoding/json"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"strconv"
"strings"
"testing"
)
var topologyLayout = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[
{"id":1, "size":12312},
{"id":2, "size":12312},
{"id":3, "size":12312}
],
"limit":3
},
"server112":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":10
}
},
"rack2":{
"server121":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":4
},
"server122":{
"volumes":[],
"limit":4
},
"server123":{
"volumes":[
{"id":2, "size":12312},
{"id":3, "size":12311},
{"id":4, "size":12312}
],
"limit":5
}
}
},
"dc3":{
}
}
`
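// setup builds an in-memory topology from the JSON layout above for the tests.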
func setup(topologyLayout string) *topology.Topology {
var data interface{}
err := json.Unmarshal([]byte(topologyLayout), &data)
if err != nil {
fmt.Println("error:", err)
}
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
var portT int
topo := topology.NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey)
dcMap := dcValue.(map[string]interface{})
topo.LinkChildNode(dc)
for rackKey, rackValue := range dcMap {
rack := topology.NewRack(rackKey)
rackMap := rackValue.(map[string]interface{})
dc.LinkChildNode(rack)
for serverKey, serverValue := range rackMap {
server := topology.NewDataNode(serverKey)
server.Ip = "localhost"
portT += 2
server.Port = portT
server.PublicUrl = server.Ip + ":" + strconv.FormatUint(uint64(server.Port), 10)
serverMap := serverValue.(map[string]interface{})
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
Id: needle.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
Version: needle.CurrentVersion,
ReplicaPlacement: &storage.ReplicaPlacement{1, 0, 0},
Ttl: needle.EMPTY_TTL,
Collection: "",
}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64)))
}
}
}
return topo
}
func TestGetVolumeList(t *testing.T) {
topo := setup(topologyLayout)
topoInfo := topo.ToTopologyInfo()
if topoInfo == nil {
t.Errorf("failed to convert the topology to TopologyInfo")
}
}
func TestReplicationHealthChecker_GetErrorReplications(t *testing.T) {
topo := setup(topologyLayout)
topoInfo := topo.ToTopologyInfo()
if topoInfo == nil {
t.Errorf("failed to convert the topology to TopologyInfo")
}
checker := NewReplicationHealthChecker(context.Background(), grpc.EmptyDialOption{})
errVids, err := checker.Check(topoInfo)
if err != nil {
t.Error(err)
return
}
fmt.Printf("error vids : %v\n", errVids)
}
// this test needs a mocked or a real seaweedfs cluster to be running
func TestReplicationHealthChecker_GetErrorReplications2(t *testing.T) {
masters := "localhost:9633,localhost:9733,localhost:9833"
ctx := context.Background()
masterClient := wdclient.NewMasterClient(ctx, grpc.WithInsecure(), "shell", strings.Split(masters, ","))
go masterClient.KeepConnectedToMaster()
masterClient.WaitUntilConnected()
var resp *master_pb.VolumeListResponse
if err := masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
var err error
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
}); err != nil {
t.Error(err)
}
//respBytes, err := json.Marshal(resp)
//if err != nil {
// t.Error(err)
//}
//t.Log(string(respBytes[:]))
checker := NewReplicationHealthChecker(ctx, grpc.WithInsecure())
errVids, err := checker.Check(resp.TopologyInfo)
if err != nil {
t.Error(err)
return
}
fmt.Printf("error vids : %v\n", errVids)
repair := NewReplicationHealthRepair(ctx, grpc.WithInsecure())
success, failed, err := repair.Repair(resp.TopologyInfo, errVids)
if err != nil {
t.Error(err)
}
fmt.Printf("success:%v, failed:%v", success, failed)
}