You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

300 lines
11 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  10. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/golang/protobuf/proto"
  15. "google.golang.org/grpc"
  16. "os"
  17. "strings"
  18. "time"
  19. )
  20. type RemoteSyncOptions struct {
  21. filerAddress *string
  22. grpcDialOption grpc.DialOption
  23. readChunkFromFiler *bool
  24. debug *bool
  25. timeAgo *time.Duration
  26. dir *string
  27. }
  // RemoteSyncKeyPrefix is the key prefix associated with remote sync state.
  const RemoteSyncKeyPrefix = "remote.sync."
  31. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  32. func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  33. return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  34. return fn(client)
  35. })
  36. }
  37. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  38. return location.Url
  39. }
  40. var (
  41. remoteSyncOptions RemoteSyncOptions
  42. )
  43. func init() {
  44. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  45. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  46. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
  47. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  48. remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
  49. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  50. }
  51. var cmdFilerRemoteSynchronize = &Command{
  52. UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
  53. Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
  54. Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
  55. filer.remote.sync listens on filer update events.
  56. If any mounted remote file is updated, it will fetch the updated content,
  57. and write to the remote storage.
  58. `,
  59. }
  60. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  61. util.LoadConfiguration("security", false)
  62. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  63. remoteSyncOptions.grpcDialOption = grpcDialOption
  64. dir := *remoteSyncOptions.dir
  65. filerAddress := *remoteSyncOptions.filerAddress
  66. filerSource := &source.FilerSource{}
  67. filerSource.DoInitialize(
  68. filerAddress,
  69. pb.ServerToGrpcAddress(filerAddress),
  70. "/", // does not matter
  71. *remoteSyncOptions.readChunkFromFiler,
  72. )
  73. fmt.Printf("synchronize %s to remote storage...\n", dir)
  74. util.RetryForever("filer.remote.sync "+dir, func() error {
  75. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
  76. }, func(err error) bool {
  77. if err != nil {
  78. glog.Errorf("synchronize %s: %v", dir, err)
  79. }
  80. return true
  81. })
  82. return true
  83. }
  84. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
  85. // read filer remote storage mount mappings
  86. _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
  87. if detectErr != nil {
  88. return fmt.Errorf("read mount info: %v", detectErr)
  89. }
  90. eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
  91. if err != nil {
  92. return err
  93. }
  94. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  95. lastTime := time.Unix(0, lastTsNs)
  96. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  97. return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
  98. })
  99. lastOffsetTs := collectLastSyncOffset(option, mountedDir)
  100. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
  101. mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  102. }
  103. func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  104. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  105. if err != nil {
  106. return nil, err
  107. }
  108. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  109. message := resp.EventNotification
  110. if message.NewEntry == nil {
  111. return nil
  112. }
  113. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  114. mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  115. if readErr != nil {
  116. return fmt.Errorf("unmarshal mappings: %v", readErr)
  117. }
  118. if remoteLoc, found := mappings.Mappings[mountedDir]; found {
  119. if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
  120. glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
  121. }
  122. } else {
  123. glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
  124. os.Exit(0)
  125. }
  126. }
  127. if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
  128. conf := &remote_pb.RemoteConf{}
  129. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  130. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  131. }
  132. remoteStorage = conf
  133. if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
  134. client = newClient
  135. } else {
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  142. message := resp.EventNotification
  143. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  144. return handleEtcRemoteChanges(resp)
  145. }
  146. if message.OldEntry == nil && message.NewEntry == nil {
  147. return nil
  148. }
  149. if message.OldEntry == nil && message.NewEntry != nil {
  150. if !filer.HasData(message.NewEntry) {
  151. return nil
  152. }
  153. glog.V(2).Infof("create: %+v", resp)
  154. if !shouldSendToRemote(message.NewEntry) {
  155. glog.V(2).Infof("skipping creating: %+v", resp)
  156. return nil
  157. }
  158. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  159. if message.NewEntry.IsDirectory {
  160. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  161. return client.WriteDirectory(dest, message.NewEntry)
  162. }
  163. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  164. reader := filer.NewFileReader(filerSource, message.NewEntry)
  165. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  166. if writeErr != nil {
  167. return writeErr
  168. }
  169. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  170. }
  171. if message.OldEntry != nil && message.NewEntry == nil {
  172. glog.V(2).Infof("delete: %+v", resp)
  173. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  174. if message.OldEntry.IsDirectory {
  175. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  176. return client.RemoveDirectory(dest)
  177. }
  178. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  179. return client.DeleteFile(dest)
  180. }
  181. if message.OldEntry != nil && message.NewEntry != nil {
  182. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  183. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  184. if !shouldSendToRemote(message.NewEntry) {
  185. glog.V(2).Infof("skipping updating: %+v", resp)
  186. return nil
  187. }
  188. if message.NewEntry.IsDirectory {
  189. return client.WriteDirectory(dest, message.NewEntry)
  190. }
  191. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  192. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  193. glog.V(2).Infof("update meta: %+v", resp)
  194. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  195. }
  196. }
  197. glog.V(2).Infof("update: %+v", resp)
  198. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  199. if err := client.DeleteFile(oldDest); err != nil {
  200. return err
  201. }
  202. reader := filer.NewFileReader(filerSource, message.NewEntry)
  203. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  204. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  205. if writeErr != nil {
  206. return writeErr
  207. }
  208. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  209. }
  210. return nil
  211. }
  212. return eachEntryFunc, nil
  213. }
  214. func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
  215. // 1. specified by timeAgo
  216. // 2. last offset timestamp for this directory
  217. // 3. directory creation time
  218. var lastOffsetTs time.Time
  219. if *option.timeAgo == 0 {
  220. mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
  221. if err != nil {
  222. glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
  223. return time.Now()
  224. }
  225. lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
  226. if mountedDirEntry != nil {
  227. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  228. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  229. glog.V(0).Infof("resume from %v", lastOffsetTs)
  230. } else {
  231. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  232. }
  233. } else {
  234. lastOffsetTs = time.Now()
  235. }
  236. } else {
  237. lastOffsetTs = time.Now().Add(-*option.timeAgo)
  238. }
  239. return lastOffsetTs
  240. }
  241. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
  242. source := string(sourcePath[len(mountDir):])
  243. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  244. return &remote_pb.RemoteStorageLocation{
  245. Name: remoteMountLocation.Name,
  246. Bucket: remoteMountLocation.Bucket,
  247. Path: string(dest),
  248. }
  249. }
  250. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  251. if entry.RemoteEntry == nil {
  252. return true
  253. }
  254. if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
  255. return true
  256. }
  257. return false
  258. }
  259. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  260. entry.RemoteEntry = remoteEntry
  261. return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  262. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  263. Directory: dir,
  264. Entry: entry,
  265. })
  266. return err
  267. })
  268. }