You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

367 lines
13 KiB

  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "math"
  14. "math/rand"
  15. "strings"
  16. "time"
  17. )
  18. func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  19. // read filer remote storage mount mappings
  20. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  21. return fmt.Errorf("read mount info: %v", detectErr)
  22. }
  23. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  24. if err != nil {
  25. return err
  26. }
  27. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  28. lastTime := time.Unix(0, lastTsNs)
  29. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  30. return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
  31. })
  32. lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
  33. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
  34. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  35. }
  36. func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  37. handleCreateBucket := func(entry *filer_pb.Entry) error {
  38. if !entry.IsDirectory {
  39. return nil
  40. }
  41. if entry.RemoteEntry != nil {
  42. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  43. return nil
  44. }
  45. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  46. if !found {
  47. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  48. }
  49. client, err := remote_storage.GetRemoteStorage(remoteConf)
  50. if err != nil {
  51. return err
  52. }
  53. bucketName := strings.ToLower(entry.Name)
  54. if *option.createBucketRandomSuffix {
  55. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  56. if len(bucketName)+5 > 63 {
  57. bucketName = bucketName[:58]
  58. }
  59. bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000)
  60. }
  61. glog.V(0).Infof("create bucket %s", bucketName)
  62. if err := client.CreateBucket(bucketName); err != nil {
  63. return err
  64. }
  65. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  66. remoteLocation := &remote_pb.RemoteStorageLocation{
  67. Name: *option.createBucketAt,
  68. Bucket: bucketName,
  69. Path: "/",
  70. }
  71. // need to add new mapping here before getting upates from metadata tailing
  72. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  73. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  74. }
  75. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  76. if !entry.IsDirectory {
  77. return nil
  78. }
  79. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  80. if err != nil {
  81. return err
  82. }
  83. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  84. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  85. return err
  86. }
  87. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  88. return filer.DeleteMountMapping(option, string(bucketPath))
  89. }
  90. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  91. message := resp.EventNotification
  92. if message.NewEntry != nil {
  93. // update
  94. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  95. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  96. if readErr != nil {
  97. return fmt.Errorf("unmarshal mappings: %v", readErr)
  98. }
  99. option.mappings = newMappings
  100. }
  101. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  102. conf := &remote_pb.RemoteConf{}
  103. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  104. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  105. }
  106. option.remoteConfs[conf.Name] = conf
  107. }
  108. } else if message.OldEntry != nil {
  109. // deletion
  110. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  111. conf := &remote_pb.RemoteConf{}
  112. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  113. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  114. }
  115. delete(option.remoteConfs, conf.Name)
  116. }
  117. }
  118. return nil
  119. }
  120. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  121. message := resp.EventNotification
  122. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  123. return handleEtcRemoteChanges(resp)
  124. }
  125. if message.OldEntry == nil && message.NewEntry == nil {
  126. return nil
  127. }
  128. if message.OldEntry == nil && message.NewEntry != nil {
  129. if message.NewParentPath == option.bucketsDir {
  130. return handleCreateBucket(message.NewEntry)
  131. }
  132. if !filer.HasData(message.NewEntry) {
  133. return nil
  134. }
  135. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  136. if !ok {
  137. return nil
  138. }
  139. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  140. if err != nil {
  141. return err
  142. }
  143. glog.V(2).Infof("create: %+v", resp)
  144. if !shouldSendToRemote(message.NewEntry) {
  145. glog.V(2).Infof("skipping creating: %+v", resp)
  146. return nil
  147. }
  148. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  149. if message.NewEntry.IsDirectory {
  150. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  151. return client.WriteDirectory(dest, message.NewEntry)
  152. }
  153. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  154. reader := filer.NewFileReader(filerSource, message.NewEntry)
  155. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  156. if writeErr != nil {
  157. return writeErr
  158. }
  159. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  160. }
  161. if message.OldEntry != nil && message.NewEntry == nil {
  162. if resp.Directory == option.bucketsDir {
  163. return handleDeleteBucket(message.OldEntry)
  164. }
  165. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  166. if !ok {
  167. return nil
  168. }
  169. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  170. if err != nil {
  171. return err
  172. }
  173. glog.V(2).Infof("delete: %+v", resp)
  174. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  175. if message.OldEntry.IsDirectory {
  176. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  177. return client.RemoveDirectory(dest)
  178. }
  179. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  180. return client.DeleteFile(dest)
  181. }
  182. if message.OldEntry != nil && message.NewEntry != nil {
  183. if resp.Directory == option.bucketsDir {
  184. if message.NewParentPath == option.bucketsDir {
  185. if message.OldEntry.Name == message.NewEntry.Name {
  186. return nil
  187. }
  188. if err := handleCreateBucket(message.NewEntry); err != nil {
  189. return err
  190. }
  191. if err := handleDeleteBucket(message.OldEntry); err != nil {
  192. return err
  193. }
  194. }
  195. }
  196. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  197. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  198. if oldOk && newOk {
  199. if !shouldSendToRemote(message.NewEntry) {
  200. glog.V(2).Infof("skipping updating: %+v", resp)
  201. return nil
  202. }
  203. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  204. if err != nil {
  205. return err
  206. }
  207. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  208. // update the same entry
  209. if message.NewEntry.IsDirectory {
  210. // update directory property
  211. return nil
  212. }
  213. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  214. glog.V(2).Infof("update meta: %+v", resp)
  215. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  216. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  217. } else {
  218. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  219. reader := filer.NewFileReader(filerSource, message.NewEntry)
  220. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  221. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  222. if writeErr != nil {
  223. return writeErr
  224. }
  225. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  226. }
  227. }
  228. }
  229. // the following is entry rename
  230. if oldOk {
  231. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  232. if err != nil {
  233. return err
  234. }
  235. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  236. if message.OldEntry.IsDirectory {
  237. return client.RemoveDirectory(oldDest)
  238. }
  239. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  240. if err := client.DeleteFile(oldDest); err != nil {
  241. return err
  242. }
  243. }
  244. if newOk {
  245. if !shouldSendToRemote(message.NewEntry) {
  246. glog.V(2).Infof("skipping updating: %+v", resp)
  247. return nil
  248. }
  249. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  250. if err != nil {
  251. return err
  252. }
  253. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  254. if message.NewEntry.IsDirectory {
  255. return client.WriteDirectory(newDest, message.NewEntry)
  256. }
  257. reader := filer.NewFileReader(filerSource, message.NewEntry)
  258. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  259. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  260. if writeErr != nil {
  261. return writeErr
  262. }
  263. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  264. }
  265. }
  266. return nil
  267. }
  268. return eachEntryFunc, nil
  269. }
  270. func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  271. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  272. var isMounted bool
  273. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  274. if !isMounted {
  275. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  276. }
  277. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  278. if !hasClient {
  279. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  280. }
  281. client, err = remote_storage.GetRemoteStorage(remoteConf)
  282. if err != nil {
  283. return nil, remoteStorageMountLocation, err
  284. }
  285. return client, remoteStorageMountLocation, nil
  286. }
  287. func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  288. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  289. if !ok {
  290. return "", nil, nil, false
  291. }
  292. var isMounted bool
  293. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  294. if !isMounted {
  295. glog.Warningf("%s is not mounted", bucket)
  296. return "", nil, nil, false
  297. }
  298. var hasClient bool
  299. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  300. if !hasClient {
  301. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  302. return "", nil, nil, false
  303. }
  304. return bucket, remoteStorageMountLocation, remoteConf, true
  305. }
  306. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  307. if !strings.HasPrefix(dir, bucketsDir+"/") {
  308. return "", false
  309. }
  310. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  311. return util.FullPath(bucketsDir).Child(parts[0]), true
  312. }
  313. func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
  314. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
  315. return err
  316. } else {
  317. option.mappings = mappings
  318. }
  319. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  320. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  321. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  322. return nil
  323. }
  324. conf := &remote_pb.RemoteConf{}
  325. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  326. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  327. }
  328. option.remoteConfs[conf.Name] = conf
  329. return nil
  330. }, "", false, math.MaxUint32)
  331. return
  332. }