Chris Lu committed 6 years ago (committed by GitHub)
17 changed files with 545 additions and 658 deletions
27   weed/command/backup.go
2    weed/command/export.go
2    weed/command/fix.go
41   weed/operation/sync_volume.go
20   weed/pb/volume_server.proto
380  weed/pb/volume_server_pb/volume_server.pb.go
67   weed/server/volume_grpc_follow.go
2    weed/server/volume_grpc_replicate.go
101  weed/server/volume_grpc_sync.go
6    weed/shell/command_fs_du.go
2    weed/storage/store_vacuum.go
253  weed/storage/volume_follow.go
39   weed/storage/volume_follow_test.go
28   weed/storage/volume_read_write.go
225  weed/storage/volume_sync.go
6    weed/storage/volume_vacuum.go
2    weed/storage/volume_vacuum_test.go
@@ -0,0 +1,67 @@ weed/server/volume_grpc_follow.go (new file)
package weed_server

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

// VolumeFollow streams the raw .dat file bytes appended after req.Since
// (an appendAtNs timestamp) to the follower.
func (vs *VolumeServer) VolumeFollow(req *volume_server_pb.VolumeFollowRequest, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {

	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
	if v == nil {
		return fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	stopOffset := v.Size()
	foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since)
	if err != nil {
		return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.Since, err)
	}

	if isLastOne {
		return nil
	}

	startOffset := int64(foundOffset) * int64(types.NeedlePaddingSize)

	buf := make([]byte, 1024*1024*2)
	return sendFileContent(v.DataFile(), buf, startOffset, stopOffset, stream)
}

func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {

	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
	if v == nil {
		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	resp := v.GetVolumeSyncStatus()

	return resp, nil
}

// sendFileContent streams the byte range [startOffset, stopOffset) of the
// .dat file to the follower, in blocks of at most len(buf) bytes.
func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {
	var blockSizeLimit = int64(len(buf))
	for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
		// cap the last read so the stream never sends bytes beyond stopOffset,
		// which could include a needle still being written
		readSize := blockSizeLimit
		if remaining := stopOffset - startOffset - i; remaining < blockSizeLimit {
			readSize = remaining
		}
		n, readErr := datFile.ReadAt(buf[:readSize], startOffset+i)
		if readErr == nil || readErr == io.EOF {
			resp := &volume_server_pb.VolumeFollowResponse{}
			resp.FileContent = buf[:n]
			sendErr := stream.Send(resp)
			if sendErr != nil {
				return sendErr
			}
		} else {
			return readErr
		}
	}
	return nil
}
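For orientation, here is a minimal client-side sketch, not part of this commit, showing how the VolumeFollow stream above is consumed; the server address "localhost:18080", volume id 1, and output path are illustrative assumptions.

// Illustrative only: dial a volume server, request the full tail (Since: 0),
// and append the streamed bytes to a local file.
package main

import (
	"context"
	"io"
	"log"
	"os"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:18080", grpc.WithInsecure()) // address is an assumption
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	// Since: 0 asks for everything appended after the beginning of time.
	stream, err := client.VolumeFollow(context.Background(), &volume_server_pb.VolumeFollowRequest{
		VolumeId: 1,
		Since:    0,
	})
	if err != nil {
		log.Fatal(err)
	}

	out, err := os.Create("1.dat.tail") // hypothetical output path
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	for {
		resp, recvErr := stream.Recv()
		if recvErr == io.EOF {
			break
		}
		if recvErr != nil {
			log.Fatal(recvErr)
		}
		if _, writeErr := out.Write(resp.FileContent); writeErr != nil {
			log.Fatal(writeErr)
		}
	}
}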
@@ -1,101 +0,0 @@ weed/server/volume_grpc_sync.go (deleted)
package weed_server

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {

	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
	if v == nil {
		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	resp := v.GetVolumeSyncStatus()

	glog.V(2).Infof("volume sync status %d", req.VolumeId)

	return resp, nil
}

func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error {

	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
	if v == nil {
		return fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	content, err := v.IndexFileContent()

	if err != nil {
		glog.Errorf("sync volume %d index: %v", req.VolumeId, err)
	} else {
		glog.V(2).Infof("sync volume %d index", req.VolumeId)
	}

	const blockSizeLimit = 1024 * 1024 * 2
	for i := 0; i < len(content); i += blockSizeLimit {
		blockSize := len(content) - i
		if blockSize > blockSizeLimit {
			blockSize = blockSizeLimit
		}
		resp := &volume_server_pb.VolumeSyncIndexResponse{}
		resp.IndexFileContent = content[i : i+blockSize]
		stream.Send(resp)
	}

	return nil
}

func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error {

	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
	if v == nil {
		return fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	if uint32(v.SuperBlock.CompactRevision) != req.Revision {
		return fmt.Errorf("requested volume revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
	}

	content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
	if err != nil {
		return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
	}

	id, err := types.ParseNeedleId(req.NeedleId)
	if err != nil {
		return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
	}
	n := new(storage.Needle)
	n.ParseNeedleHeader(content)
	if id != n.Id {
		return fmt.Errorf("expected file entry id %d, but found %d", id, n.Id)
	}

	if err != nil {
		glog.Errorf("sync volume %d data: %v", req.VolumeId, err)
	}

	const blockSizeLimit = 1024 * 1024 * 2
	for i := 0; i < len(content); i += blockSizeLimit {
		blockSize := len(content) - i
		if blockSize > blockSizeLimit {
			blockSize = blockSizeLimit
		}
		resp := &volume_server_pb.VolumeSyncDataResponse{}
		resp.FileContent = content[i : i+blockSize]
		stream.Send(resp)
	}

	return nil
}
@@ -0,0 +1,253 @@ weed/storage/volume_follow.go (new file)
package storage

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	. "github.com/chrislusf/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)

func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
	var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
	if stat, err := v.dataFile.Stat(); err == nil {
		syncStatus.TailOffset = uint64(stat.Size())
	}
	syncStatus.Collection = v.Collection
	syncStatus.IdxFileSize = v.nm.IndexFileSize()
	syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
	syncStatus.Ttl = v.SuperBlock.Ttl.String()
	syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
	return syncStatus
}

// A slave volume syncs with a master volume in 2 steps:
// 1. The slave checks the master side to find the subscription checkpoint
//    to set up the replication.
// 2. The slave receives the updates from the master.

/*
Assume the slave volume needs to follow the master volume.

The master volume could be compacted, and could be many files ahead of
the slave volume.

Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
	0.1 If the slave compact revision is less than the master's, do a local compaction, and set
	the local compact revision to the same value as the master's.
	0.2 If the slave size is still bigger than the master's, discard the local copy and do a full copy.

Step 1:
	The slave volume asks the master with the last modification time t.
	The master does a binary search in the volume (using the .idx file as an array, and checking
	the appendAtNs in the .dat file) to find the first entry with appendAtNs > t.

Step 2:
	The master sends content bytes to the slave. The bytes are not chunked by needle.

Step 3:
	The slave generates the needle map for the new bytes. (This may be optimized to incrementally
	update the needle map when receiving new .dat bytes. But that seems unnecessary for now.)

*/

func (v *Volume) Follow(volumeServer string, grpcDialOption grpc.DialOption) error {

	ctx := context.Background()

	startFromOffset := v.Size()
	appendAtNs, err := v.findLastAppendAtNs()
	if err != nil {
		return err
	}

	err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {

		stream, err := client.VolumeFollow(ctx, &volume_server_pb.VolumeFollowRequest{
			VolumeId: uint32(v.Id),
			Since:    appendAtNs,
		})
		if err != nil {
			return err
		}

		v.dataFile.Seek(startFromOffset, io.SeekStart)

		for {
			resp, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr == io.EOF {
					break
				} else {
					return recvErr
				}
			}

			_, writeErr := v.dataFile.Write(resp.FileContent)
			if writeErr != nil {
				return writeErr
			}
		}

		return nil

	})

	if err != nil {
		return err
	}

	// add the new entries to the needle map
	return ScanVolumeFileFrom(v.version, v.dataFile, startFromOffset, &VolumeFileScanner4GenIdx{v: v})
}

func (v *Volume) findLastAppendAtNs() (uint64, error) {
	offset, err := v.locateLastAppendEntry()
	if err != nil {
		return 0, err
	}
	if offset == 0 {
		return 0, nil
	}
	return v.readAppendAtNs(offset)
}

func (v *Volume) locateLastAppendEntry() (Offset, error) {
	indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
	if e != nil {
		return 0, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
	}
	defer indexFile.Close()

	fi, err := indexFile.Stat()
	if err != nil {
		return 0, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
	}
	fileSize := fi.Size()
	if fileSize%NeedleEntrySize != 0 {
		return 0, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
	}
	if fileSize == 0 {
		return 0, nil
	}

	bytes := make([]byte, NeedleEntrySize)
	n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize)
	if n != NeedleEntrySize {
		return 0, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
	}
	_, offset, _ := IdxFileEntry(bytes)

	return offset, nil
}

func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {

	n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize)
	if err != nil {
		return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
	}
	err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize+int64(NeedleEntrySize), bodyLength)
	if err != nil {
		return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", int64(offset)*NeedlePaddingSize, bodyLength, err)
	}
	return n.AppendAtNs, nil
}

// on the server side
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
	indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
	if openErr != nil {
		err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
		return
	}
	defer indexFile.Close()

	fi, statErr := indexFile.Stat()
	if statErr != nil {
		err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
		return
	}
	fileSize := fi.Size()
	if fileSize%NeedleEntrySize != 0 {
		err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
		return
	}

	bytes := make([]byte, NeedleEntrySize)
	entryCount := fileSize / NeedleEntrySize
	l := int64(0)
	h := entryCount

	for l < h {

		m := (l + h) / 2

		if m == entryCount {
			return 0, true, nil
		}

		// read the appendAtNs for entry m
		offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
		if err != nil {
			return
		}

		mNs, nsReadErr := v.readAppendAtNs(offset)
		if nsReadErr != nil {
			err = nsReadErr
			return
		}

		// move the boundary
		if mNs <= sinceNs {
			l = m + 1
		} else {
			h = m
		}

	}

	if l == entryCount {
		return 0, true, nil
	}

	offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)

	return offset, false, err
}

// bytes is of size NeedleEntrySize
func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
	if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF {
		return 0, readErr
	}
	_, offset, _ := IdxFileEntry(bytes)
	return offset, nil
}

// VolumeFileScanner4GenIdx generates the volume .idx entries for the newly received bytes
type VolumeFileScanner4GenIdx struct {
	v *Volume
}

func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
	return nil
}

func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
	return false
}

func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *Needle, offset int64) error {
	if n.Size > 0 && n.Size != TombstoneFileSize {
		return scanner.v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size)
	}
	return scanner.v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize))
}
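The exported entry point above is Volume.Follow; a hypothetical caller that keeps a replica trailing its master might look like the sketch below. The package name, poll interval, and error policy are assumptions, not part of this commit.

// Hypothetical driver, for illustration only: repeatedly invoke Follow so a
// local replica keeps trailing the master volume server.
package replication

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"google.golang.org/grpc"
)

func keepFollowing(v *storage.Volume, masterVolumeServer string, grpcDialOption grpc.DialOption) {
	for {
		// each Follow call resumes from the local volume's last appendAtNs,
		// so a failed or completed round simply retries from the checkpoint
		if err := v.Follow(masterVolumeServer, grpcDialOption); err != nil {
			glog.V(0).Infof("follow volume %d from %s: %v", v.Id, masterVolumeServer, err)
		}
		time.Sleep(10 * time.Second) // poll interval is an assumption
	}
}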
@@ -0,0 +1,39 @@ weed/storage/volume_follow_test.go (new file)
package storage

import "testing"

func TestBinarySearch(t *testing.T) {
	testInput := []int{-1, 0, 3, 5, 9, 12}

	if 3 != binarySearchForLargerThanTarget(testInput, 4) {
		t.Errorf("failed to find target %d", 4)
	}
	if 3 != binarySearchForLargerThanTarget(testInput, 3) {
		t.Errorf("failed to find target %d", 3)
	}
	if 6 != binarySearchForLargerThanTarget(testInput, 12) {
		t.Errorf("failed to find target %d", 12)
	}
	if 1 != binarySearchForLargerThanTarget(testInput, -1) {
		t.Errorf("failed to find target %d", -1)
	}
	if 0 != binarySearchForLargerThanTarget(testInput, -2) {
		t.Errorf("failed to find target %d", -2)
	}
}

// binarySearchForLargerThanTarget returns the index of the first element
// strictly larger than target, or len(nums) if no such element exists.
func binarySearchForLargerThanTarget(nums []int, target int) int {
	l := 0
	h := len(nums)
	for l < h {
		m := (l + h) / 2
		if nums[m] <= target {
			l = m + 1
		} else {
			h = m
		}
	}
	return l
}
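For intuition, trace binarySearchForLargerThanTarget on {-1, 0, 3, 5, 9, 12} with target 4: m=3 gives nums[3]=5 > 4, so h=3; m=1 gives nums[1]=0 <= 4, so l=2; m=2 gives nums[2]=3 <= 4, so l=3; l meets h, and the function returns index 3, the first element strictly larger than the target. BinarySearchByAppendAtNs applies the same invariant to .idx entries ordered by appendAtNs, finding the first needle appended after the given timestamp.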
@@ -1,225 +0,0 @@ weed/storage/volume_sync.go (deleted)
package storage

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"io"
	"os"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	. "github.com/chrislusf/seaweedfs/weed/storage/types"
)

// A slave volume syncs with a master volume in 2 steps:
// 1. The slave checks the master side to find the subscription checkpoint
//    to set up the replication.
// 2. The slave receives the updates from the master.

/*
Assume the slave volume needs to follow the master volume.

The master volume could be compacted, and could be many files ahead of
the slave volume.

Step 1:
	The slave volume will ask the master volume for a snapshot
	of (existing file entries, last offset, number of compacted times).

	For each entry x in master existing file entries:
		if x does not exist locally:
			add x locally

	For each entry y in local slave existing file entries:
		if y does not exist on master:
			delete y locally

Step 2:
	After this, use the last offset and number of compacted times to request
	the master volume to send a new file, and keep looping. If the number of
	compacted times has changed, go back to step 1 (very likely this can be
	optimized more later).

*/

func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) {
	var lastCompactRevision uint16 = 0
	var compactRevision uint16 = 0
	var masterMap *needle.CompactMap
	for i := 0; i < 3; i++ {
		if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil {
			return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
		}
		if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
			if err = v.Compact(0); err != nil {
				return fmt.Errorf("Compact Volume before synchronizing %v", err)
			}
			if err = v.commitCompact(); err != nil {
				return fmt.Errorf("Commit Compact before synchronizing %v", err)
			}
		}
		lastCompactRevision = compactRevision
		if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil {
			return
		}
	}
	return
}

type ByOffset []needle.NeedleValue

func (a ByOffset) Len() int           { return len(a) }
func (a ByOffset) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }

// trySynchronizing syncs with the remote volume server incrementally by
// making up the local and remote delta.
func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error {
	slaveIdxFile, err := os.Open(v.nm.IndexFileName())
	if err != nil {
		return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
	}
	defer slaveIdxFile.Close()
	slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
	if err != nil {
		return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
	}
	var delta []needle.NeedleValue
	if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
		if needleValue.Key == NeedleIdEmpty {
			return nil
		}
		if _, ok := slaveMap.Get(needleValue.Key); ok {
			return nil // skip intersection
		}
		delta = append(delta, needleValue)
		return nil
	}); err != nil {
		return fmt.Errorf("Add master entry: %v", err)
	}
	if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
		if needleValue.Key == NeedleIdEmpty {
			return nil
		}
		if _, ok := masterMap.Get(needleValue.Key); ok {
			return nil // skip intersection
		}
		needleValue.Size = 0
		delta = append(delta, needleValue)
		return nil
	}); err != nil {
		return fmt.Errorf("Remove local entry: %v", err)
	}

	// simulate the same ordering as the remote .dat file needle entries
	sort.Sort(ByOffset(delta))

	// make up the delta
	fetchCount := 0
	for _, needleValue := range delta {
		if needleValue.Size == 0 {
			// remove file entry from local
			v.removeNeedle(needleValue.Key)
			continue
		}
		// add master file entry to local data file
		if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil {
			glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
			return err
		}
		fetchCount++
	}
	glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
	return nil
}

func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
	m = needle.NewCompactMap()

	syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
	if err != nil {
		return m, 0, 0, err
	}

	total := 0
	err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
		// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
		if offset > 0 && size != TombstoneFileSize {
			m.Set(NeedleId(key), offset, size)
		} else {
			m.Delete(NeedleId(key))
		}
		total++
	})

	glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
	return m, syncStatus.TailOffset, uint16(syncStatus.CompactRevision), err
}

func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
	var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
	if stat, err := v.dataFile.Stat(); err == nil {
		syncStatus.TailOffset = uint64(stat.Size())
	}
	syncStatus.Collection = v.Collection
	syncStatus.IdxFileSize = v.nm.IndexFileSize()
	syncStatus.CompactRevision = uint32(v.SuperBlock.CompactRevision)
	syncStatus.Ttl = v.SuperBlock.Ttl.String()
	syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
	return syncStatus
}

func (v *Volume) IndexFileContent() ([]byte, error) {
	return v.nm.IndexFileContent()
}

// removeNeedle removes one needle by needle key
func (v *Volume) removeNeedle(key NeedleId) {
	n := new(Needle)
	n.Id = key
	v.deleteNeedle(n)
}

// fetchNeedle fetches a remote volume needle by vid, id, offset.
// The compact revision is checked first in case the remote volume
// was compacted and the offset is no longer valid.
func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error {

	return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
			VolumeId: uint32(v.Id),
			Revision: uint32(compactRevision),
			Offset:   uint32(needleValue.Offset),
			Size:     uint32(needleValue.Size),
			NeedleId: needleValue.Key.String(),
		})
		if err != nil {
			return err
		}
		var fileContent []byte
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("read needle %v: %v", needleValue.Key.String(), err)
			}
			fileContent = append(fileContent, resp.FileContent...)
		}

		offset, err := v.AppendBlob(fileContent)
		if err != nil {
			return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
		}
		// println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
		v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
		return nil
	})
}