You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

238 lines
9.1 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "google.golang.org/grpc"
  14. "time"
  15. )
  16. type RemoteSyncOptions struct {
  17. filerAddress *string
  18. grpcDialOption grpc.DialOption
  19. readChunkFromFiler *bool
  20. debug *bool
  21. timeAgo *time.Duration
  22. dir *string
  23. }
  24. const (
  25. RemoteSyncKeyPrefix = "remote.sync."
  26. )
  27. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  28. func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  29. return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  30. return fn(client)
  31. })
  32. }
  33. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  34. return location.Url
  35. }
  36. var (
  37. remoteSyncOptions RemoteSyncOptions
  38. )
  39. func init() {
  40. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  41. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  42. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
  43. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  44. remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
  45. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  46. }
  47. var cmdFilerRemoteSynchronize = &Command{
  48. UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
  49. Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
  50. Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
  51. filer.remote.sync listens on filer update events.
  52. If any mounted remote file is updated, it will fetch the updated content,
  53. and write to the remote storage.
  54. `,
  55. }
  56. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  57. util.LoadConfiguration("security", false)
  58. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  59. remoteSyncOptions.grpcDialOption = grpcDialOption
  60. dir := *remoteSyncOptions.dir
  61. filerAddress := *remoteSyncOptions.filerAddress
  62. // read filer remote storage mount mappings
  63. _, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir)
  64. if detectErr != nil {
  65. fmt.Printf("read mount info: %v", detectErr)
  66. return false
  67. }
  68. filerSource := &source.FilerSource{}
  69. filerSource.DoInitialize(
  70. filerAddress,
  71. pb.ServerToGrpcAddress(filerAddress),
  72. "/", // does not matter
  73. *remoteSyncOptions.readChunkFromFiler,
  74. )
  75. fmt.Printf("synchronize %s to remote storage...\n", dir)
  76. util.RetryForever("filer.remote.sync "+dir, func() error {
  77. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
  78. }, func(err error) bool {
  79. if err != nil {
  80. glog.Errorf("synchronize %s: %v", dir, err)
  81. }
  82. return true
  83. })
  84. return true
  85. }
  86. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
  87. dirHash := util.HashStringToLong(mountedDir)
  88. // 1. specified by timeAgo
  89. // 2. last offset timestamp for this directory
  90. // 3. directory creation time
  91. var lastOffsetTs time.Time
  92. if *option.timeAgo == 0 {
  93. mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
  94. if err != nil {
  95. return fmt.Errorf("lookup %s: %v", mountedDir, err)
  96. }
  97. lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
  98. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  99. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  100. glog.V(0).Infof("resume from %v", lastOffsetTs)
  101. } else {
  102. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  103. }
  104. } else {
  105. lastOffsetTs = time.Now().Add(-*option.timeAgo)
  106. }
  107. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  108. if err != nil {
  109. return err
  110. }
  111. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  112. message := resp.EventNotification
  113. if message.OldEntry == nil && message.NewEntry == nil {
  114. return nil
  115. }
  116. if message.OldEntry == nil && message.NewEntry != nil {
  117. if !filer.HasData(message.NewEntry) {
  118. return nil
  119. }
  120. glog.V(2).Infof("create: %+v", resp)
  121. if !shouldSendToRemote(message.NewEntry) {
  122. glog.V(2).Infof("skipping creating: %+v", resp)
  123. return nil
  124. }
  125. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  126. if message.NewEntry.IsDirectory {
  127. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  128. return client.WriteDirectory(dest, message.NewEntry)
  129. }
  130. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  131. reader := filer.NewFileReader(filerSource, message.NewEntry)
  132. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  133. if writeErr != nil {
  134. return writeErr
  135. }
  136. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  137. }
  138. if message.OldEntry != nil && message.NewEntry == nil {
  139. glog.V(2).Infof("delete: %+v", resp)
  140. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  141. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  142. return client.DeleteFile(dest)
  143. }
  144. if message.OldEntry != nil && message.NewEntry != nil {
  145. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  146. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  147. if !shouldSendToRemote(message.NewEntry) {
  148. glog.V(2).Infof("skipping updating: %+v", resp)
  149. return nil
  150. }
  151. if message.NewEntry.IsDirectory {
  152. return client.WriteDirectory(dest, message.NewEntry)
  153. }
  154. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  155. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  156. glog.V(2).Infof("update meta: %+v", resp)
  157. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  158. }
  159. }
  160. glog.V(2).Infof("update: %+v", resp)
  161. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  162. if err := client.DeleteFile(oldDest); err != nil {
  163. return err
  164. }
  165. reader := filer.NewFileReader(filerSource, message.NewEntry)
  166. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  167. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  168. if writeErr != nil {
  169. return writeErr
  170. }
  171. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  172. }
  173. return nil
  174. }
  175. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  176. lastTime := time.Unix(0, lastTsNs)
  177. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  178. return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
  179. })
  180. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
  181. "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  182. }
  183. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
  184. source := string(sourcePath[len(mountDir):])
  185. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  186. return &filer_pb.RemoteStorageLocation{
  187. Name: remoteMountLocation.Name,
  188. Bucket: remoteMountLocation.Bucket,
  189. Path: string(dest),
  190. }
  191. }
  192. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  193. if entry.RemoteEntry == nil {
  194. return true
  195. }
  196. if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
  197. return true
  198. }
  199. return false
  200. }
  201. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  202. entry.RemoteEntry = remoteEntry
  203. return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  204. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  205. Directory: dir,
  206. Entry: entry,
  207. })
  208. return err
  209. })
  210. }