164 lines
4.7 KiB

4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package remote_storage
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  5. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  6. "github.com/golang/protobuf/proto"
  7. "io"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. func ParseLocationName(remote string) (locationName string) {
  14. if strings.HasSuffix(string(remote), "/") {
  15. remote = remote[:len(remote)-1]
  16. }
  17. parts := strings.SplitN(string(remote), "/", 2)
  18. if len(parts) >= 1 {
  19. return parts[0]
  20. }
  21. return
  22. }
  23. func parseBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
  24. loc = &remote_pb.RemoteStorageLocation{}
  25. if strings.HasSuffix(string(remote), "/") {
  26. remote = remote[:len(remote)-1]
  27. }
  28. parts := strings.SplitN(string(remote), "/", 3)
  29. if len(parts) >= 1 {
  30. loc.Name = parts[0]
  31. }
  32. if len(parts) >= 2 {
  33. loc.Bucket = parts[1]
  34. }
  35. loc.Path = string(remote[len(loc.Name)+1+len(loc.Bucket):])
  36. if loc.Path == "" {
  37. loc.Path = "/"
  38. }
  39. return
  40. }
  41. func parseNoBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
  42. loc = &remote_pb.RemoteStorageLocation{}
  43. if strings.HasSuffix(string(remote), "/") {
  44. remote = remote[:len(remote)-1]
  45. }
  46. parts := strings.SplitN(string(remote), "/", 2)
  47. if len(parts) >= 1 {
  48. loc.Name = parts[0]
  49. }
  50. loc.Path = string(remote[len(loc.Name):])
  51. if loc.Path == "" {
  52. loc.Path = "/"
  53. }
  54. return
  55. }
  56. func FormatLocation(loc *remote_pb.RemoteStorageLocation) string {
  57. if loc.Bucket == "" {
  58. return fmt.Sprintf("%s%s", loc.Name, loc.Path)
  59. }
  60. return fmt.Sprintf("%s/%s%s", loc.Name, loc.Bucket, loc.Path)
  61. }
  62. type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error
  63. type Bucket struct {
  64. Name string
  65. CreatedAt time.Time
  66. }
  67. type RemoteStorageClient interface {
  68. Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
  69. ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
  70. WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
  71. RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error)
  72. WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error)
  73. UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error)
  74. DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error)
  75. ListBuckets() ([]*Bucket, error)
  76. CreateBucket(name string) (err error)
  77. DeleteBucket(name string) (err error)
  78. }
  79. type RemoteStorageClientMaker interface {
  80. Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error)
  81. HasBucket() bool
  82. }
  83. type CachedRemoteStorageClient struct {
  84. *remote_pb.RemoteConf
  85. RemoteStorageClient
  86. }
  87. var (
  88. RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker)
  89. remoteStorageClients = make(map[string]CachedRemoteStorageClient)
  90. remoteStorageClientsLock sync.Mutex
  91. )
  92. func GetAllRemoteStorageNames() string {
  93. var storageNames []string
  94. for k := range RemoteStorageClientMakers {
  95. storageNames = append(storageNames, k)
  96. }
  97. sort.Strings(storageNames)
  98. return strings.Join(storageNames, "|")
  99. }
  100. func GetRemoteStorageNamesHasBucket() string {
  101. var storageNames []string
  102. for k, m := range RemoteStorageClientMakers {
  103. if m.HasBucket() {
  104. storageNames = append(storageNames, k)
  105. }
  106. }
  107. sort.Strings(storageNames)
  108. return strings.Join(storageNames, "|")
  109. }
  110. func ParseRemoteLocation(remoteConfType string, remote string) (remoteStorageLocation *remote_pb.RemoteStorageLocation, err error) {
  111. maker, found := RemoteStorageClientMakers[remoteConfType]
  112. if !found {
  113. return nil, fmt.Errorf("remote storage type %s not found", remoteConfType)
  114. }
  115. if !maker.HasBucket() {
  116. return parseNoBucketLocation(remote), nil
  117. }
  118. return parseBucketLocation(remote), nil
  119. }
  120. func makeRemoteStorageClient(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) {
  121. maker, found := RemoteStorageClientMakers[remoteConf.Type]
  122. if !found {
  123. return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type)
  124. }
  125. return maker.Make(remoteConf)
  126. }
  127. func GetRemoteStorage(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) {
  128. remoteStorageClientsLock.Lock()
  129. defer remoteStorageClientsLock.Unlock()
  130. existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name]
  131. if found && proto.Equal(existingRemoteStorageClient.RemoteConf, remoteConf) {
  132. return existingRemoteStorageClient.RemoteStorageClient, nil
  133. }
  134. newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf)
  135. if err != nil {
  136. return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err)
  137. }
  138. remoteStorageClients[remoteConf.Name] = CachedRemoteStorageClient{
  139. RemoteConf: remoteConf,
  140. RemoteStorageClient: newRemoteStorageClient,
  141. }
  142. return newRemoteStorageClient, nil
  143. }