You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

295 lines
11 KiB

2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
1 year ago
1 year ago
2 years ago
1 year ago
2 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  15. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "google.golang.org/grpc"
  18. "google.golang.org/protobuf/proto"
  19. )
  20. func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
  21. // read filer remote storage mount mappings
  22. _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
  23. if detectErr != nil {
  24. return fmt.Errorf("read mount info: %v", detectErr)
  25. }
  26. eachEntryFunc, err := option.makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
  27. if err != nil {
  28. return err
  29. }
  30. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
  31. processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
  32. var lastLogTsNs = time.Now().UnixNano()
  33. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  34. if resp.EventNotification.NewEntry != nil {
  35. if *option.storageClass == "" {
  36. if _, ok := resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass]; ok {
  37. delete(resp.EventNotification.NewEntry.Extended, s3_constants.AmzStorageClass)
  38. }
  39. } else {
  40. resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass] = []byte(*option.storageClass)
  41. }
  42. }
  43. processor.AddSyncJob(resp)
  44. return nil
  45. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  46. offsetTsNs := processor.processedTsWatermark.Load()
  47. if offsetTsNs == 0 {
  48. return nil
  49. }
  50. // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
  51. now := time.Now().UnixNano()
  52. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  53. lastLogTsNs = now
  54. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, offsetTsNs)
  55. })
  56. option.clientEpoch++
  57. prefix := mountedDir
  58. if !strings.HasSuffix(prefix, "/") {
  59. prefix = prefix + "/"
  60. }
  61. metadataFollowOption := &pb.MetadataFollowOption{
  62. ClientName: "filer.remote.sync",
  63. ClientId: option.clientId,
  64. ClientEpoch: option.clientEpoch,
  65. SelfSignature: 0,
  66. PathPrefix: prefix,
  67. AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
  68. DirectoriesToWatch: nil,
  69. StartTsNs: lastOffsetTs.UnixNano(),
  70. StopTsNs: 0,
  71. EventErrorType: pb.RetryForeverOnError,
  72. }
  73. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  74. }
  75. func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  76. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  77. if err != nil {
  78. return nil, err
  79. }
  80. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  81. message := resp.EventNotification
  82. if message.NewEntry == nil {
  83. return nil
  84. }
  85. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  86. mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  87. if readErr != nil {
  88. return fmt.Errorf("unmarshal mappings: %v", readErr)
  89. }
  90. if remoteLoc, found := mappings.Mappings[mountedDir]; found {
  91. if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
  92. glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
  93. }
  94. } else {
  95. glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
  96. os.Exit(0)
  97. }
  98. }
  99. if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
  100. conf := &remote_pb.RemoteConf{}
  101. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  102. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  103. }
  104. remoteStorage = conf
  105. if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
  106. client = newClient
  107. } else {
  108. return err
  109. }
  110. }
  111. return nil
  112. }
  113. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  114. message := resp.EventNotification
  115. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  116. return handleEtcRemoteChanges(resp)
  117. }
  118. if filer_pb.IsEmpty(resp) {
  119. return nil
  120. }
  121. if filer_pb.IsCreate(resp) {
  122. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  123. return nil
  124. }
  125. if !filer.HasData(message.NewEntry) {
  126. return nil
  127. }
  128. glog.V(2).Infof("create: %+v", resp)
  129. if !shouldSendToRemote(message.NewEntry) {
  130. glog.V(2).Infof("skipping creating: %+v", resp)
  131. return nil
  132. }
  133. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  134. if message.NewEntry.IsDirectory {
  135. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  136. return client.WriteDirectory(dest, message.NewEntry)
  137. }
  138. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  139. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  140. if writeErr != nil {
  141. return writeErr
  142. }
  143. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  144. }
  145. if filer_pb.IsDelete(resp) {
  146. glog.V(2).Infof("delete: %+v", resp)
  147. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  148. if message.OldEntry.IsDirectory {
  149. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  150. return client.RemoveDirectory(dest)
  151. }
  152. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  153. return client.DeleteFile(dest)
  154. }
  155. if message.OldEntry != nil && message.NewEntry != nil {
  156. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  157. return nil
  158. }
  159. oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  160. dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  161. if !shouldSendToRemote(message.NewEntry) {
  162. glog.V(2).Infof("skipping updating: %+v", resp)
  163. return nil
  164. }
  165. if message.NewEntry.IsDirectory {
  166. return client.WriteDirectory(dest, message.NewEntry)
  167. }
  168. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  169. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  170. glog.V(2).Infof("update meta: %+v", resp)
  171. return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
  172. }
  173. }
  174. glog.V(2).Infof("update: %+v", resp)
  175. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  176. if err := client.DeleteFile(oldDest); err != nil {
  177. if isMultipartUploadFile(resp.Directory, message.OldEntry.Name) {
  178. return nil
  179. }
  180. }
  181. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  182. if writeErr != nil {
  183. return writeErr
  184. }
  185. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  186. }
  187. return nil
  188. }
  189. return eachEntryFunc, nil
  190. }
  191. func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
  192. var writeErr error
  193. err = util.Retry("writeFile", func() error {
  194. reader := filer.NewFileReader(filerSource, newEntry)
  195. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  196. remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader)
  197. if writeErr != nil {
  198. return writeErr
  199. }
  200. return nil
  201. })
  202. if err != nil {
  203. glog.Errorf("write to %s: %v", dest, err)
  204. }
  205. return
  206. }
  207. func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
  208. // 1. specified by timeAgo
  209. // 2. last offset timestamp for this directory
  210. // 3. directory creation time
  211. var lastOffsetTs time.Time
  212. if timeAgo == 0 {
  213. mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
  214. if err != nil {
  215. glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
  216. return time.Now()
  217. }
  218. lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
  219. if mountedDirEntry != nil {
  220. if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
  221. lastOffsetTs = time.Unix(0, lastOffsetTsNs)
  222. glog.V(0).Infof("resume from %v", lastOffsetTs)
  223. } else {
  224. lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
  225. }
  226. } else {
  227. lastOffsetTs = time.Now()
  228. }
  229. } else {
  230. lastOffsetTs = time.Now().Add(-timeAgo)
  231. }
  232. return lastOffsetTs
  233. }
  234. func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
  235. source := string(sourcePath[len(mountDir):])
  236. dest := util.FullPath(remoteMountLocation.Path).Child(source)
  237. return &remote_pb.RemoteStorageLocation{
  238. Name: remoteMountLocation.Name,
  239. Bucket: remoteMountLocation.Bucket,
  240. Path: string(dest),
  241. }
  242. }
  243. func shouldSendToRemote(entry *filer_pb.Entry) bool {
  244. if entry.RemoteEntry == nil {
  245. return true
  246. }
  247. if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
  248. return true
  249. }
  250. return false
  251. }
  252. func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
  253. remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
  254. entry.RemoteEntry = remoteEntry
  255. return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  256. _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
  257. Directory: dir,
  258. Entry: entry,
  259. })
  260. return err
  261. })
  262. }
  263. func isMultipartUploadFile(dir string, name string) bool {
  264. return isMultipartUploadDir(dir) && strings.HasSuffix(name, ".part")
  265. }
  266. func isMultipartUploadDir(dir string) bool {
  267. return strings.HasPrefix(dir, "/buckets/") &&
  268. strings.Contains(dir, "/"+s3_constants.MultipartUploadsFolder+"/")
  269. }