You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
14 KiB

  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "strings"
  16. "time"
  17. )
  18. func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  19. // read filer remote storage mount mappings
  20. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  21. return fmt.Errorf("read mount info: %v", detectErr)
  22. }
  23. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  24. if err != nil {
  25. return err
  26. }
  27. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  28. lastTime := time.Unix(0, lastTsNs)
  29. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  30. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
  31. })
  32. lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
  33. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
  34. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  35. }
  36. func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  37. handleCreateBucket := func(entry *filer_pb.Entry) error {
  38. if !entry.IsDirectory {
  39. return nil
  40. }
  41. if entry.RemoteEntry != nil {
  42. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  43. return nil
  44. }
  45. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  46. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  47. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  48. }
  49. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  50. for k := range option.mappings.Mappings {
  51. *option.createBucketAt = k
  52. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  53. }
  54. }
  55. if *option.createBucketAt == "" {
  56. return nil
  57. }
  58. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  59. if !found {
  60. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  61. }
  62. client, err := remote_storage.GetRemoteStorage(remoteConf)
  63. if err != nil {
  64. return err
  65. }
  66. bucketName := strings.ToLower(entry.Name)
  67. if *option.createBucketRandomSuffix {
  68. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  69. if len(bucketName)+5 > 63 {
  70. bucketName = bucketName[:58]
  71. }
  72. bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000)
  73. }
  74. glog.V(0).Infof("create bucket %s", bucketName)
  75. if err := client.CreateBucket(bucketName); err != nil {
  76. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  77. }
  78. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  79. remoteLocation := &remote_pb.RemoteStorageLocation{
  80. Name: *option.createBucketAt,
  81. Bucket: bucketName,
  82. Path: "/",
  83. }
  84. // need to add new mapping here before getting upates from metadata tailing
  85. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  86. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  87. }
  88. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  89. if !entry.IsDirectory {
  90. return nil
  91. }
  92. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  93. if err != nil {
  94. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  95. }
  96. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  97. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  98. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  99. }
  100. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  101. return filer.DeleteMountMapping(option, string(bucketPath))
  102. }
  103. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  104. message := resp.EventNotification
  105. if message.NewEntry != nil {
  106. // update
  107. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  108. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  109. if readErr != nil {
  110. return fmt.Errorf("unmarshal mappings: %v", readErr)
  111. }
  112. option.mappings = newMappings
  113. }
  114. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  115. conf := &remote_pb.RemoteConf{}
  116. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  117. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  118. }
  119. option.remoteConfs[conf.Name] = conf
  120. }
  121. } else if message.OldEntry != nil {
  122. // deletion
  123. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  124. conf := &remote_pb.RemoteConf{}
  125. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  126. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  127. }
  128. delete(option.remoteConfs, conf.Name)
  129. }
  130. }
  131. return nil
  132. }
  133. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  134. message := resp.EventNotification
  135. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  136. return handleEtcRemoteChanges(resp)
  137. }
  138. if message.OldEntry == nil && message.NewEntry == nil {
  139. return nil
  140. }
  141. if message.OldEntry == nil && message.NewEntry != nil {
  142. if message.NewParentPath == option.bucketsDir {
  143. return handleCreateBucket(message.NewEntry)
  144. }
  145. if !filer.HasData(message.NewEntry) {
  146. return nil
  147. }
  148. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  149. if !ok {
  150. return nil
  151. }
  152. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  153. if err != nil {
  154. return err
  155. }
  156. glog.V(2).Infof("create: %+v", resp)
  157. if !shouldSendToRemote(message.NewEntry) {
  158. glog.V(2).Infof("skipping creating: %+v", resp)
  159. return nil
  160. }
  161. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  162. if message.NewEntry.IsDirectory {
  163. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  164. return client.WriteDirectory(dest, message.NewEntry)
  165. }
  166. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  167. reader := filer.NewFileReader(filerSource, message.NewEntry)
  168. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  169. if writeErr != nil {
  170. return writeErr
  171. }
  172. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  173. }
  174. if message.OldEntry != nil && message.NewEntry == nil {
  175. if resp.Directory == option.bucketsDir {
  176. return handleDeleteBucket(message.OldEntry)
  177. }
  178. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  179. if !ok {
  180. return nil
  181. }
  182. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  183. if err != nil {
  184. return err
  185. }
  186. glog.V(2).Infof("delete: %+v", resp)
  187. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  188. if message.OldEntry.IsDirectory {
  189. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  190. return client.RemoveDirectory(dest)
  191. }
  192. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  193. return client.DeleteFile(dest)
  194. }
  195. if message.OldEntry != nil && message.NewEntry != nil {
  196. if resp.Directory == option.bucketsDir {
  197. if message.NewParentPath == option.bucketsDir {
  198. if message.OldEntry.Name == message.NewEntry.Name {
  199. return nil
  200. }
  201. if err := handleCreateBucket(message.NewEntry); err != nil {
  202. return err
  203. }
  204. if err := handleDeleteBucket(message.OldEntry); err != nil {
  205. return err
  206. }
  207. }
  208. }
  209. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  210. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  211. if oldOk && newOk {
  212. if !shouldSendToRemote(message.NewEntry) {
  213. glog.V(2).Infof("skipping updating: %+v", resp)
  214. return nil
  215. }
  216. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  217. if err != nil {
  218. return err
  219. }
  220. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  221. // update the same entry
  222. if message.NewEntry.IsDirectory {
  223. // update directory property
  224. return nil
  225. }
  226. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  227. glog.V(2).Infof("update meta: %+v", resp)
  228. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  229. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  230. } else {
  231. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  232. reader := filer.NewFileReader(filerSource, message.NewEntry)
  233. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  234. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  235. if writeErr != nil {
  236. return writeErr
  237. }
  238. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  239. }
  240. }
  241. }
  242. // the following is entry rename
  243. if oldOk {
  244. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  245. if err != nil {
  246. return err
  247. }
  248. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  249. if message.OldEntry.IsDirectory {
  250. return client.RemoveDirectory(oldDest)
  251. }
  252. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  253. if err := client.DeleteFile(oldDest); err != nil {
  254. return err
  255. }
  256. }
  257. if newOk {
  258. if !shouldSendToRemote(message.NewEntry) {
  259. glog.V(2).Infof("skipping updating: %+v", resp)
  260. return nil
  261. }
  262. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  263. if err != nil {
  264. return err
  265. }
  266. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  267. if message.NewEntry.IsDirectory {
  268. return client.WriteDirectory(newDest, message.NewEntry)
  269. }
  270. reader := filer.NewFileReader(filerSource, message.NewEntry)
  271. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  272. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  273. if writeErr != nil {
  274. return writeErr
  275. }
  276. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  277. }
  278. }
  279. return nil
  280. }
  281. return eachEntryFunc, nil
  282. }
  283. func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  284. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  285. var isMounted bool
  286. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  287. if !isMounted {
  288. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  289. }
  290. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  291. if !hasClient {
  292. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  293. }
  294. client, err = remote_storage.GetRemoteStorage(remoteConf)
  295. if err != nil {
  296. return nil, remoteStorageMountLocation, err
  297. }
  298. return client, remoteStorageMountLocation, nil
  299. }
  300. func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  301. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  302. if !ok {
  303. return "", nil, nil, false
  304. }
  305. var isMounted bool
  306. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  307. if !isMounted {
  308. glog.Warningf("%s is not mounted", bucket)
  309. return "", nil, nil, false
  310. }
  311. var hasClient bool
  312. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  313. if !hasClient {
  314. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  315. return "", nil, nil, false
  316. }
  317. return bucket, remoteStorageMountLocation, remoteConf, true
  318. }
  319. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  320. if !strings.HasPrefix(dir, bucketsDir+"/") {
  321. return "", false
  322. }
  323. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  324. return util.FullPath(bucketsDir).Child(parts[0]), true
  325. }
  326. func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
  327. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  328. return err
  329. } else {
  330. option.mappings = mappings
  331. }
  332. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  333. var lastConfName string
  334. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  335. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  336. return nil
  337. }
  338. conf := &remote_pb.RemoteConf{}
  339. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  340. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  341. }
  342. option.remoteConfs[conf.Name] = conf
  343. lastConfName = conf.Name
  344. return nil
  345. }, "", false, math.MaxUint32)
  346. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  347. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  348. option.mappings.PrimaryBucketStorageName = lastConfName
  349. }
  350. return
  351. }