You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

400 lines
14 KiB

3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "path/filepath"
  16. "strings"
  17. "time"
  18. )
  19. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  20. // read filer remote storage mount mappings
  21. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  22. return fmt.Errorf("read mount info: %v", detectErr)
  23. }
  24. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  25. if err != nil {
  26. return err
  27. }
  28. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  29. lastTime := time.Unix(0, lastTsNs)
  30. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  31. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
  32. })
  33. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  34. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
  35. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
  36. }
  37. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  38. handleCreateBucket := func(entry *filer_pb.Entry) error {
  39. if !entry.IsDirectory {
  40. return nil
  41. }
  42. if entry.RemoteEntry != nil {
  43. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  44. return nil
  45. }
  46. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  47. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  48. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  49. }
  50. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  51. for k := range option.mappings.Mappings {
  52. *option.createBucketAt = k
  53. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  54. }
  55. }
  56. if *option.createBucketAt == "" {
  57. return nil
  58. }
  59. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  60. if !found {
  61. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  62. }
  63. client, err := remote_storage.GetRemoteStorage(remoteConf)
  64. if err != nil {
  65. return err
  66. }
  67. bucketName := strings.ToLower(entry.Name)
  68. if *option.include != "" {
  69. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  70. return nil
  71. }
  72. }
  73. if *option.exclude != "" {
  74. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  75. return nil
  76. }
  77. }
  78. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  79. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  80. if !found {
  81. if *option.createBucketRandomSuffix {
  82. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  83. if len(bucketName)+5 > 63 {
  84. bucketName = bucketName[:58]
  85. }
  86. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  87. }
  88. remoteLocation = &remote_pb.RemoteStorageLocation{
  89. Name: *option.createBucketAt,
  90. Bucket: bucketName,
  91. Path: "/",
  92. }
  93. // need to add new mapping here before getting updates from metadata tailing
  94. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  95. } else {
  96. bucketName = remoteLocation.Bucket
  97. }
  98. glog.V(0).Infof("create bucket %s", bucketName)
  99. if err := client.CreateBucket(bucketName); err != nil {
  100. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  101. }
  102. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  103. }
  104. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  105. if !entry.IsDirectory {
  106. return nil
  107. }
  108. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  109. if err != nil {
  110. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  111. }
  112. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  113. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  114. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  115. }
  116. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  117. return filer.DeleteMountMapping(option, string(bucketPath))
  118. }
  119. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  120. message := resp.EventNotification
  121. if message.NewEntry != nil {
  122. // update
  123. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  124. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  125. if readErr != nil {
  126. return fmt.Errorf("unmarshal mappings: %v", readErr)
  127. }
  128. option.mappings = newMappings
  129. }
  130. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  131. conf := &remote_pb.RemoteConf{}
  132. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  133. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  134. }
  135. option.remoteConfs[conf.Name] = conf
  136. }
  137. } else if message.OldEntry != nil {
  138. // deletion
  139. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  140. conf := &remote_pb.RemoteConf{}
  141. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  142. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  143. }
  144. delete(option.remoteConfs, conf.Name)
  145. }
  146. }
  147. return nil
  148. }
  149. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  150. message := resp.EventNotification
  151. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  152. return handleEtcRemoteChanges(resp)
  153. }
  154. if filer_pb.IsEmpty(resp) {
  155. return nil
  156. }
  157. if filer_pb.IsCreate(resp) {
  158. if message.NewParentPath == option.bucketsDir {
  159. return handleCreateBucket(message.NewEntry)
  160. }
  161. if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/.uploads/") {
  162. return nil
  163. }
  164. if !filer.HasData(message.NewEntry) {
  165. return nil
  166. }
  167. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  168. if !ok {
  169. return nil
  170. }
  171. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  172. if err != nil {
  173. return err
  174. }
  175. glog.V(2).Infof("create: %+v", resp)
  176. if !shouldSendToRemote(message.NewEntry) {
  177. glog.V(2).Infof("skipping creating: %+v", resp)
  178. return nil
  179. }
  180. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  181. if message.NewEntry.IsDirectory {
  182. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  183. return client.WriteDirectory(dest, message.NewEntry)
  184. }
  185. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  186. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  187. if writeErr != nil {
  188. return writeErr
  189. }
  190. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  191. }
  192. if filer_pb.IsDelete(resp) {
  193. if resp.Directory == option.bucketsDir {
  194. return handleDeleteBucket(message.OldEntry)
  195. }
  196. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  197. if !ok {
  198. return nil
  199. }
  200. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  201. if err != nil {
  202. return err
  203. }
  204. glog.V(2).Infof("delete: %+v", resp)
  205. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  206. if message.OldEntry.IsDirectory {
  207. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  208. return client.RemoveDirectory(dest)
  209. }
  210. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  211. return client.DeleteFile(dest)
  212. }
  213. if message.OldEntry != nil && message.NewEntry != nil {
  214. if resp.Directory == option.bucketsDir {
  215. if message.NewParentPath == option.bucketsDir {
  216. if message.OldEntry.Name == message.NewEntry.Name {
  217. return nil
  218. }
  219. if err := handleCreateBucket(message.NewEntry); err != nil {
  220. return err
  221. }
  222. if err := handleDeleteBucket(message.OldEntry); err != nil {
  223. return err
  224. }
  225. }
  226. }
  227. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  228. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  229. if oldOk && newOk {
  230. if !shouldSendToRemote(message.NewEntry) {
  231. glog.V(2).Infof("skipping updating: %+v", resp)
  232. return nil
  233. }
  234. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  235. if err != nil {
  236. return err
  237. }
  238. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  239. // update the same entry
  240. if message.NewEntry.IsDirectory {
  241. // update directory property
  242. return nil
  243. }
  244. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  245. glog.V(2).Infof("update meta: %+v", resp)
  246. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  247. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  248. } else {
  249. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  250. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  251. if writeErr != nil {
  252. return writeErr
  253. }
  254. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  255. }
  256. }
  257. }
  258. // the following is entry rename
  259. if oldOk {
  260. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  261. if err != nil {
  262. return err
  263. }
  264. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  265. if message.OldEntry.IsDirectory {
  266. return client.RemoveDirectory(oldDest)
  267. }
  268. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  269. if err := client.DeleteFile(oldDest); err != nil {
  270. return err
  271. }
  272. }
  273. if newOk {
  274. if !shouldSendToRemote(message.NewEntry) {
  275. glog.V(2).Infof("skipping updating: %+v", resp)
  276. return nil
  277. }
  278. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  279. if err != nil {
  280. return err
  281. }
  282. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  283. if message.NewEntry.IsDirectory {
  284. return client.WriteDirectory(newDest, message.NewEntry)
  285. }
  286. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  287. if writeErr != nil {
  288. return writeErr
  289. }
  290. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  291. }
  292. }
  293. return nil
  294. }
  295. return eachEntryFunc, nil
  296. }
  297. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  298. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  299. var isMounted bool
  300. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  301. if !isMounted {
  302. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  303. }
  304. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  305. if !hasClient {
  306. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  307. }
  308. client, err = remote_storage.GetRemoteStorage(remoteConf)
  309. if err != nil {
  310. return nil, remoteStorageMountLocation, err
  311. }
  312. return client, remoteStorageMountLocation, nil
  313. }
  314. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  315. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  316. if !ok {
  317. return "", nil, nil, false
  318. }
  319. var isMounted bool
  320. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  321. if !isMounted {
  322. glog.Warningf("%s is not mounted", bucket)
  323. return "", nil, nil, false
  324. }
  325. var hasClient bool
  326. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  327. if !hasClient {
  328. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  329. return "", nil, nil, false
  330. }
  331. return bucket, remoteStorageMountLocation, remoteConf, true
  332. }
  333. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  334. if !strings.HasPrefix(dir, bucketsDir+"/") {
  335. return "", false
  336. }
  337. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  338. return util.FullPath(bucketsDir).Child(parts[0]), true
  339. }
  340. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  341. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  342. return err
  343. } else {
  344. option.mappings = mappings
  345. }
  346. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  347. var lastConfName string
  348. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  349. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  350. return nil
  351. }
  352. conf := &remote_pb.RemoteConf{}
  353. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  354. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  355. }
  356. option.remoteConfs[conf.Name] = conf
  357. lastConfName = conf.Name
  358. return nil
  359. }, "", false, math.MaxUint32)
  360. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  361. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  362. option.mappings.PrimaryBucketStorageName = lastConfName
  363. }
  364. return
  365. }