196 lines
4.8 KiB
196 lines
4.8 KiB
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/idx"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
var (
|
|
serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
|
|
volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
|
|
volumeCollection = flag.String("collection", "", "the volume collection name")
|
|
grpcDialOption grpc.DialOption
|
|
)
|
|
|
|
/*
|
|
Diff the volume's files across multiple volume servers.
|
|
diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
|
|
|
|
Example Output:
|
|
reference 127.0.0.1:8081
|
|
fileId volumeServer message
|
|
5,01617c3f61 127.0.0.1:8080 wrongSize
|
|
*/
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
util.LoadConfiguration("security", false)
|
|
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
|
|
|
|
vid := uint32(*volumeId)
|
|
servers := strings.Split(*serversStr, ",")
|
|
if len(servers) < 2 {
|
|
glog.Fatalf("You must specify more than 1 server\n")
|
|
}
|
|
var referenceServer string
|
|
var maxOffset int64
|
|
allFiles := map[string]map[types.NeedleId]needleState{}
|
|
for _, addr := range servers {
|
|
files, offset, err := getVolumeFiles(vid, addr)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to copy idx from volume server %s\n", err)
|
|
}
|
|
allFiles[addr] = files
|
|
if offset > maxOffset {
|
|
referenceServer = addr
|
|
}
|
|
}
|
|
|
|
same := true
|
|
fmt.Println("reference", referenceServer)
|
|
fmt.Println("fileId volumeServer message")
|
|
for nid, n := range allFiles[referenceServer] {
|
|
for addr, files := range allFiles {
|
|
if addr == referenceServer {
|
|
continue
|
|
}
|
|
var diffMsg string
|
|
n2, ok := files[nid]
|
|
if !ok {
|
|
if n.state == stateDeleted {
|
|
continue
|
|
}
|
|
diffMsg = "missing"
|
|
} else if n2.state != n.state {
|
|
switch n.state {
|
|
case stateDeleted:
|
|
diffMsg = "notDeleted"
|
|
case statePresent:
|
|
diffMsg = "deleted"
|
|
}
|
|
} else if n2.size != n.size {
|
|
diffMsg = "wrongSize"
|
|
} else {
|
|
continue
|
|
}
|
|
same = false
|
|
|
|
// fetch the needle details
|
|
var id string
|
|
var err error
|
|
if n.state == statePresent {
|
|
id, err = getNeedleFileId(vid, nid, referenceServer)
|
|
} else {
|
|
id, err = getNeedleFileId(vid, nid, addr)
|
|
}
|
|
if err != nil {
|
|
glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
|
|
}
|
|
fmt.Println(id, addr, diffMsg)
|
|
}
|
|
}
|
|
if !same {
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
const (
|
|
stateDeleted uint8 = 1
|
|
statePresent uint8 = 2
|
|
)
|
|
|
|
type needleState struct {
|
|
state uint8
|
|
size types.Size
|
|
}
|
|
|
|
func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
|
|
var idxFile *bytes.Reader
|
|
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
|
VolumeId: v,
|
|
Ext: ".idx",
|
|
CompactionRevision: math.MaxUint32,
|
|
StopOffset: math.MaxInt64,
|
|
Collection: *volumeCollection,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var buf bytes.Buffer
|
|
for {
|
|
resp, err := copyFileClient.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
buf.Write(resp.FileContent)
|
|
}
|
|
idxFile = bytes.NewReader(buf.Bytes())
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
var maxOffset int64
|
|
files := map[types.NeedleId]needleState{}
|
|
err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
|
if offset.IsZero() || size.IsDeleted() {
|
|
files[key] = needleState{
|
|
state: stateDeleted,
|
|
size: size,
|
|
}
|
|
} else {
|
|
files[key] = needleState{
|
|
state: statePresent,
|
|
size: size,
|
|
}
|
|
}
|
|
if actual := offset.ToAcutalOffset(); actual > maxOffset {
|
|
maxOffset = actual
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
return files, maxOffset, nil
|
|
}
|
|
|
|
func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) {
|
|
var id string
|
|
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
|
|
resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
|
|
VolumeId: v,
|
|
NeedleId: uint64(nid),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
|
|
return nil
|
|
})
|
|
return id, err
|
|
}
|