1 changed files with 194 additions and 0 deletions
			
			
		@ -0,0 +1,194 @@ | 
				
			|||
package main | 
				
			|||
 | 
				
			|||
import ( | 
				
			|||
	"bytes" | 
				
			|||
	"context" | 
				
			|||
	"errors" | 
				
			|||
	"flag" | 
				
			|||
	"fmt" | 
				
			|||
	"io" | 
				
			|||
	"math" | 
				
			|||
	"os" | 
				
			|||
	"strings" | 
				
			|||
 | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/glog" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/operation" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/security" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/storage/idx" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/storage/needle" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/storage/types" | 
				
			|||
	"github.com/chrislusf/seaweedfs/weed/util" | 
				
			|||
	"google.golang.org/grpc" | 
				
			|||
) | 
				
			|||
 | 
				
			|||
var ( | 
				
			|||
	serversStr       = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against") | 
				
			|||
	volumeId         = flag.Int("volumeId", -1, "a volume id to diff from servers") | 
				
			|||
	volumeCollection = flag.String("collection", "", "the volume collection name") | 
				
			|||
	grpcDialOption   grpc.DialOption | 
				
			|||
) | 
				
			|||
 | 
				
			|||
/* | 
				
			|||
	Diff the volume's files across multiple volume servers. | 
				
			|||
	diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 | 
				
			|||
 | 
				
			|||
	Example Output: | 
				
			|||
	reference 127.0.0.1:8081 | 
				
			|||
	fileId volumeServer message | 
				
			|||
	5,01617c3f61 127.0.0.1:8080 wrongSize | 
				
			|||
*/ | 
				
			|||
func main() { | 
				
			|||
	flag.Parse() | 
				
			|||
 | 
				
			|||
	util.LoadConfiguration("security", false) | 
				
			|||
	grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") | 
				
			|||
 | 
				
			|||
	vid := uint32(*volumeId) | 
				
			|||
	servers := strings.Split(*serversStr, ",") | 
				
			|||
	if len(servers) < 2 { | 
				
			|||
		glog.Fatalf("You must specify more than 1 server\n") | 
				
			|||
	} | 
				
			|||
	var referenceServer string | 
				
			|||
	var maxOffset int64 | 
				
			|||
	allFiles := map[string]map[types.NeedleId]needleState{} | 
				
			|||
	for _, addr := range servers { | 
				
			|||
		files, offset, err := getVolumeFiles(vid, addr) | 
				
			|||
		if err != nil { | 
				
			|||
			glog.Fatalf("Failed to copy idx from volume server %s\n", err) | 
				
			|||
		} | 
				
			|||
		allFiles[addr] = files | 
				
			|||
		if offset > maxOffset { | 
				
			|||
			referenceServer = addr | 
				
			|||
		} | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	same := true | 
				
			|||
	fmt.Println("reference", referenceServer) | 
				
			|||
	fmt.Println("fileId volumeServer message") | 
				
			|||
	for nid, n := range allFiles[referenceServer] { | 
				
			|||
		for addr, files := range allFiles { | 
				
			|||
			if addr == referenceServer { | 
				
			|||
				continue | 
				
			|||
			} | 
				
			|||
			var diffMsg string | 
				
			|||
			n2, ok := files[nid] | 
				
			|||
			if !ok { | 
				
			|||
				if n.state == stateDeleted { | 
				
			|||
					continue | 
				
			|||
				} | 
				
			|||
				diffMsg = "missing" | 
				
			|||
			} else if n2.state != n.state { | 
				
			|||
				switch n.state { | 
				
			|||
				case stateDeleted: | 
				
			|||
					diffMsg = "notDeleted" | 
				
			|||
				case statePresent: | 
				
			|||
					diffMsg = "deleted" | 
				
			|||
				} | 
				
			|||
			} else if n2.size != n.size { | 
				
			|||
				diffMsg = "wrongSize" | 
				
			|||
			} else { | 
				
			|||
				continue | 
				
			|||
			} | 
				
			|||
			same = false | 
				
			|||
 | 
				
			|||
			// fetch the needle details
 | 
				
			|||
			var id string | 
				
			|||
			var err error | 
				
			|||
			if n.state == statePresent { | 
				
			|||
				id, err = getNeedleFileId(vid, nid, referenceServer) | 
				
			|||
			} else { | 
				
			|||
				id, err = getNeedleFileId(vid, nid, addr) | 
				
			|||
			} | 
				
			|||
			if err != nil { | 
				
			|||
				glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err) | 
				
			|||
			} | 
				
			|||
			fmt.Println(id, addr, diffMsg) | 
				
			|||
		} | 
				
			|||
	} | 
				
			|||
	if !same { | 
				
			|||
		os.Exit(1) | 
				
			|||
	} | 
				
			|||
} | 
				
			|||
 | 
				
			|||
