You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

259 lines
9.5 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. "google.golang.org/grpc"
  15. "strings"
  16. "time"
  17. )
  18. type RemoteSyncOptions struct {
  19. filerAddress *string
  20. grpcDialOption grpc.DialOption
  21. readChunkFromFiler *bool
  22. debug *bool
  23. timeAgo *time.Duration
  24. dir *string
  25. }
  // RemoteSyncKeyPrefix is the key prefix handed to getOffset and setOffset
  // when the sync progress of a mounted directory is loaded and saved.
  const RemoteSyncKeyPrefix = "remote.sync."
  29. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  30. func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  31. return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  32. return fn(client)
  33. })
  34. }
  35. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  36. return location.Url
  37. }
  38. var (
  39. remoteSyncOptions RemoteSyncOptions
  40. )
  41. func init() {
  42. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  43. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  44. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
  45. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  46. remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
  47. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  48. }
  // cmdFilerRemoteSynchronize describes the filer.remote.sync command: its
  // usage line and its short and long help texts. The Run field is not set
  // here; init assigns it separately to break an initialization cycle.
  49. var cmdFilerRemoteSynchronize = &Command{
  50. UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
  51. Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
  52. Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
  53. filer.remote.sync listens on filer update events.
  54. If any mounted remote file is updated, it will fetch the updated content,
  55. and write to the remote storage.
  56. `,
  57. }
  58. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  59. util.LoadConfiguration("security", false)
  60. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  61. remoteSyncOptions.grpcDialOption = grpcDialOption
  62. // read filer remote storage mount mappings
  63. mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
  64. if readErr != nil {
  65. fmt.Printf("read mount mapping: %v", readErr)
  66. return false
  67. }
  68. filerSource := &source.FilerSource{}
  69. filerSource.DoInitialize(
  70. *remoteSyncOptions.filerAddress,
  71. pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
  72. "/", // does not matter
  73. *remoteSyncOptions.readChunkFromFiler,
  74. )
  75. var found bool
  76. for dir, remoteStorageMountLocation := range mappings.Mappings {
  77. if *remoteSyncOptions.dir == dir {
  78. found = true
  79. storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
  80. if readErr != nil {
  81. fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
  82. continue
  83. }
  84. fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
  85. if err := util.Retry("filer.remote.sync "+dir, func() error {
  86. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
  87. }); err != nil {
  88. fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
  89. }
  90. break
  91. }
  92. }
  93. if !found {
  94. fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
  95. return false
  96. }
  97. return true
  98. }
  99. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
  100. dirHash := util.HashStringToLong(mountedDir)
  101. // 1. specified by timeAgo
  102. // 2. last offset timestamp for this directory
  103. // 3. directory creation time
  104. var lastOffsetTs time.Time
  105. if *option.timeAgo == 0 {
  106. mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
  107. if err != nil {
  108. return fmt.Errorf("lookup %s: %v", mountedDir, err)
  109. }
  110. lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
  111. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  112. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  113. glog.V(0).Infof("resume from %v", lastOffsetTs)
  114. } else {
  115. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  116. }
  117. } else {
  118. lastOffsetTs = time.Now().Add(-*option.timeAgo)
  119. }
  120. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  121. if err != nil {
  122. return err
  123. }
  124. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  125. message := resp.EventNotification
  126. if message.OldEntry == nil && message.NewEntry == nil {
  127. return nil
  128. }
  129. if message.OldEntry == nil && message.NewEntry != nil {
  130. if len(message.NewEntry.Chunks) == 0 {
  131. return nil
  132. }
  133. fmt.Printf("create: %+v\n", resp)
  134. if !shouldSendToRemote(message.NewEntry) {
  135. fmt.Printf("skipping creating: %+v\n", resp)
  136. return nil
  137. }
  138. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  139. reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
  140. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  141. if writeErr != nil {
  142. return writeErr
  143. }
  144. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  145. }
  146. if message.OldEntry != nil && message.NewEntry == nil {
  147. fmt.Printf("delete: %+v\n", resp)
  148. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  149. return client.DeleteFile(dest)
  150. }
  151. if message.OldEntry != nil && message.NewEntry != nil {
  152. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  153. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  154. if !shouldSendToRemote(message.NewEntry) {
  155. fmt.Printf("skipping updating: %+v\n", resp)
  156. return nil
  157. }
  158. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  159. if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
  160. fmt.Printf("update meta: %+v\n", resp)
  161. return client.UpdateFileMetadata(dest, message.NewEntry)
  162. }
  163. }
  164. fmt.Printf("update: %+v\n", resp)
  165. if err := client.DeleteFile(oldDest); err != nil {
  166. return err
  167. }
  168. reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
  169. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  170. if writeErr != nil {
  171. return writeErr
  172. }
  173. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  174. }
  175. return nil
  176. }
  177. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  178. lastTime := time.Unix(0, lastTsNs)
  179. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  180. return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
  181. })
  182. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
  183. "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  184. }
  185. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
  186. var dest string
  187. source := string(sourcePath[len(mountDir):])
  188. if strings.HasSuffix(remoteMountLocation.Path, "/") {
  189. dest = remoteMountLocation.Path + source[1:]
  190. } else {
  191. dest = remoteMountLocation.Path + source
  192. }
  193. return &filer_pb.RemoteStorageLocation{
  194. Name: remoteMountLocation.Name,
  195. Bucket: remoteMountLocation.Bucket,
  196. Path: dest,
  197. }
  198. }
  199. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  200. if len(a) != len(b) {
  201. return false
  202. }
  203. for i := 0; i < len(a); i++ {
  204. x, y := a[i], b[i]
  205. if !proto.Equal(x, y) {
  206. return false
  207. }
  208. }
  209. return true
  210. }
  211. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  212. if entry.RemoteEntry == nil {
  213. return true
  214. }
  215. if entry.RemoteEntry.Size != int64(filer.FileSize(entry)) {
  216. return true
  217. }
  218. if entry.RemoteEntry.LastModifiedAt < entry.Attributes.Mtime {
  219. return true
  220. }
  221. return false
  222. }
  223. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  224. entry.RemoteEntry = remoteEntry
  225. return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  226. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  227. Directory: dir,
  228. Entry: entry,
  229. })
  230. return err
  231. })
  232. }