Browse Source
Merge pull request #1397 from levenlabs/diff
Merge pull request #1397 from levenlabs/diff
Added diffing of multiple volume serverspull/1402/head
Chris Lu
5 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 782 additions and 295 deletions
-
194unmaintained/diff_volume_servers/diff_volume_servers.go
-
15weed/pb/volume_server.proto
-
814weed/pb/volume_server_pb/volume_server.pb.go
-
42weed/server/volume_grpc_admin.go
-
12weed/storage/idx/walk.go
@ -0,0 +1,194 @@ |
|||||
|
package main |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"context" |
||||
|
"errors" |
||||
|
"flag" |
||||
|
"fmt" |
||||
|
"io" |
||||
|
"math" |
||||
|
"os" |
||||
|
"strings" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/operation" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/security" |
||||
|
"github.com/chrislusf/seaweedfs/weed/storage/idx" |
||||
|
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
||||
|
"github.com/chrislusf/seaweedfs/weed/storage/types" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
"google.golang.org/grpc" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against") |
||||
|
volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers") |
||||
|
volumeCollection = flag.String("collection", "", "the volume collection name") |
||||
|
grpcDialOption grpc.DialOption |
||||
|
) |
||||
|
|
||||
|
/* |
||||
|
Diff the volume's files across multiple volume servers. |
||||
|
diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 |
||||
|
|
||||
|
Example Output: |
||||
|
reference 127.0.0.1:8081 |
||||
|
fileId volumeServer message |
||||
|
5,01617c3f61 127.0.0.1:8080 wrongSize |
||||
|
*/ |
||||
|
func main() { |
||||
|
flag.Parse() |
||||
|
|
||||
|
util.LoadConfiguration("security", false) |
||||
|
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") |
||||
|
|
||||
|
vid := uint32(*volumeId) |
||||
|
servers := strings.Split(*serversStr, ",") |
||||
|
if len(servers) < 2 { |
||||
|
glog.Fatalf("You must specify more than 1 server\n") |
||||
|
} |
||||
|
var referenceServer string |
||||
|
var maxOffset int64 |
||||
|
allFiles := map[string]map[types.NeedleId]needleState{} |
||||
|
for _, addr := range servers { |
||||
|
files, offset, err := getVolumeFiles(vid, addr) |
||||
|
if err != nil { |
||||
|
glog.Fatalf("Failed to copy idx from volume server %s\n", err) |
||||
|
} |
||||
|
allFiles[addr] = files |
||||
|
if offset > maxOffset { |
||||
|
referenceServer = addr |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
same := true |
||||
|
fmt.Println("reference", referenceServer) |
||||
|
fmt.Println("fileId volumeServer message") |
||||
|
for nid, n := range allFiles[referenceServer] { |
||||
|
for addr, files := range allFiles { |
||||
|
if addr == referenceServer { |
||||
|
continue |
||||
|
} |
||||
|
var diffMsg string |
||||
|
n2, ok := files[nid] |
||||
|
if !ok { |
||||
|
if n.state == stateDeleted { |
||||
|
continue |
||||
|
} |
||||
|
diffMsg = "missing" |
||||
|
} else if n2.state != n.state { |
||||
|
switch n.state { |
||||
|
case stateDeleted: |
||||
|
diffMsg = "notDeleted" |
||||
|
case statePresent: |
||||
|
diffMsg = "deleted" |
||||
|
} |
||||
|
} else if n2.size != n.size { |
||||
|
diffMsg = "wrongSize" |
||||
|
} else { |
||||
|
continue |
||||
|
} |
||||
|
same = false |
||||
|
|
||||
|
// fetch the needle details
|
||||
|
var id string |
||||
|
var err error |
||||
|
if n.state == statePresent { |
||||
|
id, err = getNeedleFileId(vid, nid, referenceServer) |
||||
|
} else { |
||||
|
id, err = getNeedleFileId(vid, nid, addr) |
||||
|
} |
||||
|
if err != nil { |
||||
|
glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err) |
||||
|
} |
||||
|
fmt.Println(id, addr, diffMsg) |
||||
|
} |
||||
|
} |
||||
|
if !same { |
||||
|
os.Exit(1) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
const ( |
||||
|
stateDeleted uint8 = 1 |
||||
|
statePresent uint8 = 2 |
||||
|
) |
||||
|
|
||||
|
type needleState struct { |
||||
|
state uint8 |
||||
|
size uint32 |
||||
|
} |
||||
|
|
||||
|
func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { |
||||
|
var idxFile *bytes.Reader |
||||
|
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { |
||||
|
copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ |
||||
|
VolumeId: v, |
||||
|
Ext: ".idx", |
||||
|
CompactionRevision: math.MaxUint32, |
||||
|
StopOffset: math.MaxInt64, |
||||
|
Collection: *volumeCollection, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
var buf bytes.Buffer |
||||
|
for { |
||||
|
resp, err := copyFileClient.Recv() |
||||
|
if errors.Is(err, io.EOF) { |
||||
|
break |
||||
|
} |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
buf.Write(resp.FileContent) |
||||
|
} |
||||
|
idxFile = bytes.NewReader(buf.Bytes()) |
||||
|
return nil |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return nil, 0, err |
||||
|
} |
||||
|
|
||||
|
var maxOffset int64 |
||||
|
files := map[types.NeedleId]needleState{} |
||||
|
err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error { |
||||
|
if offset.IsZero() || size == types.TombstoneFileSize { |
||||
|
files[key] = needleState{ |
||||
|
state: stateDeleted, |
||||
|
size: size, |
||||
|
} |
||||
|
} else { |
||||
|
files[key] = needleState{ |
||||
|
state: statePresent, |
||||
|
size: size, |
||||
|
} |
||||
|
} |
||||
|
if actual := offset.ToAcutalOffset(); actual > maxOffset { |
||||
|
maxOffset = actual |
||||
|
} |
||||
|
return nil |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return nil, 0, err |
||||
|
} |
||||
|
return files, maxOffset, nil |
||||
|
} |
||||
|
|
||||
|
func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) { |
||||
|
var id string |
||||
|
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { |
||||
|
resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ |
||||
|
VolumeId: v, |
||||
|
NeedleId: uint64(nid), |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String() |
||||
|
return nil |
||||
|
}) |
||||
|
return id, err |
||||
|
} |
814
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
Write
Preview
Loading…
Cancel
Save
Reference in new issue