const ( | 
				
			|||
	stateDeleted uint8 = 1 | 
				
			|||
	statePresent uint8 = 2 | 
				
			|||
) | 
				
			|||
 | 
				
			|||
type needleState struct { | 
				
			|||
	state uint8 | 
				
			|||
	size  uint32 | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) { | 
				
			|||
	var idxFile *bytes.Reader | 
				
			|||
	err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { | 
				
			|||
		copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ | 
				
			|||
			VolumeId:           v, | 
				
			|||
			Ext:                ".idx", | 
				
			|||
			CompactionRevision: math.MaxUint32, | 
				
			|||
			StopOffset:         math.MaxInt64, | 
				
			|||
			Collection:         *volumeCollection, | 
				
			|||
		}) | 
				
			|||
		if err != nil { | 
				
			|||
			return err | 
				
			|||
		} | 
				
			|||
		var buf bytes.Buffer | 
				
			|||
		for { | 
				
			|||
			resp, err := copyFileClient.Recv() | 
				
			|||
			if errors.Is(err, io.EOF) { | 
				
			|||
				break | 
				
			|||
			} | 
				
			|||
			if err != nil { | 
				
			|||
				return err | 
				
			|||
			} | 
				
			|||
			buf.Write(resp.FileContent) | 
				
			|||
		} | 
				
			|||
		idxFile = bytes.NewReader(buf.Bytes()) | 
				
			|||
		return nil | 
				
			|||
	}) | 
				
			|||
	if err != nil { | 
				
			|||
		return nil, 0, err | 
				
			|||
	} | 
				
			|||
 | 
				
			|||
	var maxOffset int64 | 
				
			|||
	files := map[types.NeedleId]needleState{} | 
				
			|||
	err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error { | 
				
			|||
		if offset.IsZero() || size == types.TombstoneFileSize { | 
				
			|||
			files[key] = needleState{ | 
				
			|||
				state: stateDeleted, | 
				
			|||
				size:  size, | 
				
			|||
			} | 
				
			|||
		} else { | 
				
			|||
			files[key] = needleState{ | 
				
			|||
				state: statePresent, | 
				
			|||
				size:  size, | 
				
			|||
			} | 
				
			|||
		} | 
				
			|||
		if actual := offset.ToAcutalOffset(); actual > maxOffset { | 
				
			|||
			maxOffset = actual | 
				
			|||
		} | 
				
			|||
		return nil | 
				
			|||
	}) | 
				
			|||
	if err != nil { | 
				
			|||
		return nil, 0, err | 
				
			|||
	} | 
				
			|||
	return files, maxOffset, nil | 
				
			|||
} | 
				
			|||
 | 
				
			|||
func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) { | 
				
			|||
	var id string | 
				
			|||
	err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { | 
				
			|||
		resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ | 
				
			|||
			VolumeId: v, | 
				
			|||
			NeedleId: uint64(nid), | 
				
			|||
		}) | 
				
			|||
		if err != nil { | 
				
			|||
			return err | 
				
			|||
		} | 
				
			|||
		id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String() | 
				
			|||
		return nil | 
				
			|||
	}) | 
				
			|||
	return id, err | 
				
			|||
} | 
				
			|||
						Write
						Preview
					
					
					Loading…
					
					Cancel
						Save
					
		Reference in new issue