You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
2.7 KiB

3 years ago
3 years ago
  1. package pb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "google.golang.org/grpc"
  8. "io"
  9. "time"
  10. )
  // ProcessMetadataFunc is the callback signature used in this file to handle a
  // single metadata subscription response. A non-nil return value reports that
  // the response could not be processed.
  11. type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  12. func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string,
  13. pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
  14. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  15. err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
  16. pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError))
  17. if err != nil {
  18. return fmt.Errorf("subscribing filer meta change: %v", err)
  19. }
  20. return err
  21. }
  22. func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
  23. clientName string, pathPrefix string, lastTsNs *int64, selfSignature int32,
  24. processEventFn ProcessMetadataFunc, fatalOnError bool) error {
  25. err := filerClient.WithFilerClient(makeFunc(clientName,
  26. pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError))
  27. if err != nil {
  28. return fmt.Errorf("subscribing filer meta change: %v", err)
  29. }
  30. return nil
  31. }
  32. func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, selfSignature int32,
  33. processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
  34. return func(client filer_pb.SeaweedFilerClient) error {
  35. ctx, cancel := context.WithCancel(context.Background())
  36. defer cancel()
  37. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  38. ClientName: clientName,
  39. PathPrefix: pathPrefix,
  40. PathPrefixes: additionalPathPrefixes,
  41. SinceNs: *lastTsNs,
  42. Signature: selfSignature,
  43. })
  44. if err != nil {
  45. return fmt.Errorf("subscribe: %v", err)
  46. }
  47. for {
  48. resp, listenErr := stream.Recv()
  49. if listenErr == io.EOF {
  50. return nil
  51. }
  52. if listenErr != nil {
  53. return listenErr
  54. }
  55. if err := processEventFn(resp); err != nil {
  56. if fatalOnError {
  57. glog.Fatalf("process %v: %v", resp, err)
  58. } else {
  59. glog.Errorf("process %v: %v", resp, err)
  60. }
  61. }
  62. *lastTsNs = resp.TsNs
  63. }
  64. }
  65. }
  66. func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
  67. var counter int64
  68. var lastWriteTime time.Time
  69. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  70. if err := processEventFn(resp); err != nil {
  71. return err
  72. }
  73. counter++
  74. if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
  75. counter = 0
  76. lastWriteTime = time.Now()
  77. if err := offsetFunc(counter, resp.TsNs); err != nil {
  78. return err
  79. }
  80. }
  81. return nil
  82. }
  83. }