You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

65 lines
1.8 KiB

7 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "io"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
  13. WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  14. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
  15. defer cancel()
  16. resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{
  17. VolumdId: vid,
  18. })
  19. return nil
  20. })
  21. return
  22. }
  23. func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
  24. return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  25. stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
  26. VolumdId: vid,
  27. })
  28. if err != nil {
  29. return err
  30. }
  31. var indexFileContent []byte
  32. for {
  33. resp, err := stream.Recv()
  34. if err == io.EOF {
  35. break
  36. }
  37. if err != nil {
  38. return fmt.Errorf("read index entries: %v", err)
  39. }
  40. indexFileContent = append(indexFileContent, resp.IndexFileContent...)
  41. }
  42. dataSize := len(indexFileContent)
  43. for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
  44. line := indexFileContent[idx : idx+NeedleEntrySize]
  45. key := BytesToNeedleId(line[:NeedleIdSize])
  46. offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
  47. size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])
  48. eachEntryFn(key, offset, size)
  49. }
  50. return nil
  51. })
  52. }