You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
15 KiB

3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  11. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "google.golang.org/protobuf/proto"
  14. "math"
  15. "math/rand"
  16. "path/filepath"
  17. "strings"
  18. "time"
  19. )
  20. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  21. // read filer remote storage mount mappings
  22. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  23. return fmt.Errorf("read mount info: %v", detectErr)
  24. }
  25. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  26. if err != nil {
  27. return err
  28. }
  29. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  30. lastTime := time.Unix(0, lastTsNs)
  31. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  32. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
  33. })
  34. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  35. option.clientEpoch++
  36. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
  37. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
  38. }
  39. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  40. handleCreateBucket := func(entry *filer_pb.Entry) error {
  41. if !entry.IsDirectory {
  42. return nil
  43. }
  44. if entry.RemoteEntry != nil {
  45. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  46. return nil
  47. }
  48. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  49. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  50. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  51. }
  52. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  53. for k := range option.mappings.Mappings {
  54. *option.createBucketAt = k
  55. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  56. }
  57. }
  58. if *option.createBucketAt == "" {
  59. return nil
  60. }
  61. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  62. if !found {
  63. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  64. }
  65. client, err := remote_storage.GetRemoteStorage(remoteConf)
  66. if err != nil {
  67. return err
  68. }
  69. bucketName := strings.ToLower(entry.Name)
  70. if *option.include != "" {
  71. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  72. return nil
  73. }
  74. }
  75. if *option.exclude != "" {
  76. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  77. return nil
  78. }
  79. }
  80. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  81. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  82. if !found {
  83. if *option.createBucketRandomSuffix {
  84. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  85. if len(bucketName)+5 > 63 {
  86. bucketName = bucketName[:58]
  87. }
  88. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  89. }
  90. remoteLocation = &remote_pb.RemoteStorageLocation{
  91. Name: *option.createBucketAt,
  92. Bucket: bucketName,
  93. Path: "/",
  94. }
  95. // need to add new mapping here before getting updates from metadata tailing
  96. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  97. } else {
  98. bucketName = remoteLocation.Bucket
  99. }
  100. glog.V(0).Infof("create bucket %s", bucketName)
  101. if err := client.CreateBucket(bucketName); err != nil {
  102. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  103. }
  104. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  105. }
  106. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  107. if !entry.IsDirectory {
  108. return nil
  109. }
  110. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  111. if err != nil {
  112. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  113. }
  114. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  115. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  116. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  117. }
  118. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  119. return filer.DeleteMountMapping(option, string(bucketPath))
  120. }
  121. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  122. message := resp.EventNotification
  123. if message.NewEntry != nil {
  124. // update
  125. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  126. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  127. if readErr != nil {
  128. return fmt.Errorf("unmarshal mappings: %v", readErr)
  129. }
  130. option.mappings = newMappings
  131. }
  132. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  133. conf := &remote_pb.RemoteConf{}
  134. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  135. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  136. }
  137. option.remoteConfs[conf.Name] = conf
  138. }
  139. } else if message.OldEntry != nil {
  140. // deletion
  141. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  142. conf := &remote_pb.RemoteConf{}
  143. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  144. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  145. }
  146. delete(option.remoteConfs, conf.Name)
  147. }
  148. }
  149. return nil
  150. }
  151. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  152. message := resp.EventNotification
  153. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  154. return handleEtcRemoteChanges(resp)
  155. }
  156. if filer_pb.IsEmpty(resp) {
  157. return nil
  158. }
  159. if filer_pb.IsCreate(resp) {
  160. if message.NewParentPath == option.bucketsDir {
  161. return handleCreateBucket(message.NewEntry)
  162. }
  163. if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/"+s3_constants.MultipartUploadsFolder+"/") {
  164. return nil
  165. }
  166. if !filer.HasData(message.NewEntry) {
  167. return nil
  168. }
  169. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  170. if !ok {
  171. return nil
  172. }
  173. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  174. if err != nil {
  175. return err
  176. }
  177. glog.V(2).Infof("create: %+v", resp)
  178. if !shouldSendToRemote(message.NewEntry) {
  179. glog.V(2).Infof("skipping creating: %+v", resp)
  180. return nil
  181. }
  182. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  183. if message.NewEntry.IsDirectory {
  184. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  185. return client.WriteDirectory(dest, message.NewEntry)
  186. }
  187. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  188. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  189. if writeErr != nil {
  190. return writeErr
  191. }
  192. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  193. }
  194. if filer_pb.IsDelete(resp) {
  195. if resp.Directory == option.bucketsDir {
  196. return handleDeleteBucket(message.OldEntry)
  197. }
  198. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  199. if !ok {
  200. return nil
  201. }
  202. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  203. if err != nil {
  204. return err
  205. }
  206. glog.V(2).Infof("delete: %+v", resp)
  207. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  208. if message.OldEntry.IsDirectory {
  209. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  210. return client.RemoveDirectory(dest)
  211. }
  212. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  213. return client.DeleteFile(dest)
  214. }
  215. if message.OldEntry != nil && message.NewEntry != nil {
  216. if resp.Directory == option.bucketsDir {
  217. if message.NewParentPath == option.bucketsDir {
  218. if message.OldEntry.Name == message.NewEntry.Name {
  219. return nil
  220. }
  221. if err := handleCreateBucket(message.NewEntry); err != nil {
  222. return err
  223. }
  224. if err := handleDeleteBucket(message.OldEntry); err != nil {
  225. return err
  226. }
  227. }
  228. }
  229. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  230. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  231. if oldOk && newOk {
  232. if !shouldSendToRemote(message.NewEntry) {
  233. glog.V(2).Infof("skipping updating: %+v", resp)
  234. return nil
  235. }
  236. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  237. if err != nil {
  238. return err
  239. }
  240. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  241. // update the same entry
  242. if message.NewEntry.IsDirectory {
  243. // update directory property
  244. return nil
  245. }
  246. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  247. glog.V(2).Infof("update meta: %+v", resp)
  248. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  249. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  250. } else {
  251. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  252. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  253. if writeErr != nil {
  254. return writeErr
  255. }
  256. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  257. }
  258. }
  259. }
  260. // the following is entry rename
  261. if oldOk {
  262. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  263. if err != nil {
  264. return err
  265. }
  266. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  267. if message.OldEntry.IsDirectory {
  268. return client.RemoveDirectory(oldDest)
  269. }
  270. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  271. if err := client.DeleteFile(oldDest); err != nil {
  272. return err
  273. }
  274. }
  275. if newOk {
  276. if !shouldSendToRemote(message.NewEntry) {
  277. glog.V(2).Infof("skipping updating: %+v", resp)
  278. return nil
  279. }
  280. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  281. if err != nil {
  282. return err
  283. }
  284. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  285. if message.NewEntry.IsDirectory {
  286. return client.WriteDirectory(newDest, message.NewEntry)
  287. }
  288. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  289. if writeErr != nil {
  290. return writeErr
  291. }
  292. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  293. }
  294. }
  295. return nil
  296. }
  297. return eachEntryFunc, nil
  298. }
  299. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  300. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  301. var isMounted bool
  302. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  303. if !isMounted {
  304. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  305. }
  306. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  307. if !hasClient {
  308. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  309. }
  310. client, err = remote_storage.GetRemoteStorage(remoteConf)
  311. if err != nil {
  312. return nil, remoteStorageMountLocation, err
  313. }
  314. return client, remoteStorageMountLocation, nil
  315. }
  316. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  317. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  318. if !ok {
  319. return "", nil, nil, false
  320. }
  321. var isMounted bool
  322. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  323. if !isMounted {
  324. glog.Warningf("%s is not mounted", bucket)
  325. return "", nil, nil, false
  326. }
  327. var hasClient bool
  328. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  329. if !hasClient {
  330. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  331. return "", nil, nil, false
  332. }
  333. return bucket, remoteStorageMountLocation, remoteConf, true
  334. }
  335. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  336. if !strings.HasPrefix(dir, bucketsDir+"/") {
  337. return "", false
  338. }
  339. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  340. return util.FullPath(bucketsDir).Child(parts[0]), true
  341. }
  342. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  343. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  344. return err
  345. } else {
  346. option.mappings = mappings
  347. }
  348. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  349. var lastConfName string
  350. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  351. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  352. return nil
  353. }
  354. conf := &remote_pb.RemoteConf{}
  355. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  356. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  357. }
  358. option.remoteConfs[conf.Name] = conf
  359. lastConfName = conf.Name
  360. return nil
  361. }, "", false, math.MaxUint32)
  362. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  363. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  364. option.mappings.PrimaryBucketStorageName = lastConfName
  365. }
  366. return
  367. }