You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

116 lines
3.6 KiB

3 years ago
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "io"
  10. "time"
  11. )
  12. type EventErrorType int
  13. const (
  14. TrivialOnError EventErrorType = iota
  15. FatalOnError
  16. RetryForeverOnError
  17. )
  18. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  19. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, clientEpoch int32,
  20. pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32,
  21. processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
  22. err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(
  23. clientName, clientId, clientEpoch,
  24. pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
  25. if err != nil {
  26. return fmt.Errorf("subscribing filer meta change: %v", err)
  27. }
  28. return err
  29. }
  30. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
  31. clientName string, clientId int32, clientEpoch int32, pathPrefix string, lastTsNs *int64, untilTsNs int64, selfSignature int32,
  32. processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
  33. err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
  34. if err != nil {
  35. return fmt.Errorf("subscribing filer meta change: %v", err)
  36. }
  37. return nil
  38. }
  39. func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error {
  40. return func(client filer_pb.SeaweedFilerClient) error {
  41. ctx, cancel := context.WithCancel(context.Background())
  42. defer cancel()
  43. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  44. ClientName: clientName,
  45. PathPrefix: pathPrefix,
  46. PathPrefixes: additionalPathPrefixes,
  47. SinceNs: *lastTsNs,
  48. Signature: selfSignature,
  49. ClientId: clientId,
  50. ClientEpoch: clientEpoch,
  51. UntilNs: untilTsNs,
  52. })
  53. if err != nil {
  54. return fmt.Errorf("subscribe: %v", err)
  55. }
  56. for {
  57. resp, listenErr := stream.Recv()
  58. if listenErr == io.EOF {
  59. return nil
  60. }
  61. if listenErr != nil {
  62. return listenErr
  63. }
  64. if err := processEventFn(resp); err != nil {
  65. switch eventErrorType {
  66. case TrivialOnError:
  67. glog.Errorf("process %v: %v", resp, err)
  68. case FatalOnError:
  69. glog.Fatalf("process %v: %v", resp, err)
  70. case RetryForeverOnError:
  71. util.RetryForever("followMetaUpdates", func() error {
  72. return processEventFn(resp)
  73. }, func(err error) bool {
  74. glog.Errorf("process %v: %v", resp, err)
  75. return true
  76. })
  77. default:
  78. glog.Errorf("process %v: %v", resp, err)
  79. }
  80. }
  81. *lastTsNs = resp.TsNs
  82. }
  83. }
  84. }
  85. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  86. var counter int64
  87. var lastWriteTime time.Time
  88. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  89. if err := processEventFn(resp); err != nil {
  90. return err
  91. }
  92. counter++
  93. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  94. lastWriteTime = time.Now()
  95. if err := offsetFunc(counter, resp.TsNs); err != nil {
  96. return err
  97. }
  98. counter = 0
  99. }
  100. return nil
  101. }
  102. }