You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

401 lines
15 KiB

3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. "math"
  15. "math/rand"
  16. "path/filepath"
  17. "strings"
  18. "time"
  19. )
  20. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  21. // read filer remote storage mount mappings
  22. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  23. return fmt.Errorf("read mount info: %v", detectErr)
  24. }
  25. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  26. if err != nil {
  27. return err
  28. }
  29. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  30. lastTime := time.Unix(0, lastTsNs)
  31. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  32. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
  33. })
  34. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  35. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
  36. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
  37. }
  38. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  39. handleCreateBucket := func(entry *filer_pb.Entry) error {
  40. if !entry.IsDirectory {
  41. return nil
  42. }
  43. if entry.RemoteEntry != nil {
  44. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  45. return nil
  46. }
  47. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  48. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  49. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  50. }
  51. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  52. for k := range option.mappings.Mappings {
  53. *option.createBucketAt = k
  54. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  55. }
  56. }
  57. if *option.createBucketAt == "" {
  58. return nil
  59. }
  60. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  61. if !found {
  62. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  63. }
  64. client, err := remote_storage.GetRemoteStorage(remoteConf)
  65. if err != nil {
  66. return err
  67. }
  68. bucketName := strings.ToLower(entry.Name)
  69. if *option.include != "" {
  70. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  71. return nil
  72. }
  73. }
  74. if *option.exclude != "" {
  75. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  76. return nil
  77. }
  78. }
  79. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  80. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  81. if !found {
  82. if *option.createBucketRandomSuffix {
  83. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  84. if len(bucketName)+5 > 63 {
  85. bucketName = bucketName[:58]
  86. }
  87. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  88. }
  89. remoteLocation = &remote_pb.RemoteStorageLocation{
  90. Name: *option.createBucketAt,
  91. Bucket: bucketName,
  92. Path: "/",
  93. }
  94. // need to add new mapping here before getting updates from metadata tailing
  95. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  96. } else {
  97. bucketName = remoteLocation.Bucket
  98. }
  99. glog.V(0).Infof("create bucket %s", bucketName)
  100. if err := client.CreateBucket(bucketName); err != nil {
  101. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  102. }
  103. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  104. }
  105. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  106. if !entry.IsDirectory {
  107. return nil
  108. }
  109. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  110. if err != nil {
  111. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  112. }
  113. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  114. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  115. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  116. }
  117. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  118. return filer.DeleteMountMapping(option, string(bucketPath))
  119. }
  120. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  121. message := resp.EventNotification
  122. if message.NewEntry != nil {
  123. // update
  124. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  125. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  126. if readErr != nil {
  127. return fmt.Errorf("unmarshal mappings: %v", readErr)
  128. }
  129. option.mappings = newMappings
  130. }
  131. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  132. conf := &remote_pb.RemoteConf{}
  133. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  134. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  135. }
  136. option.remoteConfs[conf.Name] = conf
  137. }
  138. } else if message.OldEntry != nil {
  139. // deletion
  140. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  141. conf := &remote_pb.RemoteConf{}
  142. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  143. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  144. }
  145. delete(option.remoteConfs, conf.Name)
  146. }
  147. }
  148. return nil
  149. }
  150. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  151. message := resp.EventNotification
  152. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  153. return handleEtcRemoteChanges(resp)
  154. }
  155. if filer_pb.IsEmpty(resp) {
  156. return nil
  157. }
  158. if filer_pb.IsCreate(resp) {
  159. if message.NewParentPath == option.bucketsDir {
  160. return handleCreateBucket(message.NewEntry)
  161. }
  162. if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/"+s3_constants.MultipartUploadsFolder+"/") {
  163. return nil
  164. }
  165. if !filer.HasData(message.NewEntry) {
  166. return nil
  167. }
  168. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  169. if !ok {
  170. return nil
  171. }
  172. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  173. if err != nil {
  174. return err
  175. }
  176. glog.V(2).Infof("create: %+v", resp)
  177. if !shouldSendToRemote(message.NewEntry) {
  178. glog.V(2).Infof("skipping creating: %+v", resp)
  179. return nil
  180. }
  181. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  182. if message.NewEntry.IsDirectory {
  183. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  184. return client.WriteDirectory(dest, message.NewEntry)
  185. }
  186. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  187. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  188. if writeErr != nil {
  189. return writeErr
  190. }
  191. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  192. }
  193. if filer_pb.IsDelete(resp) {
  194. if resp.Directory == option.bucketsDir {
  195. return handleDeleteBucket(message.OldEntry)
  196. }
  197. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  198. if !ok {
  199. return nil
  200. }
  201. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  202. if err != nil {
  203. return err
  204. }
  205. glog.V(2).Infof("delete: %+v", resp)
  206. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  207. if message.OldEntry.IsDirectory {
  208. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  209. return client.RemoveDirectory(dest)
  210. }
  211. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  212. return client.DeleteFile(dest)
  213. }
  214. if message.OldEntry != nil && message.NewEntry != nil {
  215. if resp.Directory == option.bucketsDir {
  216. if message.NewParentPath == option.bucketsDir {
  217. if message.OldEntry.Name == message.NewEntry.Name {
  218. return nil
  219. }
  220. if err := handleCreateBucket(message.NewEntry); err != nil {
  221. return err
  222. }
  223. if err := handleDeleteBucket(message.OldEntry); err != nil {
  224. return err
  225. }
  226. }
  227. }
  228. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  229. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  230. if oldOk && newOk {
  231. if !shouldSendToRemote(message.NewEntry) {
  232. glog.V(2).Infof("skipping updating: %+v", resp)
  233. return nil
  234. }
  235. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  236. if err != nil {
  237. return err
  238. }
  239. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  240. // update the same entry
  241. if message.NewEntry.IsDirectory {
  242. // update directory property
  243. return nil
  244. }
  245. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  246. glog.V(2).Infof("update meta: %+v", resp)
  247. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  248. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  249. } else {
  250. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  251. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  252. if writeErr != nil {
  253. return writeErr
  254. }
  255. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  256. }
  257. }
  258. }
  259. // the following is entry rename
  260. if oldOk {
  261. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  262. if err != nil {
  263. return err
  264. }
  265. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  266. if message.OldEntry.IsDirectory {
  267. return client.RemoveDirectory(oldDest)
  268. }
  269. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  270. if err := client.DeleteFile(oldDest); err != nil {
  271. return err
  272. }
  273. }
  274. if newOk {
  275. if !shouldSendToRemote(message.NewEntry) {
  276. glog.V(2).Infof("skipping updating: %+v", resp)
  277. return nil
  278. }
  279. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  280. if err != nil {
  281. return err
  282. }
  283. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  284. if message.NewEntry.IsDirectory {
  285. return client.WriteDirectory(newDest, message.NewEntry)
  286. }
  287. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  288. if writeErr != nil {
  289. return writeErr
  290. }
  291. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  292. }
  293. }
  294. return nil
  295. }
  296. return eachEntryFunc, nil
  297. }
  298. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  299. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  300. var isMounted bool
  301. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  302. if !isMounted {
  303. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  304. }
  305. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  306. if !hasClient {
  307. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  308. }
  309. client, err = remote_storage.GetRemoteStorage(remoteConf)
  310. if err != nil {
  311. return nil, remoteStorageMountLocation, err
  312. }
  313. return client, remoteStorageMountLocation, nil
  314. }
  315. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  316. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  317. if !ok {
  318. return "", nil, nil, false
  319. }
  320. var isMounted bool
  321. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  322. if !isMounted {
  323. glog.Warningf("%s is not mounted", bucket)
  324. return "", nil, nil, false
  325. }
  326. var hasClient bool
  327. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  328. if !hasClient {
  329. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  330. return "", nil, nil, false
  331. }
  332. return bucket, remoteStorageMountLocation, remoteConf, true
  333. }
  334. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  335. if !strings.HasPrefix(dir, bucketsDir+"/") {
  336. return "", false
  337. }
  338. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  339. return util.FullPath(bucketsDir).Child(parts[0]), true
  340. }
  341. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  342. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  343. return err
  344. } else {
  345. option.mappings = mappings
  346. }
  347. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  348. var lastConfName string
  349. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  350. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  351. return nil
  352. }
  353. conf := &remote_pb.RemoteConf{}
  354. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  355. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  356. }
  357. option.remoteConfs[conf.Name] = conf
  358. lastConfName = conf.Name
  359. return nil
  360. }, "", false, math.MaxUint32)
  361. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  362. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  363. option.mappings.PrimaryBucketStorageName = lastConfName
  364. }
  365. return
  366. }