You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
5.0 KiB

4 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "math"
  10. "os"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/security"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. "google.golang.org/grpc"
  21. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  22. )
  23. var (
  24. serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
  25. volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
  26. volumeCollection = flag.String("collection", "", "the volume collection name")
  27. grpcDialOption grpc.DialOption
  28. )
  29. /*
  30. Diff the volume's files across multiple volume servers.
  31. diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
  32. Example Output:
  33. reference 127.0.0.1:8081
  34. fileId volumeServer message
  35. 5,01617c3f61 127.0.0.1:8080 wrongSize
  36. */
  37. func main() {
  38. flag.Parse()
  39. util_http.InitGlobalHttpClient()
  40. util.LoadSecurityConfiguration()
  41. grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  42. vid := uint32(*volumeId)
  43. servers := pb.ServerAddresses(*serversStr).ToAddresses()
  44. if len(servers) < 2 {
  45. glog.Fatalf("You must specify more than 1 server\n")
  46. }
  47. var referenceServer pb.ServerAddress
  48. var maxOffset int64
  49. allFiles := map[pb.ServerAddress]map[types.NeedleId]needleState{}
  50. for _, addr := range servers {
  51. files, offset, err := getVolumeFiles(vid, addr)
  52. if err != nil {
  53. glog.Fatalf("Failed to copy idx from volume server %s\n", err)
  54. }
  55. allFiles[addr] = files
  56. if offset > maxOffset {
  57. referenceServer = addr
  58. }
  59. }
  60. same := true
  61. fmt.Println("reference", referenceServer)
  62. fmt.Println("fileId volumeServer message")
  63. for nid, n := range allFiles[referenceServer] {
  64. for addr, files := range allFiles {
  65. if addr == referenceServer {
  66. continue
  67. }
  68. var diffMsg string
  69. n2, ok := files[nid]
  70. if !ok {
  71. if n.state == stateDeleted {
  72. continue
  73. }
  74. diffMsg = "missing"
  75. } else if n2.state != n.state {
  76. switch n.state {
  77. case stateDeleted:
  78. diffMsg = "notDeleted"
  79. case statePresent:
  80. diffMsg = "deleted"
  81. }
  82. } else if n2.size != n.size {
  83. diffMsg = "wrongSize"
  84. } else {
  85. continue
  86. }
  87. same = false
  88. // fetch the needle details
  89. var id string
  90. var err error
  91. if n.state == statePresent {
  92. id, err = getNeedleFileId(vid, nid, referenceServer)
  93. } else {
  94. id, err = getNeedleFileId(vid, nid, addr)
  95. }
  96. if err != nil {
  97. glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
  98. }
  99. fmt.Println(id, addr, diffMsg)
  100. }
  101. }
  102. if !same {
  103. os.Exit(1)
  104. }
  105. }
  // Needle states stored in needleState.state. The values start at 1, so the
  // zero value of the field matches neither state.
  const (
  	// stateDeleted marks a needle whose index entry is a deletion.
  	stateDeleted uint8 = iota + 1
  	// statePresent marks a needle whose index entry is live.
  	statePresent
  )
  110. type needleState struct {
  111. state uint8
  112. size types.Size
  113. }
  114. func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) {
  115. var idxFile *bytes.Reader
  116. err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  117. ctx, cancel := context.WithCancel(context.Background())
  118. defer cancel()
  119. copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  120. VolumeId: v,
  121. Ext: ".idx",
  122. CompactionRevision: math.MaxUint32,
  123. StopOffset: math.MaxInt64,
  124. Collection: *volumeCollection,
  125. })
  126. if err != nil {
  127. return err
  128. }
  129. var buf bytes.Buffer
  130. for {
  131. resp, err := copyFileClient.Recv()
  132. if errors.Is(err, io.EOF) {
  133. break
  134. }
  135. if err != nil {
  136. return err
  137. }
  138. buf.Write(resp.FileContent)
  139. }
  140. idxFile = bytes.NewReader(buf.Bytes())
  141. return nil
  142. })
  143. if err != nil {
  144. return nil, 0, err
  145. }
  146. var maxOffset int64
  147. files := map[types.NeedleId]needleState{}
  148. err = idx.WalkIndexFile(idxFile, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
  149. if offset.IsZero() || size.IsDeleted() {
  150. files[key] = needleState{
  151. state: stateDeleted,
  152. size: size,
  153. }
  154. } else {
  155. files[key] = needleState{
  156. state: statePresent,
  157. size: size,
  158. }
  159. }
  160. if actual := offset.ToActualOffset(); actual > maxOffset {
  161. maxOffset = actual
  162. }
  163. return nil
  164. })
  165. if err != nil {
  166. return nil, 0, err
  167. }
  168. return files, maxOffset, nil
  169. }
  170. func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) {
  171. var id string
  172. err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  173. resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  174. VolumeId: v,
  175. NeedleId: uint64(nid),
  176. })
  177. if err != nil {
  178. return err
  179. }
  180. id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
  181. return nil
  182. })
  183. return id, err
  184. }