You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
8.3 KiB

  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  9. "github.com/chrislusf/seaweedfs/weed/replication/source"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "google.golang.org/grpc"
  14. "strings"
  15. "time"
  16. )
  17. type RemoteSyncOptions struct {
  18. filerAddress *string
  19. grpcDialOption grpc.DialOption
  20. readChunkFromFiler *bool
  21. debug *bool
  22. timeAgo *time.Duration
  23. dir *string
  24. }
  25. const (
  26. RemoteSyncKeyPrefix = "remote.sync."
  27. )
  28. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  29. func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  30. return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  31. return fn(client)
  32. })
  33. }
  34. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  35. return location.Url
  36. }
  37. var (
  38. remoteSyncOptions RemoteSyncOptions
  39. )
  40. func init() {
  41. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  42. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  43. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
  44. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  45. remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
  46. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  47. }
  48. var cmdFilerRemoteSynchronize = &Command{
  49. UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
  50. Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
  51. Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
  52. filer.remote.sync listens on filer update events.
  53. If any mounted remote file is updated, it will fetch the updated content,
  54. and write to the remote storage.
  55. `,
  56. }
  57. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  58. util.LoadConfiguration("security", false)
  59. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  60. remoteSyncOptions.grpcDialOption = grpcDialOption
  61. // read filer remote storage mount mappings
  62. mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
  63. if readErr != nil {
  64. fmt.Printf("read mount mapping: %v", readErr)
  65. return false
  66. }
  67. filerSource := &source.FilerSource{}
  68. filerSource.DoInitialize(
  69. *remoteSyncOptions.filerAddress,
  70. pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
  71. "/", // does not matter
  72. *remoteSyncOptions.readChunkFromFiler,
  73. )
  74. var found bool
  75. for dir, remoteStorageMountLocation := range mappings.Mappings {
  76. if *remoteSyncOptions.dir == dir {
  77. found = true
  78. storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
  79. if readErr != nil {
  80. fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
  81. continue
  82. }
  83. fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
  84. if err := util.Retry("filer.remote.sync "+dir, func() error {
  85. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
  86. }); err != nil {
  87. fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
  88. }
  89. break
  90. }
  91. }
  92. if !found {
  93. fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
  94. return false
  95. }
  96. return true
  97. }
  98. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
  99. dirHash := util.HashStringToLong(mountedDir)
  100. // 1. specified by timeAgo
  101. // 2. last offset timestamp for this directory
  102. // 3. directory creation time
  103. var lastOffsetTs time.Time
  104. if *option.timeAgo == 0 {
  105. mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
  106. if err != nil {
  107. return fmt.Errorf("lookup %s: %v", mountedDir, err)
  108. }
  109. lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
  110. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  111. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  112. glog.V(0).Infof("resume from %v", lastOffsetTs)
  113. } else {
  114. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  115. }
  116. } else {
  117. lastOffsetTs = time.Now().Add(-*option.timeAgo)
  118. }
  119. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  120. if err != nil {
  121. return err
  122. }
  123. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  124. message := resp.EventNotification
  125. if message.OldEntry == nil && message.NewEntry == nil {
  126. return nil
  127. }
  128. if message.OldEntry == nil && message.NewEntry != nil {
  129. if len(message.NewEntry.Chunks) == 0 {
  130. return nil
  131. }
  132. fmt.Printf("create: %+v\n", resp)
  133. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  134. reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
  135. return client.WriteFile(dest, message.NewEntry, reader)
  136. }
  137. if message.OldEntry != nil && message.NewEntry == nil {
  138. fmt.Printf("delete: %+v\n", resp)
  139. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  140. return client.DeleteFile(dest)
  141. }
  142. if message.OldEntry != nil && message.NewEntry != nil {
  143. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  144. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  145. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  146. if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
  147. fmt.Printf("update meta: %+v\n", resp)
  148. return client.UpdateFileMetadata(dest, message.NewEntry)
  149. }
  150. }
  151. fmt.Printf("update: %+v\n", resp)
  152. if err := client.DeleteFile(oldDest); err != nil {
  153. return err
  154. }
  155. reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
  156. return client.WriteFile(dest, message.NewEntry, reader)
  157. }
  158. return nil
  159. }
  160. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  161. lastTime := time.Unix(0, lastTsNs)
  162. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  163. return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
  164. })
  165. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
  166. "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  167. }
  168. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
  169. var dest string
  170. source := string(sourcePath[len(mountDir):])
  171. if strings.HasSuffix(remoteMountLocation.Path, "/") {
  172. dest = remoteMountLocation.Path + source[1:]
  173. } else {
  174. dest = remoteMountLocation.Path + source
  175. }
  176. return &filer_pb.RemoteStorageLocation{
  177. Name: remoteMountLocation.Name,
  178. Bucket: remoteMountLocation.Bucket,
  179. Path: dest,
  180. }
  181. }
  182. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  183. if len(a) != len(b) {
  184. return false
  185. }
  186. for i := 0; i < len(a); i++ {
  187. x, y := a[i], b[i]
  188. if !proto.Equal(x, y) {
  189. return false
  190. }
  191. }
  192. return true
  193. }