You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
9.9 KiB

2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  15. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "google.golang.org/grpc"
  18. "google.golang.org/protobuf/proto"
  19. )
  20. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
  21. // read filer remote storage mount mappings
  22. _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
  23. if detectErr != nil {
  24. return fmt.Errorf("read mount info: %v", detectErr)
  25. }
  26. eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
  27. if err != nil {
  28. return err
  29. }
  30. processor := NewMetadataProcessor(eachEntryFunc, 128)
  31. var lastLogTsNs = time.Now().UnixNano()
  32. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  33. if *option.storageClass == "" {
  34. delete(resp.EventNotification.NewEntry.Extended, s3_constants.AmzStorageClass)
  35. } else {
  36. resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass] = []byte(*option.storageClass)
  37. }
  38. processor.AddSyncJob(resp)
  39. return nil
  40. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  41. if processor.processedTsWatermark == 0 {
  42. return nil
  43. }
  44. // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
  45. now := time.Now().UnixNano()
  46. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  47. lastLogTsNs = now
  48. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, processor.processedTsWatermark)
  49. })
  50. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
  51. option.clientEpoch++
  52. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
  53. mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
  54. }
  55. func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  56. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  57. if err != nil {
  58. return nil, err
  59. }
  60. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  61. message := resp.EventNotification
  62. if message.NewEntry == nil {
  63. return nil
  64. }
  65. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  66. mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  67. if readErr != nil {
  68. return fmt.Errorf("unmarshal mappings: %v", readErr)
  69. }
  70. if remoteLoc, found := mappings.Mappings[mountedDir]; found {
  71. if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
  72. glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
  73. }
  74. } else {
  75. glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
  76. os.Exit(0)
  77. }
  78. }
  79. if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
  80. conf := &remote_pb.RemoteConf{}
  81. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  82. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  83. }
  84. remoteStorage = conf
  85. if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
  86. client = newClient
  87. } else {
  88. return err
  89. }
  90. }
  91. return nil
  92. }
  93. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  94. message := resp.EventNotification
  95. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  96. return handleEtcRemoteChanges(resp)
  97. }
  98. if filer_pb.IsEmpty(resp) {
  99. return nil
  100. }
  101. if filer_pb.IsCreate(resp) {
  102. if strings.Contains(message.NewParentPath, "/"+s3_constants.MultipartUploadsFolder+"/") {
  103. return nil
  104. }
  105. if !filer.HasData(message.NewEntry) {
  106. return nil
  107. }
  108. glog.V(2).Infof("create: %+v", resp)
  109. if !shouldSendToRemote(message.NewEntry) {
  110. glog.V(2).Infof("skipping creating: %+v", resp)
  111. return nil
  112. }
  113. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  114. if message.NewEntry.IsDirectory {
  115. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  116. return client.WriteDirectory(dest, message.NewEntry)
  117. }
  118. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  119. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  120. if writeErr != nil {
  121. return writeErr
  122. }
  123. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  124. }
  125. if filer_pb.IsDelete(resp) {
  126. glog.V(2).Infof("delete: %+v", resp)
  127. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  128. if message.OldEntry.IsDirectory {
  129. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  130. return client.RemoveDirectory(dest)
  131. }
  132. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  133. return client.DeleteFile(dest)
  134. }
  135. if message.OldEntry != nil && message.NewEntry != nil {
  136. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  137. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  138. if !shouldSendToRemote(message.NewEntry) {
  139. glog.V(2).Infof("skipping updating: %+v", resp)
  140. return nil
  141. }
  142. if message.NewEntry.IsDirectory {
  143. return client.WriteDirectory(dest, message.NewEntry)
  144. }
  145. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  146. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  147. glog.V(2).Infof("update meta: %+v", resp)
  148. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  149. }
  150. }
  151. glog.V(2).Infof("update: %+v", resp)
  152. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  153. if err := client.DeleteFile(oldDest); err != nil {
  154. if !strings.Contains(resp.Directory, "/"+s3_constants.MultipartUploadsFolder+"/") {
  155. return err
  156. }
  157. }
  158. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  159. if writeErr != nil {
  160. return writeErr
  161. }
  162. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  163. }
  164. return nil
  165. }
  166. return eachEntryFunc, nil
  167. }
  168. func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
  169. var writeErr error
  170. err = util.Retry("writeFile", func() error {
  171. reader := filer.NewFileReader(filerSource, newEntry)
  172. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  173. remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader)
  174. if writeErr != nil {
  175. return writeErr
  176. }
  177. return nil
  178. })
  179. if err != nil {
  180. glog.Errorf("write to %s: %v", dest, err)
  181. }
  182. return
  183. }
  184. func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
  185. // 1. specified by timeAgo
  186. // 2. last offset timestamp for this directory
  187. // 3. directory creation time
  188. var lastOffsetTs time.Time
  189. if timeAgo == 0 {
  190. mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
  191. if err != nil {
  192. glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
  193. return time.Now()
  194. }
  195. lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
  196. if mountedDirEntry != nil {
  197. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  198. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  199. glog.V(0).Infof("resume from %v", lastOffsetTs)
  200. } else {
  201. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  202. }
  203. } else {
  204. lastOffsetTs = time.Now()
  205. }
  206. } else {
  207. lastOffsetTs = time.Now().Add(-timeAgo)
  208. }
  209. return lastOffsetTs
  210. }
  211. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
  212. source := string(sourcePath[len(mountDir):])
  213. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  214. return &remote_pb.RemoteStorageLocation{
  215. Name: remoteMountLocation.Name,
  216. Bucket: remoteMountLocation.Bucket,
  217. Path: string(dest),
  218. }
  219. }
  220. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  221. if entry.RemoteEntry == nil {
  222. return true
  223. }
  224. if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
  225. return true
  226. }
  227. return false
  228. }
  229. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  230. remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
  231. entry.RemoteEntry = remoteEntry
  232. return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  233. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  234. Directory: dir,
  235. Entry: entry,
  236. })
  237. return err
  238. })
  239. }