You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

427 lines
15 KiB

2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "google.golang.org/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "path/filepath"
  16. "strings"
  17. "time"
  18. )
  19. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  20. // read filer remote storage mount mappings
  21. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  22. return fmt.Errorf("read mount info: %v", detectErr)
  23. }
  24. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  25. if err != nil {
  26. return err
  27. }
  28. processor := NewMetadataProcessor(eachEntryFunc, 128)
  29. var lastLogTsNs = time.Now().UnixNano()
  30. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  31. processor.AddSyncJob(resp)
  32. return nil
  33. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  34. if processor.processedTsWatermark == 0 {
  35. return nil
  36. }
  37. now := time.Now().UnixNano()
  38. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  39. lastLogTsNs = now
  40. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark)
  41. })
  42. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  43. option.clientEpoch++
  44. metadataFollowOption := &pb.MetadataFollowOption{
  45. ClientName: "filer.remote.sync",
  46. ClientId: option.clientId,
  47. ClientEpoch: option.clientEpoch,
  48. SelfSignature: 0,
  49. PathPrefix: option.bucketsDir,
  50. AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
  51. DirectoriesToWatch: nil,
  52. StartTsNs: lastOffsetTs.UnixNano(),
  53. StopTsNs: 0,
  54. EventErrorType: pb.TrivialOnError,
  55. }
  56. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  57. }
  58. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  59. handleCreateBucket := func(entry *filer_pb.Entry) error {
  60. if !entry.IsDirectory {
  61. return nil
  62. }
  63. if entry.RemoteEntry != nil {
  64. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  65. return nil
  66. }
  67. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  68. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  69. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  70. }
  71. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  72. for k := range option.mappings.Mappings {
  73. *option.createBucketAt = k
  74. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  75. }
  76. }
  77. if *option.createBucketAt == "" {
  78. return nil
  79. }
  80. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  81. if !found {
  82. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  83. }
  84. client, err := remote_storage.GetRemoteStorage(remoteConf)
  85. if err != nil {
  86. return err
  87. }
  88. bucketName := strings.ToLower(entry.Name)
  89. if *option.include != "" {
  90. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  91. return nil
  92. }
  93. }
  94. if *option.exclude != "" {
  95. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  96. return nil
  97. }
  98. }
  99. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  100. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  101. if !found {
  102. if *option.createBucketRandomSuffix {
  103. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  104. if len(bucketName)+5 > 63 {
  105. bucketName = bucketName[:58]
  106. }
  107. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  108. }
  109. remoteLocation = &remote_pb.RemoteStorageLocation{
  110. Name: *option.createBucketAt,
  111. Bucket: bucketName,
  112. Path: "/",
  113. }
  114. // need to add new mapping here before getting updates from metadata tailing
  115. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  116. } else {
  117. bucketName = remoteLocation.Bucket
  118. }
  119. glog.V(0).Infof("create bucket %s", bucketName)
  120. if err := client.CreateBucket(bucketName); err != nil {
  121. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  122. }
  123. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  124. }
  125. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  126. if !entry.IsDirectory {
  127. return nil
  128. }
  129. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  130. if err != nil {
  131. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  132. }
  133. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  134. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  135. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  136. }
  137. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  138. return filer.DeleteMountMapping(option, string(bucketPath))
  139. }
  140. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  141. message := resp.EventNotification
  142. if message.NewEntry != nil {
  143. // update
  144. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  145. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  146. if readErr != nil {
  147. return fmt.Errorf("unmarshal mappings: %v", readErr)
  148. }
  149. option.mappings = newMappings
  150. }
  151. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  152. conf := &remote_pb.RemoteConf{}
  153. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  154. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  155. }
  156. option.remoteConfs[conf.Name] = conf
  157. }
  158. } else if message.OldEntry != nil {
  159. // deletion
  160. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  161. conf := &remote_pb.RemoteConf{}
  162. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  163. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  164. }
  165. delete(option.remoteConfs, conf.Name)
  166. }
  167. }
  168. return nil
  169. }
  170. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  171. message := resp.EventNotification
  172. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  173. return handleEtcRemoteChanges(resp)
  174. }
  175. if filer_pb.IsEmpty(resp) {
  176. return nil
  177. }
  178. if filer_pb.IsCreate(resp) {
  179. if message.NewParentPath == option.bucketsDir {
  180. return handleCreateBucket(message.NewEntry)
  181. }
  182. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  183. return nil
  184. }
  185. if !filer.HasData(message.NewEntry) {
  186. return nil
  187. }
  188. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  189. if !ok {
  190. return nil
  191. }
  192. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  193. if err != nil {
  194. return err
  195. }
  196. glog.V(2).Infof("create: %+v", resp)
  197. if !shouldSendToRemote(message.NewEntry) {
  198. glog.V(2).Infof("skipping creating: %+v", resp)
  199. return nil
  200. }
  201. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  202. if message.NewEntry.IsDirectory {
  203. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  204. return client.WriteDirectory(dest, message.NewEntry)
  205. }
  206. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  207. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  208. if writeErr != nil {
  209. return writeErr
  210. }
  211. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  212. }
  213. if filer_pb.IsDelete(resp) {
  214. if resp.Directory == option.bucketsDir {
  215. return handleDeleteBucket(message.OldEntry)
  216. }
  217. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  218. if !ok {
  219. return nil
  220. }
  221. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  222. if err != nil {
  223. return err
  224. }
  225. glog.V(2).Infof("delete: %+v", resp)
  226. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  227. if message.OldEntry.IsDirectory {
  228. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  229. return client.RemoveDirectory(dest)
  230. }
  231. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  232. return client.DeleteFile(dest)
  233. }
  234. if message.OldEntry != nil && message.NewEntry != nil {
  235. if resp.Directory == option.bucketsDir {
  236. if message.NewParentPath == option.bucketsDir {
  237. if message.OldEntry.Name == message.NewEntry.Name {
  238. return nil
  239. }
  240. if err := handleCreateBucket(message.NewEntry); err != nil {
  241. return err
  242. }
  243. if err := handleDeleteBucket(message.OldEntry); err != nil {
  244. return err
  245. }
  246. }
  247. }
  248. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  249. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  250. if oldOk && newOk {
  251. if !shouldSendToRemote(message.NewEntry) {
  252. glog.V(2).Infof("skipping updating: %+v", resp)
  253. return nil
  254. }
  255. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  256. if err != nil {
  257. return err
  258. }
  259. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  260. // update the same entry
  261. if message.NewEntry.IsDirectory {
  262. // update directory property
  263. return nil
  264. }
  265. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  266. glog.V(2).Infof("update meta: %+v", resp)
  267. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  268. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  269. } else {
  270. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  271. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  272. if writeErr != nil {
  273. return writeErr
  274. }
  275. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  276. }
  277. }
  278. }
  279. // the following is entry rename
  280. if oldOk {
  281. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  282. if err != nil {
  283. return err
  284. }
  285. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  286. if message.OldEntry.IsDirectory {
  287. return client.RemoveDirectory(oldDest)
  288. }
  289. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  290. if err := client.DeleteFile(oldDest); err != nil {
  291. return err
  292. }
  293. }
  294. if newOk {
  295. if !shouldSendToRemote(message.NewEntry) {
  296. glog.V(2).Infof("skipping updating: %+v", resp)
  297. return nil
  298. }
  299. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  300. if err != nil {
  301. return err
  302. }
  303. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  304. if message.NewEntry.IsDirectory {
  305. return client.WriteDirectory(newDest, message.NewEntry)
  306. }
  307. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  308. if writeErr != nil {
  309. return writeErr
  310. }
  311. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  312. }
  313. }
  314. return nil
  315. }
  316. return eachEntryFunc, nil
  317. }
  318. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  319. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  320. var isMounted bool
  321. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  322. if !isMounted {
  323. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  324. }
  325. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  326. if !hasClient {
  327. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  328. }
  329. client, err = remote_storage.GetRemoteStorage(remoteConf)
  330. if err != nil {
  331. return nil, remoteStorageMountLocation, err
  332. }
  333. return client, remoteStorageMountLocation, nil
  334. }
  335. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  336. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  337. if !ok {
  338. return "", nil, nil, false
  339. }
  340. var isMounted bool
  341. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  342. if !isMounted {
  343. glog.Warningf("%s is not mounted", bucket)
  344. return "", nil, nil, false
  345. }
  346. var hasClient bool
  347. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  348. if !hasClient {
  349. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  350. return "", nil, nil, false
  351. }
  352. return bucket, remoteStorageMountLocation, remoteConf, true
  353. }
  354. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  355. if !strings.HasPrefix(dir, bucketsDir+"/") {
  356. return "", false
  357. }
  358. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  359. return util.FullPath(bucketsDir).Child(parts[0]), true
  360. }
  361. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  362. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  363. if err == filer_pb.ErrNotFound {
  364. return fmt.Errorf("remote storage is not configured in filer server")
  365. }
  366. return err
  367. } else {
  368. option.mappings = mappings
  369. }
  370. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  371. var lastConfName string
  372. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  373. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  374. return nil
  375. }
  376. conf := &remote_pb.RemoteConf{}
  377. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  378. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  379. }
  380. option.remoteConfs[conf.Name] = conf
  381. lastConfName = conf.Name
  382. return nil
  383. }, "", false, math.MaxUint32)
  384. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  385. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  386. option.mappings.PrimaryBucketStorageName = lastConfName
  387. }
  388. return
  389. }