You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

430 lines
15 KiB

2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "google.golang.org/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "path/filepath"
  16. "strings"
  17. "time"
  18. )
  19. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  20. // read filer remote storage mount mappings
  21. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  22. return fmt.Errorf("read mount info: %v", detectErr)
  23. }
  24. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  25. if err != nil {
  26. return err
  27. }
  28. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  29. processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
  30. var lastLogTsNs = time.Now().UnixNano()
  31. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  32. processor.AddSyncJob(resp)
  33. return nil
  34. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  35. offsetTsNs := processor.processedTsWatermark.Load()
  36. if offsetTsNs == 0 {
  37. return nil
  38. }
  39. now := time.Now().UnixNano()
  40. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  41. lastLogTsNs = now
  42. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs)
  43. })
  44. option.clientEpoch++
  45. metadataFollowOption := &pb.MetadataFollowOption{
  46. ClientName: "filer.remote.sync",
  47. ClientId: option.clientId,
  48. ClientEpoch: option.clientEpoch,
  49. SelfSignature: 0,
  50. PathPrefix: option.bucketsDir,
  51. AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
  52. DirectoriesToWatch: nil,
  53. StartTsNs: lastOffsetTs.UnixNano(),
  54. StopTsNs: 0,
  55. EventErrorType: pb.RetryForeverOnError,
  56. }
  57. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  58. }
  59. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  60. handleCreateBucket := func(entry *filer_pb.Entry) error {
  61. if !entry.IsDirectory {
  62. return nil
  63. }
  64. if entry.RemoteEntry != nil {
  65. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  66. return nil
  67. }
  68. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  69. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  70. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  71. }
  72. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  73. for k := range option.mappings.Mappings {
  74. *option.createBucketAt = k
  75. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  76. }
  77. }
  78. if *option.createBucketAt == "" {
  79. return nil
  80. }
  81. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  82. if !found {
  83. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  84. }
  85. client, err := remote_storage.GetRemoteStorage(remoteConf)
  86. if err != nil {
  87. return err
  88. }
  89. bucketName := strings.ToLower(entry.Name)
  90. if *option.include != "" {
  91. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  92. return nil
  93. }
  94. }
  95. if *option.exclude != "" {
  96. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  97. return nil
  98. }
  99. }
  100. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  101. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  102. if !found {
  103. if *option.createBucketRandomSuffix {
  104. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  105. if len(bucketName)+5 > 63 {
  106. bucketName = bucketName[:58]
  107. }
  108. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  109. }
  110. remoteLocation = &remote_pb.RemoteStorageLocation{
  111. Name: *option.createBucketAt,
  112. Bucket: bucketName,
  113. Path: "/",
  114. }
  115. // need to add new mapping here before getting updates from metadata tailing
  116. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  117. } else {
  118. bucketName = remoteLocation.Bucket
  119. }
  120. glog.V(0).Infof("create bucket %s", bucketName)
  121. if err := client.CreateBucket(bucketName); err != nil {
  122. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  123. }
  124. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  125. }
  126. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  127. if !entry.IsDirectory {
  128. return nil
  129. }
  130. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  131. if err != nil {
  132. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  133. }
  134. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  135. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  136. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  137. }
  138. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  139. return filer.DeleteMountMapping(option, string(bucketPath))
  140. }
  141. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  142. message := resp.EventNotification
  143. if message.NewEntry != nil {
  144. // update
  145. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  146. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  147. if readErr != nil {
  148. return fmt.Errorf("unmarshal mappings: %v", readErr)
  149. }
  150. option.mappings = newMappings
  151. }
  152. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  153. conf := &remote_pb.RemoteConf{}
  154. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  155. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  156. }
  157. option.remoteConfs[conf.Name] = conf
  158. }
  159. } else if message.OldEntry != nil {
  160. // deletion
  161. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  162. conf := &remote_pb.RemoteConf{}
  163. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  164. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  165. }
  166. delete(option.remoteConfs, conf.Name)
  167. }
  168. }
  169. return nil
  170. }
  171. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  172. message := resp.EventNotification
  173. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  174. return handleEtcRemoteChanges(resp)
  175. }
  176. if filer_pb.IsEmpty(resp) {
  177. return nil
  178. }
  179. if filer_pb.IsCreate(resp) {
  180. if message.NewParentPath == option.bucketsDir {
  181. return handleCreateBucket(message.NewEntry)
  182. }
  183. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  184. return nil
  185. }
  186. if !filer.HasData(message.NewEntry) {
  187. return nil
  188. }
  189. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  190. if !ok {
  191. return nil
  192. }
  193. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  194. if err != nil {
  195. return err
  196. }
  197. glog.V(2).Infof("create: %+v", resp)
  198. if !shouldSendToRemote(message.NewEntry) {
  199. glog.V(2).Infof("skipping creating: %+v", resp)
  200. return nil
  201. }
  202. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  203. if message.NewEntry.IsDirectory {
  204. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  205. return client.WriteDirectory(dest, message.NewEntry)
  206. }
  207. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  208. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  209. if writeErr != nil {
  210. return writeErr
  211. }
  212. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  213. }
  214. if filer_pb.IsDelete(resp) {
  215. if resp.Directory == option.bucketsDir {
  216. return handleDeleteBucket(message.OldEntry)
  217. }
  218. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  219. if !ok {
  220. return nil
  221. }
  222. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  223. if err != nil {
  224. return err
  225. }
  226. glog.V(2).Infof("delete: %+v", resp)
  227. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  228. if message.OldEntry.IsDirectory {
  229. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  230. return client.RemoveDirectory(dest)
  231. }
  232. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  233. return client.DeleteFile(dest)
  234. }
  235. if message.OldEntry != nil && message.NewEntry != nil {
  236. if resp.Directory == option.bucketsDir {
  237. if message.NewParentPath == option.bucketsDir {
  238. if message.OldEntry.Name == message.NewEntry.Name {
  239. return nil
  240. }
  241. if err := handleCreateBucket(message.NewEntry); err != nil {
  242. return err
  243. }
  244. if err := handleDeleteBucket(message.OldEntry); err != nil {
  245. return err
  246. }
  247. }
  248. }
  249. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  250. return nil
  251. }
  252. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  253. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  254. if oldOk && newOk {
  255. if !shouldSendToRemote(message.NewEntry) {
  256. glog.V(2).Infof("skipping updating: %+v", resp)
  257. return nil
  258. }
  259. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  260. if err != nil {
  261. return err
  262. }
  263. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  264. // update the same entry
  265. if message.NewEntry.IsDirectory {
  266. // update directory property
  267. return nil
  268. }
  269. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  270. glog.V(2).Infof("update meta: %+v", resp)
  271. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  272. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  273. } else {
  274. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  275. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  276. if writeErr != nil {
  277. return writeErr
  278. }
  279. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  280. }
  281. }
  282. }
  283. // the following is entry rename
  284. if oldOk {
  285. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  286. if err != nil {
  287. return err
  288. }
  289. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  290. if message.OldEntry.IsDirectory {
  291. return client.RemoveDirectory(oldDest)
  292. }
  293. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  294. if err := client.DeleteFile(oldDest); err != nil {
  295. return err
  296. }
  297. }
  298. if newOk {
  299. if !shouldSendToRemote(message.NewEntry) {
  300. glog.V(2).Infof("skipping updating: %+v", resp)
  301. return nil
  302. }
  303. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  304. if err != nil {
  305. return err
  306. }
  307. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  308. if message.NewEntry.IsDirectory {
  309. return client.WriteDirectory(newDest, message.NewEntry)
  310. }
  311. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  312. if writeErr != nil {
  313. return writeErr
  314. }
  315. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  316. }
  317. }
  318. return nil
  319. }
  320. return eachEntryFunc, nil
  321. }
  322. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  323. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  324. var isMounted bool
  325. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  326. if !isMounted {
  327. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  328. }
  329. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  330. if !hasClient {
  331. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  332. }
  333. client, err = remote_storage.GetRemoteStorage(remoteConf)
  334. if err != nil {
  335. return nil, remoteStorageMountLocation, err
  336. }
  337. return client, remoteStorageMountLocation, nil
  338. }
  339. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  340. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  341. if !ok {
  342. return "", nil, nil, false
  343. }
  344. var isMounted bool
  345. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  346. if !isMounted {
  347. glog.Warningf("%s is not mounted", bucket)
  348. return "", nil, nil, false
  349. }
  350. var hasClient bool
  351. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  352. if !hasClient {
  353. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  354. return "", nil, nil, false
  355. }
  356. return bucket, remoteStorageMountLocation, remoteConf, true
  357. }
  358. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  359. if !strings.HasPrefix(dir, bucketsDir+"/") {
  360. return "", false
  361. }
  362. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  363. return util.FullPath(bucketsDir).Child(parts[0]), true
  364. }
  365. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  366. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  367. if err == filer_pb.ErrNotFound {
  368. return fmt.Errorf("remote storage is not configured in filer server")
  369. }
  370. return err
  371. } else {
  372. option.mappings = mappings
  373. }
  374. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  375. var lastConfName string
  376. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  377. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  378. return nil
  379. }
  380. conf := &remote_pb.RemoteConf{}
  381. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  382. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  383. }
  384. option.remoteConfs[conf.Name] = conf
  385. lastConfName = conf.Name
  386. return nil
  387. }, "", false, math.MaxUint32)
  388. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  389. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  390. option.mappings.PrimaryBucketStorageName = lastConfName
  391. }
  392. return
  393. }