You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

221 lines
8.3 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  10. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. "os"
  15. "strings"
  16. "time"
  17. )
  18. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
  19. // read filer remote storage mount mappings
  20. _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
  21. if detectErr != nil {
  22. return fmt.Errorf("read mount info: %v", detectErr)
  23. }
  24. eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
  25. if err != nil {
  26. return err
  27. }
  28. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  29. lastTime := time.Unix(0, lastTsNs)
  30. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  31. return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
  32. })
  33. lastOffsetTs := collectLastSyncOffset(option, mountedDir)
  34. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
  35. mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  36. }
  37. func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  38. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  39. if err != nil {
  40. return nil, err
  41. }
  42. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  43. message := resp.EventNotification
  44. if message.NewEntry == nil {
  45. return nil
  46. }
  47. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  48. mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  49. if readErr != nil {
  50. return fmt.Errorf("unmarshal mappings: %v", readErr)
  51. }
  52. if remoteLoc, found := mappings.Mappings[mountedDir]; found {
  53. if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
  54. glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
  55. }
  56. } else {
  57. glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
  58. os.Exit(0)
  59. }
  60. }
  61. if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
  62. conf := &remote_pb.RemoteConf{}
  63. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  64. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  65. }
  66. remoteStorage = conf
  67. if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
  68. client = newClient
  69. } else {
  70. return err
  71. }
  72. }
  73. return nil
  74. }
  75. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  76. message := resp.EventNotification
  77. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  78. return handleEtcRemoteChanges(resp)
  79. }
  80. if message.OldEntry == nil && message.NewEntry == nil {
  81. return nil
  82. }
  83. if message.OldEntry == nil && message.NewEntry != nil {
  84. if !filer.HasData(message.NewEntry) {
  85. return nil
  86. }
  87. glog.V(2).Infof("create: %+v", resp)
  88. if !shouldSendToRemote(message.NewEntry) {
  89. glog.V(2).Infof("skipping creating: %+v", resp)
  90. return nil
  91. }
  92. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  93. if message.NewEntry.IsDirectory {
  94. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  95. return client.WriteDirectory(dest, message.NewEntry)
  96. }
  97. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  98. reader := filer.NewFileReader(filerSource, message.NewEntry)
  99. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  100. if writeErr != nil {
  101. return writeErr
  102. }
  103. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  104. }
  105. if message.OldEntry != nil && message.NewEntry == nil {
  106. glog.V(2).Infof("delete: %+v", resp)
  107. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  108. if message.OldEntry.IsDirectory {
  109. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  110. return client.RemoveDirectory(dest)
  111. }
  112. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  113. return client.DeleteFile(dest)
  114. }
  115. if message.OldEntry != nil && message.NewEntry != nil {
  116. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  117. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  118. if !shouldSendToRemote(message.NewEntry) {
  119. glog.V(2).Infof("skipping updating: %+v", resp)
  120. return nil
  121. }
  122. if message.NewEntry.IsDirectory {
  123. return client.WriteDirectory(dest, message.NewEntry)
  124. }
  125. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  126. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  127. glog.V(2).Infof("update meta: %+v", resp)
  128. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  129. }
  130. }
  131. glog.V(2).Infof("update: %+v", resp)
  132. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  133. if err := client.DeleteFile(oldDest); err != nil {
  134. return err
  135. }
  136. reader := filer.NewFileReader(filerSource, message.NewEntry)
  137. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  138. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  139. if writeErr != nil {
  140. return writeErr
  141. }
  142. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  143. }
  144. return nil
  145. }
  146. return eachEntryFunc, nil
  147. }
  148. func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
  149. // 1. specified by timeAgo
  150. // 2. last offset timestamp for this directory
  151. // 3. directory creation time
  152. var lastOffsetTs time.Time
  153. if *option.timeAgo == 0 {
  154. mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
  155. if err != nil {
  156. glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
  157. return time.Now()
  158. }
  159. lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
  160. if mountedDirEntry != nil {
  161. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  162. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  163. glog.V(0).Infof("resume from %v", lastOffsetTs)
  164. } else {
  165. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  166. }
  167. } else {
  168. lastOffsetTs = time.Now()
  169. }
  170. } else {
  171. lastOffsetTs = time.Now().Add(-*option.timeAgo)
  172. }
  173. return lastOffsetTs
  174. }
  175. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
  176. source := string(sourcePath[len(mountDir):])
  177. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  178. return &remote_pb.RemoteStorageLocation{
  179. Name: remoteMountLocation.Name,
  180. Bucket: remoteMountLocation.Bucket,
  181. Path: string(dest),
  182. }
  183. }
  184. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  185. if entry.RemoteEntry == nil {
  186. return true
  187. }
  188. if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
  189. return true
  190. }
  191. return false
  192. }
  193. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  194. remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
  195. entry.RemoteEntry = remoteEntry
  196. return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  197. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  198. Directory: dir,
  199. Entry: entry,
  200. })
  201. return err
  202. })
  203. }