You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

60 lines
1.6 KiB

7 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
  11. WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
  12. resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
  13. VolumdId: vid,
  14. })
  15. return nil
  16. })
  17. return
  18. }
  19. func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
  20. return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
  21. stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
  22. VolumdId: vid,
  23. })
  24. if err != nil {
  25. return err
  26. }
  27. var indexFileContent []byte
  28. for {
  29. resp, err := stream.Recv()
  30. if err == io.EOF {
  31. break
  32. }
  33. if err != nil {
  34. return fmt.Errorf("read index entries: %v", err)
  35. }
  36. indexFileContent = append(indexFileContent, resp.IndexFileContent...)
  37. }
  38. dataSize := len(indexFileContent)
  39. for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
  40. line := indexFileContent[idx : idx+NeedleEntrySize]
  41. key := BytesToNeedleId(line[:NeedleIdSize])
  42. offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
  43. size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])
  44. eachEntryFn(key, offset, size)
  45. }
  46. return nil
  47. })
  48. }