80 lines
2.5 KiB

2 years ago
2 years ago
  1. package s3api
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
  11. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  12. message := resp.EventNotification
  13. if message.NewEntry == nil {
  14. return nil
  15. }
  16. dir := resp.Directory
  17. if message.NewParentPath != "" {
  18. dir = message.NewParentPath
  19. }
  20. fileName := message.NewEntry.Name
  21. content := message.NewEntry.Content
  22. _ = s3a.onIamConfigUpdate(dir, fileName, content)
  23. _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
  24. _ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
  25. return nil
  26. }
  27. var clientEpoch int32
  28. util.RetryForever("followIamChanges", func() error {
  29. clientEpoch++
  30. return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, directoriesToWatch, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
  31. }, func(err error) bool {
  32. glog.V(0).Infof("iam follow metadata changes: %v", err)
  33. return true
  34. })
  35. }
  36. // reload iam config
  37. func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
  38. if dir == filer.IamConfigDirectory && filename == filer.IamIdentityFile {
  39. if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
  40. return err
  41. }
  42. glog.V(0).Infof("updated %s/%s", dir, filename)
  43. }
  44. return nil
  45. }
  46. // reload circuit breaker config
  47. func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
  48. if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
  49. if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
  50. return err
  51. }
  52. glog.V(0).Infof("updated %s/%s", dir, filename)
  53. }
  54. return nil
  55. }
  56. //reload bucket metadata
  57. func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
  58. if dir == s3a.option.BucketsPath {
  59. if newEntry != nil {
  60. s3a.bucketRegistry.LoadBucketMetadata(newEntry)
  61. glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
  62. } else {
  63. s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
  64. glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
  65. }
  66. }
  67. return nil
  68. }