You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

69 lines
1.5 KiB

  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "io"
  9. "time"
  10. )
  11. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
  12. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  13. message := resp.EventNotification
  14. if message.NewEntry == nil {
  15. return nil
  16. }
  17. dir := resp.Directory
  18. if message.NewParentPath != "" {
  19. dir = message.NewParentPath
  20. }
  21. if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
  22. if err := s3a.iam.loadS3ApiConfigurationFromFiler(s3a.option); err != nil {
  23. return err
  24. }
  25. }
  26. return nil
  27. }
  28. for {
  29. err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  30. ctx, cancel := context.WithCancel(context.Background())
  31. defer cancel()
  32. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  33. ClientName: clientName,
  34. PathPrefix: prefix,
  35. SinceNs: lastTsNs,
  36. })
  37. if err != nil {
  38. return fmt.Errorf("subscribe: %v", err)
  39. }
  40. for {
  41. resp, listenErr := stream.Recv()
  42. if listenErr == io.EOF {
  43. return nil
  44. }
  45. if listenErr != nil {
  46. return listenErr
  47. }
  48. if err := processEventFn(resp); err != nil {
  49. glog.Fatalf("process %v: %v", resp, err)
  50. }
  51. lastTsNs = resp.TsNs
  52. }
  53. })
  54. if err != nil {
  55. glog.Errorf("subscribing filer meta change: %v", err)
  56. }
  57. time.Sleep(time.Second)
  58. }
  59. }