You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

417 lines
15 KiB

2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "google.golang.org/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "path/filepath"
  16. "strings"
  17. "time"
  18. )
  19. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  20. // read filer remote storage mount mappings
  21. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  22. return fmt.Errorf("read mount info: %v", detectErr)
  23. }
  24. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  25. if err != nil {
  26. return err
  27. }
  28. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  29. lastTime := time.Unix(0, lastTsNs)
  30. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  31. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
  32. })
  33. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  34. option.clientEpoch++
  35. metadataFollowOption := &pb.MetadataFollowOption{
  36. ClientName: "filer.remote.sync",
  37. ClientId: option.clientId,
  38. ClientEpoch: option.clientEpoch,
  39. SelfSignature: 0,
  40. PathPrefix: option.bucketsDir,
  41. AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
  42. DirectoriesToWatch: nil,
  43. StartTsNs: lastOffsetTs.UnixNano(),
  44. StopTsNs: 0,
  45. EventErrorType: pb.TrivialOnError,
  46. }
  47. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  48. }
  49. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  50. handleCreateBucket := func(entry *filer_pb.Entry) error {
  51. if !entry.IsDirectory {
  52. return nil
  53. }
  54. if entry.RemoteEntry != nil {
  55. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  56. return nil
  57. }
  58. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  59. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  60. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  61. }
  62. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  63. for k := range option.mappings.Mappings {
  64. *option.createBucketAt = k
  65. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  66. }
  67. }
  68. if *option.createBucketAt == "" {
  69. return nil
  70. }
  71. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  72. if !found {
  73. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  74. }
  75. client, err := remote_storage.GetRemoteStorage(remoteConf)
  76. if err != nil {
  77. return err
  78. }
  79. bucketName := strings.ToLower(entry.Name)
  80. if *option.include != "" {
  81. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  82. return nil
  83. }
  84. }
  85. if *option.exclude != "" {
  86. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  87. return nil
  88. }
  89. }
  90. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  91. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  92. if !found {
  93. if *option.createBucketRandomSuffix {
  94. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  95. if len(bucketName)+5 > 63 {
  96. bucketName = bucketName[:58]
  97. }
  98. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  99. }
  100. remoteLocation = &remote_pb.RemoteStorageLocation{
  101. Name: *option.createBucketAt,
  102. Bucket: bucketName,
  103. Path: "/",
  104. }
  105. // need to add new mapping here before getting updates from metadata tailing
  106. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  107. } else {
  108. bucketName = remoteLocation.Bucket
  109. }
  110. glog.V(0).Infof("create bucket %s", bucketName)
  111. if err := client.CreateBucket(bucketName); err != nil {
  112. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  113. }
  114. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  115. }
  116. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  117. if !entry.IsDirectory {
  118. return nil
  119. }
  120. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  121. if err != nil {
  122. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  123. }
  124. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  125. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  126. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  127. }
  128. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  129. return filer.DeleteMountMapping(option, string(bucketPath))
  130. }
  131. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  132. message := resp.EventNotification
  133. if message.NewEntry != nil {
  134. // update
  135. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  136. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  137. if readErr != nil {
  138. return fmt.Errorf("unmarshal mappings: %v", readErr)
  139. }
  140. option.mappings = newMappings
  141. }
  142. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  143. conf := &remote_pb.RemoteConf{}
  144. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  145. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  146. }
  147. option.remoteConfs[conf.Name] = conf
  148. }
  149. } else if message.OldEntry != nil {
  150. // deletion
  151. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  152. conf := &remote_pb.RemoteConf{}
  153. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  154. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  155. }
  156. delete(option.remoteConfs, conf.Name)
  157. }
  158. }
  159. return nil
  160. }
  161. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  162. message := resp.EventNotification
  163. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  164. return handleEtcRemoteChanges(resp)
  165. }
  166. if filer_pb.IsEmpty(resp) {
  167. return nil
  168. }
  169. if filer_pb.IsCreate(resp) {
  170. if message.NewParentPath == option.bucketsDir {
  171. return handleCreateBucket(message.NewEntry)
  172. }
  173. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  174. return nil
  175. }
  176. if !filer.HasData(message.NewEntry) {
  177. return nil
  178. }
  179. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  180. if !ok {
  181. return nil
  182. }
  183. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  184. if err != nil {
  185. return err
  186. }
  187. glog.V(2).Infof("create: %+v", resp)
  188. if !shouldSendToRemote(message.NewEntry) {
  189. glog.V(2).Infof("skipping creating: %+v", resp)
  190. return nil
  191. }
  192. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  193. if message.NewEntry.IsDirectory {
  194. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  195. return client.WriteDirectory(dest, message.NewEntry)
  196. }
  197. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  198. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  199. if writeErr != nil {
  200. return writeErr
  201. }
  202. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  203. }
  204. if filer_pb.IsDelete(resp) {
  205. if resp.Directory == option.bucketsDir {
  206. return handleDeleteBucket(message.OldEntry)
  207. }
  208. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  209. if !ok {
  210. return nil
  211. }
  212. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  213. if err != nil {
  214. return err
  215. }
  216. glog.V(2).Infof("delete: %+v", resp)
  217. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  218. if message.OldEntry.IsDirectory {
  219. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  220. return client.RemoveDirectory(dest)
  221. }
  222. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  223. return client.DeleteFile(dest)
  224. }
  225. if message.OldEntry != nil && message.NewEntry != nil {
  226. if resp.Directory == option.bucketsDir {
  227. if message.NewParentPath == option.bucketsDir {
  228. if message.OldEntry.Name == message.NewEntry.Name {
  229. return nil
  230. }
  231. if err := handleCreateBucket(message.NewEntry); err != nil {
  232. return err
  233. }
  234. if err := handleDeleteBucket(message.OldEntry); err != nil {
  235. return err
  236. }
  237. }
  238. }
  239. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  240. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  241. if oldOk && newOk {
  242. if !shouldSendToRemote(message.NewEntry) {
  243. glog.V(2).Infof("skipping updating: %+v", resp)
  244. return nil
  245. }
  246. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  247. if err != nil {
  248. return err
  249. }
  250. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  251. // update the same entry
  252. if message.NewEntry.IsDirectory {
  253. // update directory property
  254. return nil
  255. }
  256. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  257. glog.V(2).Infof("update meta: %+v", resp)
  258. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  259. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  260. } else {
  261. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  262. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  263. if writeErr != nil {
  264. return writeErr
  265. }
  266. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  267. }
  268. }
  269. }
  270. // the following is entry rename
  271. if oldOk {
  272. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  273. if err != nil {
  274. return err
  275. }
  276. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  277. if message.OldEntry.IsDirectory {
  278. return client.RemoveDirectory(oldDest)
  279. }
  280. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  281. if err := client.DeleteFile(oldDest); err != nil {
  282. return err
  283. }
  284. }
  285. if newOk {
  286. if !shouldSendToRemote(message.NewEntry) {
  287. glog.V(2).Infof("skipping updating: %+v", resp)
  288. return nil
  289. }
  290. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  291. if err != nil {
  292. return err
  293. }
  294. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  295. if message.NewEntry.IsDirectory {
  296. return client.WriteDirectory(newDest, message.NewEntry)
  297. }
  298. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  299. if writeErr != nil {
  300. return writeErr
  301. }
  302. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  303. }
  304. }
  305. return nil
  306. }
  307. return eachEntryFunc, nil
  308. }
  309. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  310. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  311. var isMounted bool
  312. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  313. if !isMounted {
  314. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  315. }
  316. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  317. if !hasClient {
  318. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  319. }
  320. client, err = remote_storage.GetRemoteStorage(remoteConf)
  321. if err != nil {
  322. return nil, remoteStorageMountLocation, err
  323. }
  324. return client, remoteStorageMountLocation, nil
  325. }
  326. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  327. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  328. if !ok {
  329. return "", nil, nil, false
  330. }
  331. var isMounted bool
  332. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  333. if !isMounted {
  334. glog.Warningf("%s is not mounted", bucket)
  335. return "", nil, nil, false
  336. }
  337. var hasClient bool
  338. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  339. if !hasClient {
  340. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  341. return "", nil, nil, false
  342. }
  343. return bucket, remoteStorageMountLocation, remoteConf, true
  344. }
  345. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  346. if !strings.HasPrefix(dir, bucketsDir+"/") {
  347. return "", false
  348. }
  349. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  350. return util.FullPath(bucketsDir).Child(parts[0]), true
  351. }
  352. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  353. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  354. if err == filer_pb.ErrNotFound {
  355. return fmt.Errorf("remote storage is not configured in filer server")
  356. }
  357. return err
  358. } else {
  359. option.mappings = mappings
  360. }
  361. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  362. var lastConfName string
  363. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  364. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  365. return nil
  366. }
  367. conf := &remote_pb.RemoteConf{}
  368. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  369. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  370. }
  371. option.remoteConfs[conf.Name] = conf
  372. lastConfName = conf.Name
  373. return nil
  374. }, "", false, math.MaxUint32)
  375. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  376. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  377. option.mappings.PrimaryBucketStorageName = lastConfName
  378. }
  379. return
  380. }