You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
4.7 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "math"
  10. "os"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/chrislusf/seaweedfs/weed/security"
  16. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  17. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  18. "github.com/chrislusf/seaweedfs/weed/storage/types"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "google.golang.org/grpc"
  21. )
var (
	// serversStr holds the raw -volumeServers value; main splits it on commas
	// to obtain the list of volume server addresses to compare.
	serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")

	// volumeId holds the -volumeId value. It defaults to -1 and is converted
	// to uint32 by main before being sent to the volume servers.
	volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")

	// volumeCollection holds the -collection value, passed along in the
	// CopyFile request when the index file is fetched.
	volumeCollection = flag.String("collection", "", "the volume collection name")

	// grpcDialOption is assigned in main from the "grpc.client" section of the
	// security configuration and used for every volume server call.
	grpcDialOption grpc.DialOption
)
  28. /*
  29. Diff the volume's files across multiple volume servers.
  30. diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
  31. Example Output:
  32. reference 127.0.0.1:8081
  33. fileId volumeServer message
  34. 5,01617c3f61 127.0.0.1:8080 wrongSize
  35. */
  36. func main() {
  37. flag.Parse()
  38. util.LoadConfiguration("security", false)
  39. grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  40. vid := uint32(*volumeId)
  41. servers := strings.Split(*serversStr, ",")
  42. if len(servers) < 2 {
  43. glog.Fatalf("You must specify more than 1 server\n")
  44. }
  45. var referenceServer string
  46. var maxOffset int64
  47. allFiles := map[string]map[types.NeedleId]needleState{}
  48. for _, addr := range servers {
  49. files, offset, err := getVolumeFiles(vid, addr)
  50. if err != nil {
  51. glog.Fatalf("Failed to copy idx from volume server %s\n", err)
  52. }
  53. allFiles[addr] = files
  54. if offset > maxOffset {
  55. referenceServer = addr
  56. }
  57. }
  58. same := true
  59. fmt.Println("reference", referenceServer)
  60. fmt.Println("fileId volumeServer message")
  61. for nid, n := range allFiles[referenceServer] {
  62. for addr, files := range allFiles {
  63. if addr == referenceServer {
  64. continue
  65. }
  66. var diffMsg string
  67. n2, ok := files[nid]
  68. if !ok {
  69. if n.state == stateDeleted {
  70. continue
  71. }
  72. diffMsg = "missing"
  73. } else if n2.state != n.state {
  74. switch n.state {
  75. case stateDeleted:
  76. diffMsg = "notDeleted"
  77. case statePresent:
  78. diffMsg = "deleted"
  79. }
  80. } else if n2.size != n.size {
  81. diffMsg = "wrongSize"
  82. } else {
  83. continue
  84. }
  85. same = false
  86. // fetch the needle details
  87. var id string
  88. var err error
  89. if n.state == statePresent {
  90. id, err = getNeedleFileId(vid, nid, referenceServer)
  91. } else {
  92. id, err = getNeedleFileId(vid, nid, addr)
  93. }
  94. if err != nil {
  95. glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
  96. }
  97. fmt.Println(id, addr, diffMsg)
  98. }
  99. }
  100. if !same {
  101. os.Exit(1)
  102. }
  103. }
  104. const (
  105. stateDeleted uint8 = 1
  106. statePresent uint8 = 2
  107. )
// needleState is the per-needle summary built by getVolumeFiles from the
// index entries for a needle id; main compares these summaries across servers.
type needleState struct {
	// state is either stateDeleted or statePresent.
	state uint8

	// size is the size value taken from the index entry.
	size types.Size
}
  112. func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
  113. var idxFile *bytes.Reader
  114. err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  115. copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  116. VolumeId: v,
  117. Ext: ".idx",
  118. CompactionRevision: math.MaxUint32,
  119. StopOffset: math.MaxInt64,
  120. Collection: *volumeCollection,
  121. })
  122. if err != nil {
  123. return err
  124. }
  125. var buf bytes.Buffer
  126. for {
  127. resp, err := copyFileClient.Recv()
  128. if errors.Is(err, io.EOF) {
  129. break
  130. }
  131. if err != nil {
  132. return err
  133. }
  134. buf.Write(resp.FileContent)
  135. }
  136. idxFile = bytes.NewReader(buf.Bytes())
  137. return nil
  138. })
  139. if err != nil {
  140. return nil, 0, err
  141. }
  142. var maxOffset int64
  143. files := map[types.NeedleId]needleState{}
  144. err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
  145. if offset.IsZero() || size < 0 || size == types.TombstoneFileSize {
  146. files[key] = needleState{
  147. state: stateDeleted,
  148. size: size,
  149. }
  150. } else {
  151. files[key] = needleState{
  152. state: statePresent,
  153. size: size,
  154. }
  155. }
  156. if actual := offset.ToAcutalOffset(); actual > maxOffset {
  157. maxOffset = actual
  158. }
  159. return nil
  160. })
  161. if err != nil {
  162. return nil, 0, err
  163. }
  164. return files, maxOffset, nil
  165. }
  166. func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) {
  167. var id string
  168. err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  169. resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  170. VolumeId: v,
  171. NeedleId: uint64(nid),
  172. })
  173. if err != nil {
  174. return err
  175. }
  176. id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
  177. return nil
  178. })
  179. return id, err
  180. }