You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

82 lines
2.0 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "google.golang.org/grpc"
  9. )
  10. func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
  11. // find volume location, replication, ttl info
  12. lookup, err := Lookup(master, vid.String())
  13. if err != nil {
  14. return fmt.Errorf("look up volume %d: %v", vid, err)
  15. }
  16. if len(lookup.Locations) == 0 {
  17. return fmt.Errorf("unable to locate volume %d", vid)
  18. }
  19. volumeServer := lookup.Locations[0].Url
  20. return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
  21. }
  22. func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
  23. return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  24. stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
  25. VolumeId: uint32(vid),
  26. SinceNs: sinceNs,
  27. IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
  28. })
  29. if err != nil {
  30. return err
  31. }
  32. for {
  33. resp, recvErr := stream.Recv()
  34. if recvErr != nil {
  35. if recvErr == io.EOF {
  36. break
  37. } else {
  38. return recvErr
  39. }
  40. }
  41. needleHeader := resp.NeedleHeader
  42. needleBody := resp.NeedleBody
  43. if len(needleHeader) == 0 {
  44. continue
  45. }
  46. for !resp.IsLastChunk {
  47. resp, recvErr = stream.Recv()
  48. if recvErr != nil {
  49. if recvErr == io.EOF {
  50. break
  51. } else {
  52. return recvErr
  53. }
  54. }
  55. needleBody = append(needleBody, resp.NeedleBody...)
  56. }
  57. n := new(needle.Needle)
  58. n.ParseNeedleHeader(needleHeader)
  59. n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
  60. err = fn(n)
  61. if err != nil {
  62. return err
  63. }
  64. }
  65. return nil
  66. })
  67. }