You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
5.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "google.golang.org/protobuf/proto"
  11. "math"
  12. "strings"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/viant/ptrie"
  16. )
  17. const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
  18. const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
  19. type FilerRemoteStorage struct {
  20. rules ptrie.Trie
  21. storageNameToConf map[string]*remote_pb.RemoteConf
  22. }
  23. func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
  24. rs = &FilerRemoteStorage{
  25. rules: ptrie.New(),
  26. storageNameToConf: make(map[string]*remote_pb.RemoteConf),
  27. }
  28. return rs
  29. }
  30. func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
  31. // execute this on filer
  32. limit := int64(math.MaxInt32)
  33. entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, limit, "", "", "")
  34. if err != nil {
  35. if err == filer_pb.ErrNotFound {
  36. return nil
  37. }
  38. glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
  39. return
  40. }
  41. for _, entry := range entries {
  42. if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
  43. if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
  44. return err
  45. }
  46. continue
  47. }
  48. if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
  49. return nil
  50. }
  51. conf := &remote_pb.RemoteConf{}
  52. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  53. return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
  54. }
  55. rs.storageNameToConf[conf.Name] = conf
  56. }
  57. return nil
  58. }
  59. func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
  60. mappings := &remote_pb.RemoteStorageMapping{}
  61. if err := proto.Unmarshal(data, mappings); err != nil {
  62. return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
  63. }
  64. for dir, storageLocation := range mappings.Mappings {
  65. rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
  66. }
  67. return nil
  68. }
  69. func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) {
  70. rs.rules.Put([]byte(dir+"/"), loc)
  71. }
  72. func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) {
  73. rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
  74. mountDir = util.FullPath(string(key[:len(key)-1]))
  75. remoteLocation = value.(*remote_pb.RemoteStorageLocation)
  76. return true
  77. })
  78. return
  79. }
  80. func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
  81. var storageLocation *remote_pb.RemoteStorageLocation
  82. rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
  83. storageLocation = value.(*remote_pb.RemoteStorageLocation)
  84. return true
  85. })
  86. if storageLocation == nil {
  87. found = false
  88. return
  89. }
  90. return rs.GetRemoteStorageClient(storageLocation.Name)
  91. }
  92. func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
  93. remoteConf, found = rs.storageNameToConf[storageName]
  94. if !found {
  95. return
  96. }
  97. var err error
  98. if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
  99. found = true
  100. return
  101. }
  102. return
  103. }
  104. func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) {
  105. mappings = &remote_pb.RemoteStorageMapping{
  106. Mappings: make(map[string]*remote_pb.RemoteStorageLocation),
  107. }
  108. if len(oldContent) > 0 {
  109. if err = proto.Unmarshal(oldContent, mappings); err != nil {
  110. glog.Warningf("unmarshal existing mappings: %v", err)
  111. }
  112. }
  113. return
  114. }
  115. func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
  116. var oldContent []byte
  117. if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  118. oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
  119. return readErr
  120. }); readErr != nil {
  121. return nil, readErr
  122. }
  123. // unmarshal storage configuration
  124. conf = &remote_pb.RemoteConf{}
  125. if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
  126. readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  127. return
  128. }
  129. return
  130. }
  131. func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
  132. mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
  133. if listErr != nil {
  134. return nil, "", nil, nil, listErr
  135. }
  136. if dir == "" {
  137. return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
  138. }
  139. var localMountedDir string
  140. var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
  141. for k, loc := range mappings.Mappings {
  142. if strings.HasPrefix(dir, k) {
  143. localMountedDir, remoteStorageMountedLocation = k, loc
  144. }
  145. }
  146. if localMountedDir == "" {
  147. return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
  148. }
  149. // find remote storage configuration
  150. remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name)
  151. if err != nil {
  152. return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
  153. }
  154. return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
  155. }