From c1a0403da2b0fb6f4eebc48d8dc621435c6e0b12 Mon Sep 17 00:00:00 2001
From: stlpmo-jn
Date: Wed, 10 Apr 2019 19:41:55 +0800
Subject: [PATCH 1/3] repair erroneous volume replicas

---
 weed/pb/volume_server.proto                  |   1 +
 weed/pb/volume_server_pb/volume_server.pb.go |   8 +
 weed/server/volume_grpc_replicate.go         |  66 ++++-
 weed/storage/volume.go                       |  19 ++
 weed/topology/replication_health_checker.go  | 297 +++++++++++++++++++
 5 files changed, 380 insertions(+), 11 deletions(-)
 create mode 100644 weed/topology/replication_health_checker.go

diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto
index 3b5b36a21..1cef07dce 100644
--- a/weed/pb/volume_server.proto
+++ b/weed/pb/volume_server.proto
@@ -171,6 +171,7 @@ message ReadVolumeFileStatusResponse {
     uint64 idx_file_size = 3;
     uint64 dat_file_timestamp = 4;
     uint64 dat_file_size = 5;
+    uint64 file_count = 6;
 }
 
 message DiskStatus {
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 0f3b47ee0..81cae93c6 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -656,6 +656,7 @@ type ReadVolumeFileStatusResponse struct {
 	IdxFileSize      uint64 `protobuf:"varint,3,opt,name=idx_file_size,json=idxFileSize" json:"idx_file_size,omitempty"`
 	DatFileTimestamp uint64 `protobuf:"varint,4,opt,name=dat_file_timestamp,json=datFileTimestamp" json:"dat_file_timestamp,omitempty"`
 	DatFileSize      uint64 `protobuf:"varint,5,opt,name=dat_file_size,json=datFileSize" json:"dat_file_size,omitempty"`
+	FileCount        uint64 `protobuf:"varint,6,opt,name=file_count,json=fileCount" json:"file_count,omitempty"`
 }
 
 func (m *ReadVolumeFileStatusResponse) Reset()         { *m = ReadVolumeFileStatusResponse{} }
@@ -698,6 +699,13 @@ func (m *ReadVolumeFileStatusResponse) GetDatFileSize() uint64 {
 	return 0
 }
 
+func (m *ReadVolumeFileStatusResponse) GetFileCount() uint64 {
+	if m != nil {
+		return m.FileCount
+	}
+	return 0
+}
+
 type DiskStatus struct {
 	Dir string `protobuf:"bytes,1,opt,name=dir" json:"dir,omitempty"`
 	All uint64 `protobuf:"varint,2,opt,name=all" json:"all,omitempty"`
diff --git a/weed/server/volume_grpc_replicate.go b/weed/server/volume_grpc_replicate.go
index 1a31a37f3..c991a496e 100644
--- a/weed/server/volume_grpc_replicate.go
+++ b/weed/server/volume_grpc_replicate.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage"
@@ -34,22 +35,28 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 	// send .idx file
 	// send .dat file
 	// confirm size and timestamp
-
+	var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
+	datFileName := volumeFileName + ".dat"
+	idxFileName := volumeFileName + ".idx"
 	err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-
-		// TODO read file sizes before copying
-		client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{})
+		var err error
+		volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+			&volume_server_pb.ReadVolumeFileStatusRequest{
+				VolumeId: req.VolumeId,
+			})
+		if nil != err {
+			return fmt.Errorf("read volume file status failed, %v", err)
+		}
 
 		copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
 			VolumeId:  req.VolumeId,
 			IsIdxFile: true,
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
 		}
-		err = writeToFile(copyFileClient, volumeFileName+".idx")
+		err = writeToFile(copyFileClient, idxFileName)
 		if err != nil {
 			return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
 		}
@@ -58,24 +65,26 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 			VolumeId:  req.VolumeId,
 			IsDatFile: true,
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
 		}
-		err = writeToFile(copyFileClient, volumeFileName+".dat")
+		err = writeToFile(copyFileClient, datFileName)
 		if err != nil {
 			return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
 		}
 
 		return nil
 	})
-
 	if err != nil {
+		os.Remove(idxFileName)
+		os.Remove(datFileName)
 		return nil, err
 	}
 
-	// TODO: check the timestamp and size
+	if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
+		return nil, err
+	}
 
 	// mount the volume
 	err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
@@ -84,11 +93,35 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 	}
 
 	return &volume_server_pb.ReplicateVolumeResponse{}, err
+}
 
+// checkCopyFiles only checks whether the copied file sizes differ from the origin.
+// todo: maybe should also check the received count and deleted count of the volume
+func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
+	stat, err := os.Stat(idxFileName)
+	if err != nil {
+		return fmt.Errorf("get idx file info failed, %v", err)
+	}
+	if originFileInf.IdxFileSize != uint64(stat.Size()) {
+		return fmt.Errorf("the idx file size [%v] does not match the origin file size [%v]",
+			stat.Size(), originFileInf.IdxFileSize)
+	}
+
+	stat, err = os.Stat(datFileName)
+	if err != nil {
+		return fmt.Errorf("get dat file info failed, %v", err)
+	}
+	if originFileInf.DatFileSize != uint64(stat.Size()) {
+		return fmt.Errorf("the dat file size [%v] does not match the origin file size [%v]",
+			stat.Size(), originFileInf.DatFileSize)
+	}
+	return nil
 }
 
 func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
-	println("writing to ", fileName)
+	glog.V(4).Infof("writing to ", fileName)
 	dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
 	if err != nil {
 		return nil
@@ -110,6 +143,17 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
 	resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
+	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+	if v == nil {
+		return nil, fmt.Errorf("volume id %d not found", req.VolumeId)
+	}
+
+	resp.VolumeId = req.VolumeId
+	resp.DatFileSize = v.DataFileSize()
+	resp.IdxFileSize = v.IndexFileSize()
+	resp.DatFileTimestamp = v.LastModifiedTime()
+	resp.IdxFileTimestamp = v.LastModifiedTime()
+	resp.FileCount = uint64(v.FileCount())
 	return resp, nil
 }
 
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 807fefa38..280963c2c 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -79,6 +79,25 @@ func (v *Volume) Size() int64 {
 	return 0 // -1 causes integer overflow and the volume to become unwritable.
 }
 
+func (v *Volume) IndexFileSize() uint64 {
+	return v.nm.IndexFileSize()
+}
+
+func (v *Volume) DataFileSize() uint64 {
+	return uint64(v.Size())
+}
+
+// LastModifiedTime returns the volume's last modified unix time in seconds.
+func (v *Volume) LastModifiedTime() uint64 {
+	return v.lastModifiedTime
+}
+
+func (v *Volume) FileCount() uint {
+	return uint(v.nm.FileCount())
+}
+
 // Close cleanly shuts down this volume
 func (v *Volume) Close() {
 	v.dataFileAccessLock.Lock()
diff --git a/weed/topology/replication_health_checker.go b/weed/topology/replication_health_checker.go
new file mode 100644
index 000000000..947e7d45c
--- /dev/null
+++ b/weed/topology/replication_health_checker.go
@@ -0,0 +1,297 @@
+package topology
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"google.golang.org/grpc"
+	"sort"
+	"strings"
+	"sync"
+)
+
+// RepairUnhealthyReplicationInLayout checks the replication health of the
+// given volume and repairs inconsistent replicas.
+func (t *Topology) RepairUnhealthyReplicationInLayout(grpcDialOption grpc.DialOption, layout *VolumeLayout, eVid storage.VolumeId) error {
+	ctx := context.Background()
+	locations, exist := layout.vid2location[eVid]
+	if !exist {
+		retErr := fmt.Errorf("the volume:%v has no locations", eVid)
+		glog.V(0).Infof(retErr.Error())
+		return retErr
+	}
+
+	//glog.V(5).Infof("volume:%v, locations:%v", eVid, locations.list)
+	fileStat, err := getReplicationInfo(grpcDialOption, ctx, eVid, locations)
+	if err != nil {
+		glog.Errorf("get replication status failed, %v", err)
+		return err
+	}
+
+	if isSameVolumeReplications(fileStat, layout.volumeSizeLimit) {
+		glog.V(0).Infof("the volume:%v has %d identical replicas, no repair needed", eVid, len(fileStat))
+		return nil
+	}
+
+	// compact all the replications of the volume
+	{
+		glog.V(4).Infof("begin compacting all the replications of volume:%v", eVid)
+		allUrls := make([]string, 0, len(fileStat))
+		for _, fs := range fileStat {
+			allUrls = append(allUrls, fs.url)
+		}
+
+		if tryBatchCompactVolume(ctx, grpcDialOption, eVid, allUrls) == false {
+			err := fmt.Errorf("failed to compact all the replications of volume:%v", eVid)
+			glog.Error(err.Error())
+			return err
+		}
+		glog.V(4).Infof("successfully compacted all the replications of volume:%v", eVid)
+	}
+
+	// get the replication status again
+	fileStat, err = getReplicationInfo(grpcDialOption, ctx, eVid, locations)
+	if err != nil {
+		return err
+	}
+
+	okUrls, errUrls := filterErrorReplication(fileStat)
+	if len(errUrls) == 0 {
+		return nil // they are the same
+	}
+
+	if len(okUrls) == 0 {
+		return fmt.Errorf("no correct volume replications found")
+	}
+
+	glog.V(4).Infof("need to repair replications: %v", errUrls)
+	if len(locations.list) <= 0 {
+		return fmt.Errorf("the locations list of volume:%v is empty", eVid)
+	}
+	for _, url := range errUrls {
+		vInfo := locations.list[0].volumes[eVid]
+		err = syncReplication(grpcDialOption, okUrls[0], url, vInfo)
+		if nil != err {
+			glog.Error(err)
+			return err
+		}
+	}
+	return nil
+}
+
+type FileStatus struct {
+	url      string
+	fileStat *volume_server_pb.ReadVolumeFileStatusResponse
+}
+
+func getReplicationInfo(grpcDialOption grpc.DialOption, ctx context.Context, vid storage.VolumeId, locs *VolumeLocationList) (fs []FileStatus, err error) {
+	type ResponsePair struct {
+		url    string
+		status *volume_server_pb.ReadVolumeFileStatusResponse
+		err    error
+	}
+
+	var wg sync.WaitGroup
+	resultChan := make(chan ResponsePair, len(locs.list))
+	wg.Add(len(locs.list))
+	getFileStatFunc := func(url string, volumeId storage.VolumeId) {
+		defer wg.Done()
+		glog.V(4).Infof("volumeId:%v, location:%v", volumeId, url)
+		err := operation.WithVolumeServerClient(url, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+			req := &volume_server_pb.ReadVolumeFileStatusRequest{
+				VolumeId: uint32(volumeId),
+			}
+			respTmp, err := client.ReadVolumeFileStatus(ctx, req)
+			resultChan <- ResponsePair{
+				url:    url,
+				status: respTmp,
+				err:    err,
+			}
+			return nil
+		})
+		if nil != err {
+			glog.Error(err)
+		}
+	}
+	for _, node := range locs.list {
+		go getFileStatFunc(node.Url(), vid)
+	}
+
+	go func() { // close channel
+		wg.Wait()
+		close(resultChan)
+	}()
+
+	var errs []string
+	for result := range resultChan {
+		if result.err == nil {
+			fs = append(fs, FileStatus{
+				url:      result.url,
+				fileStat: result.status,
+			})
+			continue
+		}
+		tmp := fmt.Sprintf("url : %s, error : %v", result.url, result.err)
+		errs = append(errs, tmp)
+	}
+
+	if len(fs) == len(locs.list) {
+		return fs, nil
+	}
+	err = fmt.Errorf("get volume[%v] replication status failed, err : %s", vid, strings.Join(errs, "; "))
+	return nil, err
+}
+
+// filterErrorReplication treats the replicas with the highest file count as
+// correct; the file count is the total count of files the volume received
+// from user clients.
+// todo: this policy is not rigorous and needs improvement
+func filterErrorReplication(fileStat []FileStatus) (okUrls, errUrls []string) {
+	sort.Slice(fileStat, func(i, j int) bool {
+		return fileStat[i].fileStat.FileCount > fileStat[j].fileStat.FileCount
+	})
+	if fileStat[0].fileStat.FileCount != fileStat[len(fileStat)-1].fileStat.FileCount {
+		okFileCounter := fileStat[0].fileStat.FileCount
+		for _, v := range fileStat {
+			if okFileCounter == v.fileStat.FileCount {
+				okUrls = append(okUrls, v.url)
+			} else {
+				errUrls = append(errUrls, v.url)
+			}
+		}
+		return
+	}
+	return
+}
+
+// execute the compact transaction
+func compactVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	glog.V(0).Infoln("Start vacuuming", vid, "on", volumeUrl)
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when vacuuming %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete vacuuming volume:%v on %s", vid, volumeUrl)
+	return true
+}
+
+// commit the compact transaction when compactVolume() returns true
+func commitCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when committing vacuum %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete committing vacuum %d on %s", vid, volumeUrl)
+	return true
+}
+
+// rollback the compact transaction when compactVolume() returns false
+func cleanupCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	glog.V(0).Infoln("Start cleaning up", vid, "on", volumeUrl)
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, volumeUrl)
+	return false
+}
+
+func tryCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid storage.VolumeId, volumeUrl string) bool {
+	if compactVolume(ctx, grpcDialOption, volumeUrl, vid) == false {
+		return cleanupCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
+	}
+	return commitCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
+}
+
+func tryBatchCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption,
+	vid storage.VolumeId, urls []string) bool {
+	resultChan := make(chan error)
+	var wg sync.WaitGroup
+	wg.Add(len(urls))
+	for _, url := range urls {
+		go func(volumeUrl string) {
+			defer wg.Done()
+			if tryCompactVolume(ctx, grpcDialOption, vid, volumeUrl) == false {
+				resultChan <- fmt.Errorf("url:%s", volumeUrl)
+			}
+		}(url)
+	}
+
+	go func() {
+		wg.Wait()
+		close(resultChan)
+	}()
+
+	var errs []string
+	for result := range resultChan {
+		if result != nil {
+			errs = append(errs, result.Error())
+		}
+	}
+	if len(errs) > 0 {
+		glog.Errorf("compacting the replicas of volume:%v failed, %s", vid, strings.Join(errs, "; "))
+		return false
+	}
+	return true
+}
+
+func isSameVolumeReplications(fileStat []FileStatus, volumeSizeLimit uint64) bool {
+	fileSizeSet := make(map[uint64]bool)
+	fileCountSet := make(map[uint64]bool)
+	lastModifiedSet := make(map[uint64]bool)
+	var oneFileSize uint64 = 0
+	for _, v := range fileStat {
+		fileCountSet[v.fileStat.FileCount] = true
+		lastModifiedSet[v.fileStat.DatFileTimestamp] = true
+		fileSizeSet[v.fileStat.DatFileSize] = true
+		oneFileSize = v.fileStat.DatFileSize
+	}
+
+	if (len(lastModifiedSet) == 1) && (len(fileCountSet) == 1) &&
+		(len(fileSizeSet) == 1) && (oneFileSize >= volumeSizeLimit) {
+		return true
+	}
+	return false
+}
+
+func syncReplication(grpcDialOption grpc.DialOption, srcUrl, destUrl string, vinfo storage.VolumeInfo) error {
+	ctx := context.Background()
+	err := operation.WithVolumeServerClient(destUrl, grpcDialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			if _, err := client.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
+				VolumeId:       uint32(vinfo.Id),
+				Collection:     vinfo.Collection,
+				Replication:    vinfo.ReplicaPlacement.String(),
+				Ttl:            vinfo.Ttl.String(),
+				SourceDataNode: srcUrl,
+			}); err != nil {
+				glog.Errorf("sync replication failed, %v", err)
+				return err
+			}
+			return nil
+		})
+	return err
+}

From f2031884f0a5e070ac1b16c968ece1664e20aafc Mon Sep 17 00:00:00 2001
From: stlpmo-jn
Date: Thu, 11 Apr 2019 09:53:31 +0800
Subject: [PATCH 2/3] fix bug: CI build failed

---
 weed/server/volume_grpc_replicate.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/weed/server/volume_grpc_replicate.go b/weed/server/volume_grpc_replicate.go
index c991a496e..c641755d0 100644
--- a/weed/server/volume_grpc_replicate.go
+++ b/weed/server/volume_grpc_replicate.go
@@ -121,7 +121,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
 }
 
 func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
-	glog.V(4).Infof("writing to ", fileName)
+	glog.V(4).Infof("writing to %s", fileName)
 	dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
 	if err != nil {
 		return nil

From c3fa50d3b344f27dcc82c5036e5bbefb6391d7f1 Mon Sep 17 00:00:00 2001
From: stlpmo-jn
Date: Thu, 11 Apr 2019 13:40:31 +0800
Subject: [PATCH 3/3] remove the health checker, because it's the same as
 command_volume_fix_replication

---
 weed/topology/replication_health_checker.go | 297 --------------------
 1 file changed, 297 deletions(-)
 delete mode 100644 weed/topology/replication_health_checker.go

diff --git a/weed/topology/replication_health_checker.go b/weed/topology/replication_health_checker.go
deleted file mode 100644
index 947e7d45c..000000000
--- a/weed/topology/replication_health_checker.go
+++ /dev/null
@@ -1,297 +0,0 @@
-package topology
-
-import (
-	"context"
-	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/operation"
-	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
-	"github.com/chrislusf/seaweedfs/weed/storage"
-	"google.golang.org/grpc"
-	"sort"
-	"strings"
-	"sync"
-)
-
-// RepairUnhealthyReplicationInLayout checks the replication health of the
-// given volume and repairs inconsistent replicas.
-func (t *Topology) RepairUnhealthyReplicationInLayout(grpcDialOption grpc.DialOption, layout *VolumeLayout, eVid storage.VolumeId) error {
-	ctx := context.Background()
-	locations, exist := layout.vid2location[eVid]
-	if !exist {
-		retErr := fmt.Errorf("the volume:%v has no locations", eVid)
-		glog.V(0).Infof(retErr.Error())
-		return retErr
-	}
-
-	//glog.V(5).Infof("volume:%v, locations:%v", eVid, locations.list)
-	fileStat, err := getReplicationInfo(grpcDialOption, ctx, eVid, locations)
-	if err != nil {
-		glog.Errorf("get replication status failed, %v", err)
-		return err
-	}
-
-	if isSameVolumeReplications(fileStat, layout.volumeSizeLimit) {
-		glog.V(0).Infof("the volume:%v has %d identical replicas, no repair needed", eVid, len(fileStat))
-		return nil
-	}
-
-	// compact all the replications of the volume
-	{
-		glog.V(4).Infof("begin compacting all the replications of volume:%v", eVid)
-		allUrls := make([]string, 0, len(fileStat))
-		for _, fs := range fileStat {
-			allUrls = append(allUrls, fs.url)
-		}
-
-		if tryBatchCompactVolume(ctx, grpcDialOption, eVid, allUrls) == false {
-			err := fmt.Errorf("failed to compact all the replications of volume:%v", eVid)
-			glog.Error(err.Error())
-			return err
-		}
-		glog.V(4).Infof("successfully compacted all the replications of volume:%v", eVid)
-	}
-
-	// get the replication status again
-	fileStat, err = getReplicationInfo(grpcDialOption, ctx, eVid, locations)
-	if err != nil {
-		return err
-	}
-
-	okUrls, errUrls := filterErrorReplication(fileStat)
-	if len(errUrls) == 0 {
-		return nil // they are the same
-	}
-
-	if len(okUrls) == 0 {
-		return fmt.Errorf("no correct volume replications found")
-	}
-
-	glog.V(4).Infof("need to repair replications: %v", errUrls)
-	if len(locations.list) <= 0 {
-		return fmt.Errorf("the locations list of volume:%v is empty", eVid)
-	}
-	for _, url := range errUrls {
-		vInfo := locations.list[0].volumes[eVid]
-		err = syncReplication(grpcDialOption, okUrls[0], url, vInfo)
-		if nil != err {
-			glog.Error(err)
-			return err
-		}
-	}
-	return nil
-}
-
-type FileStatus struct {
-	url      string
-	fileStat *volume_server_pb.ReadVolumeFileStatusResponse
-}
-
-func getReplicationInfo(grpcDialOption grpc.DialOption, ctx context.Context, vid storage.VolumeId, locs *VolumeLocationList) (fs []FileStatus, err error) {
-	type ResponsePair struct {
-		url    string
-		status *volume_server_pb.ReadVolumeFileStatusResponse
-		err    error
-	}
-
-	var wg sync.WaitGroup
-	resultChan := make(chan ResponsePair, len(locs.list))
-	wg.Add(len(locs.list))
-	getFileStatFunc := func(url string, volumeId storage.VolumeId) {
-		defer wg.Done()
-		glog.V(4).Infof("volumeId:%v, location:%v", volumeId, url)
-		err := operation.WithVolumeServerClient(url, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-			req := &volume_server_pb.ReadVolumeFileStatusRequest{
-				VolumeId: uint32(volumeId),
-			}
-			respTmp, err := client.ReadVolumeFileStatus(ctx, req)
-			resultChan <- ResponsePair{
-				url:    url,
-				status: respTmp,
-				err:    err,
-			}
-			return nil
-		})
-		if nil != err {
-			glog.Error(err)
-		}
-	}
-	for _, node := range locs.list {
-		go getFileStatFunc(node.Url(), vid)
-	}
-
-	go func() { // close channel
-		wg.Wait()
-		close(resultChan)
-	}()
-
-	var errs []string
-	for result := range resultChan {
-		if result.err == nil {
-			fs = append(fs, FileStatus{
-				url:      result.url,
-				fileStat: result.status,
-			})
-			continue
-		}
-		tmp := fmt.Sprintf("url : %s, error : %v", result.url, result.err)
-		errs = append(errs, tmp)
-	}
-
-	if len(fs) == len(locs.list) {
-		return fs, nil
-	}
-	err = fmt.Errorf("get volume[%v] replication status failed, err : %s", vid, strings.Join(errs, "; "))
-	return nil, err
-}
-
-// filterErrorReplication treats the replicas with the highest file count as
-// correct; the file count is the total count of files the volume received
-// from user clients.
-// todo: this policy is not rigorous and needs improvement
-func filterErrorReplication(fileStat []FileStatus) (okUrls, errUrls []string) {
-	sort.Slice(fileStat, func(i, j int) bool {
-		return fileStat[i].fileStat.FileCount > fileStat[j].fileStat.FileCount
-	})
-	if fileStat[0].fileStat.FileCount != fileStat[len(fileStat)-1].fileStat.FileCount {
-		okFileCounter := fileStat[0].fileStat.FileCount
-		for _, v := range fileStat {
-			if okFileCounter == v.fileStat.FileCount {
-				okUrls = append(okUrls, v.url)
-			} else {
-				errUrls = append(errUrls, v.url)
-			}
-		}
-		return
-	}
-	return
-}
-
-// execute the compact transaction
-func compactVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
-	glog.V(0).Infoln("Start vacuuming", vid, "on", volumeUrl)
-	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
-			VolumeId: uint32(vid),
-		})
-		return err
-	})
-	if err != nil {
-		glog.Errorf("Error when vacuuming %d on %s: %v", vid, volumeUrl, err)
-		return false
-	}
-	glog.V(0).Infof("Complete vacuuming volume:%v on %s", vid, volumeUrl)
-	return true
-}
-
-// commit the compact transaction when compactVolume() returns true
-func commitCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
-	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
-			VolumeId: uint32(vid),
-		})
-		return err
-	})
-	if err != nil {
-		glog.Errorf("Error when committing vacuum %d on %s: %v", vid, volumeUrl, err)
-		return false
-	}
-	glog.V(0).Infof("Complete committing vacuum %d on %s", vid, volumeUrl)
-	return true
-}
-
-// rollback the compact transaction when compactVolume() returns false
-func cleanupCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
-	glog.V(0).Infoln("Start cleaning up", vid, "on", volumeUrl)
-	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-		_, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
-			VolumeId: uint32(vid),
-		})
-		return err
-	})
-	if err != nil {
-		glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, volumeUrl, err)
-		return false
-	}
-	glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, volumeUrl)
-	return false
-}
-
-func tryCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid storage.VolumeId, volumeUrl string) bool {
-	if compactVolume(ctx, grpcDialOption, volumeUrl, vid) == false {
-		return cleanupCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
-	}
-	return commitCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
-}
-
-func tryBatchCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption,
-	vid storage.VolumeId, urls []string) bool {
-	resultChan := make(chan error)
-	var wg sync.WaitGroup
-	wg.Add(len(urls))
-	for _, url := range urls {
-		go func(volumeUrl string) {
-			defer wg.Done()
-			if tryCompactVolume(ctx, grpcDialOption, vid, volumeUrl) == false {
-				resultChan <- fmt.Errorf("url:%s", volumeUrl)
-			}
-		}(url)
-	}
-
-	go func() {
-		wg.Wait()
-		close(resultChan)
-	}()
-
-	var errs []string
-	for result := range resultChan {
-		if result != nil {
-			errs = append(errs, result.Error())
-		}
-	}
-	if len(errs) > 0 {
-		glog.Errorf("compacting the replicas of volume:%v failed, %s", vid, strings.Join(errs, "; "))
-		return false
-	}
-	return true
-}
-
-func isSameVolumeReplications(fileStat []FileStatus, volumeSizeLimit uint64) bool {
-	fileSizeSet := make(map[uint64]bool)
-	fileCountSet := make(map[uint64]bool)
-	lastModifiedSet := make(map[uint64]bool)
-	var oneFileSize uint64 = 0
-	for _, v := range fileStat {
-		fileCountSet[v.fileStat.FileCount] = true
-		lastModifiedSet[v.fileStat.DatFileTimestamp] = true
-		fileSizeSet[v.fileStat.DatFileSize] = true
-		oneFileSize = v.fileStat.DatFileSize
-	}
-
-	if (len(lastModifiedSet) == 1) && (len(fileCountSet) == 1) &&
-		(len(fileSizeSet) == 1) && (oneFileSize >= volumeSizeLimit) {
-		return true
-	}
-	return false
-}
-
-func syncReplication(grpcDialOption grpc.DialOption, srcUrl, destUrl string, vinfo storage.VolumeInfo) error {
-	ctx := context.Background()
-	err := operation.WithVolumeServerClient(destUrl, grpcDialOption,
-		func(client volume_server_pb.VolumeServerClient) error {
-			if _, err := client.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
-				VolumeId:       uint32(vinfo.Id),
-				Collection:     vinfo.Collection,
-				Replication:    vinfo.ReplicaPlacement.String(),
-				Ttl:            vinfo.Ttl.String(),
-				SourceDataNode: srcUrl,
-			}); err != nil {
-				glog.Errorf("sync replication failed, %v", err)
-				return err
-			}
-			return nil
-		})
-	return err
-}
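
For context, the repair path that survives this series is the ReplicateVolume RPC on the
destination volume server: it pulls the .idx and .dat files from the source data node,
verifies the copied sizes against ReadVolumeFileStatus (checkCopyFiles), and mounts the
volume. A minimal client-side sketch of driving that RPC follows; the addresses, volume
id, and replication setting are hypothetical placeholders, not values from the patch.

    package main

    import (
    	"context"
    	"log"

    	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
    	"google.golang.org/grpc"
    )

    func main() {
    	// Hypothetical addresses: destAddr is the replica to (re)create,
    	// srcAddr is the healthy source data node.
    	destAddr := "127.0.0.1:18080"
    	srcAddr := "127.0.0.1:8080"

    	conn, err := grpc.Dial(destAddr, grpc.WithInsecure())
    	if err != nil {
    		log.Fatalf("dial %s: %v", destAddr, err)
    	}
    	defer conn.Close()

    	client := volume_server_pb.NewVolumeServerClient(conn)

    	// Ask the destination to pull volume 7 from srcAddr; the server copies
    	// the .idx and .dat files, checks their sizes against the source's
    	// ReadVolumeFileStatus response, and mounts the volume on success.
    	if _, err = client.ReplicateVolume(context.Background(), &volume_server_pb.ReplicateVolumeRequest{
    		VolumeId:       7,
    		Collection:     "",
    		Replication:    "001",
    		Ttl:            "",
    		SourceDataNode: srcAddr,
    	}); err != nil {
    		log.Fatalf("replicate volume: %v", err)
    	}
    	log.Println("replication requested")
    }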
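
The size check in checkCopyFiles deliberately compares only byte sizes; its todo notes
that the received and deleted counts should also be compared. A sketch of what that
extended check could look like, using only the FileCount field this series adds to
ReadVolumeFileStatusResponse; verifyCopiedVolume and localFileCount are hypothetical
names for illustration, not part of the patch.

    package replication

    import (
    	"fmt"
    	"os"

    	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
    )

    // verifyCopiedVolume is a hypothetical extension of checkCopyFiles: besides
    // the .idx/.dat byte sizes, it compares a locally computed file count against
    // the FileCount reported by the source volume server.
    func verifyCopiedVolume(origin *volume_server_pb.ReadVolumeFileStatusResponse,
    	idxFileName, datFileName string, localFileCount uint64) error {
    	checks := []struct {
    		name string
    		want uint64
    	}{
    		{idxFileName, origin.IdxFileSize},
    		{datFileName, origin.DatFileSize},
    	}
    	for _, c := range checks {
    		stat, err := os.Stat(c.name)
    		if err != nil {
    			return fmt.Errorf("stat %s failed: %v", c.name, err)
    		}
    		if uint64(stat.Size()) != c.want {
    			return fmt.Errorf("%s size [%d] does not match the origin file size [%d]",
    				c.name, stat.Size(), c.want)
    		}
    	}
    	if localFileCount != origin.FileCount {
    		return fmt.Errorf("local file count [%d] does not match the origin file count [%d]",
    			localFileCount, origin.FileCount)
    	}
    	return nil
    }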