You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

222 lines
8.6 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  10. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. "google.golang.org/grpc"
  15. "os"
  16. "strings"
  17. "time"
  18. )
  19. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
  20. // read filer remote storage mount mappings
  21. _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
  22. if detectErr != nil {
  23. return fmt.Errorf("read mount info: %v", detectErr)
  24. }
  25. eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
  26. if err != nil {
  27. return err
  28. }
  29. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  30. lastTime := time.Unix(0, lastTsNs)
  31. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  32. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs)
  33. })
  34. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
  35. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
  36. mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  37. }
  38. func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  39. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  40. if err != nil {
  41. return nil, err
  42. }
  43. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  44. message := resp.EventNotification
  45. if message.NewEntry == nil {
  46. return nil
  47. }
  48. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  49. mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  50. if readErr != nil {
  51. return fmt.Errorf("unmarshal mappings: %v", readErr)
  52. }
  53. if remoteLoc, found := mappings.Mappings[mountedDir]; found {
  54. if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
  55. glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
  56. }
  57. } else {
  58. glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
  59. os.Exit(0)
  60. }
  61. }
  62. if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
  63. conf := &remote_pb.RemoteConf{}
  64. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  65. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  66. }
  67. remoteStorage = conf
  68. if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
  69. client = newClient
  70. } else {
  71. return err
  72. }
  73. }
  74. return nil
  75. }
  76. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  77. message := resp.EventNotification
  78. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  79. return handleEtcRemoteChanges(resp)
  80. }
  81. if message.OldEntry == nil && message.NewEntry == nil {
  82. return nil
  83. }
  84. if message.OldEntry == nil && message.NewEntry != nil {
  85. if !filer.HasData(message.NewEntry) {
  86. return nil
  87. }
  88. glog.V(2).Infof("create: %+v", resp)
  89. if !shouldSendToRemote(message.NewEntry) {
  90. glog.V(2).Infof("skipping creating: %+v", resp)
  91. return nil
  92. }
  93. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  94. if message.NewEntry.IsDirectory {
  95. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  96. return client.WriteDirectory(dest, message.NewEntry)
  97. }
  98. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  99. reader := filer.NewFileReader(filerSource, message.NewEntry)
  100. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  101. if writeErr != nil {
  102. return writeErr
  103. }
  104. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  105. }
  106. if message.OldEntry != nil && message.NewEntry == nil {
  107. glog.V(2).Infof("delete: %+v", resp)
  108. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  109. if message.OldEntry.IsDirectory {
  110. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  111. return client.RemoveDirectory(dest)
  112. }
  113. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  114. return client.DeleteFile(dest)
  115. }
  116. if message.OldEntry != nil && message.NewEntry != nil {
  117. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  118. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  119. if !shouldSendToRemote(message.NewEntry) {
  120. glog.V(2).Infof("skipping updating: %+v", resp)
  121. return nil
  122. }
  123. if message.NewEntry.IsDirectory {
  124. return client.WriteDirectory(dest, message.NewEntry)
  125. }
  126. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  127. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  128. glog.V(2).Infof("update meta: %+v", resp)
  129. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  130. }
  131. }
  132. glog.V(2).Infof("update: %+v", resp)
  133. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  134. if err := client.DeleteFile(oldDest); err != nil {
  135. return err
  136. }
  137. reader := filer.NewFileReader(filerSource, message.NewEntry)
  138. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  139. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  140. if writeErr != nil {
  141. return writeErr
  142. }
  143. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  144. }
  145. return nil
  146. }
  147. return eachEntryFunc, nil
  148. }
  149. func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
  150. // 1. specified by timeAgo
  151. // 2. last offset timestamp for this directory
  152. // 3. directory creation time
  153. var lastOffsetTs time.Time
  154. if timeAgo == 0 {
  155. mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
  156. if err != nil {
  157. glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
  158. return time.Now()
  159. }
  160. lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
  161. if mountedDirEntry != nil {
  162. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  163. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  164. glog.V(0).Infof("resume from %v", lastOffsetTs)
  165. } else {
  166. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  167. }
  168. } else {
  169. lastOffsetTs = time.Now()
  170. }
  171. } else {
  172. lastOffsetTs = time.Now().Add(-timeAgo)
  173. }
  174. return lastOffsetTs
  175. }
  176. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
  177. source := string(sourcePath[len(mountDir):])
  178. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  179. return &remote_pb.RemoteStorageLocation{
  180. Name: remoteMountLocation.Name,
  181. Bucket: remoteMountLocation.Bucket,
  182. Path: string(dest),
  183. }
  184. }
  185. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  186. if entry.RemoteEntry == nil {
  187. return true
  188. }
  189. if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
  190. return true
  191. }
  192. return false
  193. }
  194. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  195. remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
  196. entry.RemoteEntry = remoteEntry
  197. return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  198. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  199. Directory: dir,
  200. Entry: entry,
  201. })
  202. return err
  203. })
  204. }