You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

93 lines
2.7 KiB

  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "google.golang.org/grpc"
  8. "io"
  9. "time"
  10. )
  11. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  12. func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, clientName string,
  13. pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
  14. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  15. err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
  16. pathPrefix, additionalPathPrefixes, lastTsNs, selfSignature, processEventFn, fatalOnError))
  17. if err != nil {
  18. return fmt.Errorf("subscribing filer meta change: %v", err)
  19. }
  20. return err
  21. }
  22. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
  23. clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
  24. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  25. err := filerClient.WithFilerClient(makeFunc(clientName, pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError))
  26. if err != nil {
  27. return fmt.Errorf("subscribing filer meta change: %v", err)
  28. }
  29. return nil
  30. }
  31. func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
  32. return func(client filer_pb.SeaweedFilerClient) error {
  33. ctx, cancel := context.WithCancel(context.Background())
  34. defer cancel()
  35. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  36. ClientName: clientName,
  37. PathPrefix: pathPrefix,
  38. PathPrefixes: additionalPathPrefixes,
  39. SinceNs: lastTsNs,
  40. Signature: selfSignature,
  41. })
  42. if err != nil {
  43. return fmt.Errorf("subscribe: %v", err)
  44. }
  45. for {
  46. resp, listenErr := stream.Recv()
  47. if listenErr == io.EOF {
  48. return nil
  49. }
  50. if listenErr != nil {
  51. return listenErr
  52. }
  53. if err := processEventFn(resp); err != nil {
  54. if fatalOnError {
  55. glog.Fatalf("process %v: %v", resp, err)
  56. } else {
  57. glog.Errorf("process %v: %v", resp, err)
  58. }
  59. }
  60. lastTsNs = resp.TsNs
  61. }
  62. }
  63. }
  64. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  65. var counter int64
  66. var lastWriteTime time.Time
  67. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  68. if err := processEventFn(resp); err != nil {
  69. return err
  70. }
  71. counter++
  72. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  73. counter = 0
  74. lastWriteTime = time.Now()
  75. if err := offsetFunc(counter, resp.TsNs); err != nil {
  76. return err
  77. }
  78. }
  79. return nil
  80. }
  81. }