91 lines
2.8 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package s3api
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
  11. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  12. message := resp.EventNotification
  13. if message.NewEntry == nil {
  14. return nil
  15. }
  16. dir := resp.Directory
  17. if message.NewParentPath != "" {
  18. dir = message.NewParentPath
  19. }
  20. fileName := message.NewEntry.Name
  21. content := message.NewEntry.Content
  22. _ = s3a.onIamConfigUpdate(dir, fileName, content)
  23. _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
  24. _ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
  25. return nil
  26. }
  27. metadataFollowOption := &pb.MetadataFollowOption{
  28. ClientName: clientName,
  29. ClientId: s3a.randomClientId,
  30. ClientEpoch: 1,
  31. SelfSignature: 0,
  32. PathPrefix: prefix,
  33. AdditionalPathPrefixes: nil,
  34. DirectoriesToWatch: directoriesToWatch,
  35. StartTsNs: lastTsNs,
  36. StopTsNs: 0,
  37. EventErrorType: pb.FatalOnError,
  38. }
  39. util.RetryUntil("followIamChanges", func() error {
  40. metadataFollowOption.ClientEpoch++
  41. return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
  42. }, func(err error) bool {
  43. glog.V(0).Infof("iam follow metadata changes: %v", err)
  44. return true
  45. })
  46. }
  47. // reload iam config
  48. func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
  49. if dir == filer.IamConfigDirectory && filename == filer.IamIdentityFile {
  50. if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
  51. return err
  52. }
  53. glog.V(0).Infof("updated %s/%s", dir, filename)
  54. }
  55. return nil
  56. }
  57. // reload circuit breaker config
  58. func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
  59. if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
  60. if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
  61. return err
  62. }
  63. glog.V(0).Infof("updated %s/%s", dir, filename)
  64. }
  65. return nil
  66. }
  67. // reload bucket metadata
  68. func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
  69. if dir == s3a.option.BucketsPath {
  70. if newEntry != nil {
  71. s3a.bucketRegistry.LoadBucketMetadata(newEntry)
  72. glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
  73. } else {
  74. s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
  75. glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
  76. }
  77. }
  78. return nil
  79. }