You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

78 lines
1.7 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "google.golang.org/grpc"
  9. )
  10. func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
  11. // find volume location, replication, ttl info
  12. lookup, err := Lookup(master, vid.String())
  13. if err != nil {
  14. return fmt.Errorf("look up volume %d: %v", vid, err)
  15. }
  16. if len(lookup.Locations) == 0 {
  17. return fmt.Errorf("unable to locate volume %d", vid)
  18. }
  19. volumeServer := lookup.Locations[0].Url
  20. return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  21. stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
  22. VolumeId: uint32(vid),
  23. SinceNs: sinceNs,
  24. DrainingSeconds: uint32(timeoutSeconds),
  25. })
  26. if err != nil {
  27. return err
  28. }
  29. for {
  30. resp, recvErr := stream.Recv()
  31. if recvErr != nil {
  32. if recvErr == io.EOF {
  33. break
  34. } else {
  35. return recvErr
  36. }
  37. }
  38. needleHeader := resp.NeedleHeader
  39. needleBody := resp.NeedleBody
  40. if len(needleHeader) == 0 {
  41. continue
  42. }
  43. for !resp.IsLastChunk {
  44. resp, recvErr = stream.Recv()
  45. if recvErr != nil {
  46. if recvErr == io.EOF {
  47. break
  48. } else {
  49. return recvErr
  50. }
  51. }
  52. needleBody = append(needleBody, resp.NeedleBody...)
  53. }
  54. n := new(needle.Needle)
  55. n.ParseNeedleHeader(needleHeader)
  56. n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
  57. err = fn(n)
  58. if err != nil {
  59. return err
  60. }
  61. }
  62. return nil
  63. })
  64. }