You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

348 lines
12 KiB

  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  10. "github.com/chrislusf/seaweedfs/weed/replication/source"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "math"
  14. "strings"
  15. "time"
  16. )
  17. func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  18. // read filer remote storage mount mappings
  19. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  20. return fmt.Errorf("read mount info: %v", detectErr)
  21. }
  22. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  23. if err != nil {
  24. return err
  25. }
  26. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  27. lastTime := time.Unix(0, lastTsNs)
  28. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
  29. return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
  30. })
  31. lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
  32. return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
  33. option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
  34. }
  35. func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  36. handleCreateBucket := func(entry *filer_pb.Entry) error {
  37. if !entry.IsDirectory {
  38. return nil
  39. }
  40. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  41. if !found {
  42. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  43. }
  44. client, err := remote_storage.GetRemoteStorage(remoteConf)
  45. if err != nil {
  46. return err
  47. }
  48. glog.V(0).Infof("create bucket %s", entry.Name)
  49. if err := client.CreateBucket(entry.Name); err != nil {
  50. return err
  51. }
  52. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  53. remoteLocation := &remote_pb.RemoteStorageLocation{
  54. Name: *option.createBucketAt,
  55. Bucket: entry.Name,
  56. Path: "/",
  57. }
  58. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  59. }
  60. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  61. if !entry.IsDirectory {
  62. return nil
  63. }
  64. client, err := option.findRemoteStorageClient(entry.Name)
  65. if err != nil {
  66. return err
  67. }
  68. glog.V(0).Infof("delete bucket %s", entry.Name)
  69. if err := client.DeleteBucket(entry.Name); err != nil {
  70. return err
  71. }
  72. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  73. return filer.DeleteMountMapping(option, string(bucketPath))
  74. }
  75. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  76. message := resp.EventNotification
  77. if message.NewEntry != nil {
  78. // update
  79. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  80. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  81. if readErr != nil {
  82. return fmt.Errorf("unmarshal mappings: %v", readErr)
  83. }
  84. option.mappings = newMappings
  85. }
  86. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  87. conf := &remote_pb.RemoteConf{}
  88. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  89. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  90. }
  91. option.remoteConfs[conf.Name] = conf
  92. }
  93. } else if message.OldEntry != nil {
  94. // deletion
  95. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  96. conf := &remote_pb.RemoteConf{}
  97. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  98. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  99. }
  100. delete(option.remoteConfs, conf.Name)
  101. }
  102. }
  103. return nil
  104. }
  105. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  106. message := resp.EventNotification
  107. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  108. return handleEtcRemoteChanges(resp)
  109. }
  110. if message.OldEntry == nil && message.NewEntry == nil {
  111. return nil
  112. }
  113. if message.OldEntry == nil && message.NewEntry != nil {
  114. if message.NewParentPath == option.bucketsDir {
  115. return handleCreateBucket(message.NewEntry)
  116. }
  117. if !filer.HasData(message.NewEntry) {
  118. return nil
  119. }
  120. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  121. if !ok {
  122. return nil
  123. }
  124. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  125. if err != nil {
  126. return err
  127. }
  128. glog.V(2).Infof("create: %+v", resp)
  129. if !shouldSendToRemote(message.NewEntry) {
  130. glog.V(2).Infof("skipping creating: %+v", resp)
  131. return nil
  132. }
  133. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  134. if message.NewEntry.IsDirectory {
  135. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  136. return client.WriteDirectory(dest, message.NewEntry)
  137. }
  138. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  139. reader := filer.NewFileReader(filerSource, message.NewEntry)
  140. remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
  141. if writeErr != nil {
  142. return writeErr
  143. }
  144. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  145. }
  146. if message.OldEntry != nil && message.NewEntry == nil {
  147. if resp.Directory == option.bucketsDir {
  148. return handleDeleteBucket(message.OldEntry)
  149. }
  150. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  151. if !ok {
  152. return nil
  153. }
  154. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  155. if err != nil {
  156. return err
  157. }
  158. glog.V(2).Infof("delete: %+v", resp)
  159. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  160. if message.OldEntry.IsDirectory {
  161. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  162. return client.RemoveDirectory(dest)
  163. }
  164. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  165. return client.DeleteFile(dest)
  166. }
  167. if message.OldEntry != nil && message.NewEntry != nil {
  168. if resp.Directory == option.bucketsDir {
  169. if message.NewParentPath == option.bucketsDir {
  170. if message.OldEntry.Name == message.NewEntry.Name {
  171. return nil
  172. }
  173. if err := handleCreateBucket(message.NewEntry); err != nil {
  174. return err
  175. }
  176. if err := handleDeleteBucket(message.OldEntry); err != nil {
  177. return err
  178. }
  179. }
  180. }
  181. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  182. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  183. if oldOk && newOk {
  184. if !shouldSendToRemote(message.NewEntry) {
  185. glog.V(2).Infof("skipping updating: %+v", resp)
  186. return nil
  187. }
  188. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  189. if err != nil {
  190. return err
  191. }
  192. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  193. // update the same entry
  194. if message.NewEntry.IsDirectory {
  195. // update directory property
  196. return nil
  197. }
  198. if filer.IsSameData(message.OldEntry, message.NewEntry) {
  199. glog.V(2).Infof("update meta: %+v", resp)
  200. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  201. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  202. } else {
  203. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  204. reader := filer.NewFileReader(filerSource, message.NewEntry)
  205. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  206. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  207. if writeErr != nil {
  208. return writeErr
  209. }
  210. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  211. }
  212. }
  213. }
  214. // the following is entry rename
  215. if oldOk {
  216. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  217. if err != nil {
  218. return err
  219. }
  220. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  221. if message.OldEntry.IsDirectory {
  222. return client.RemoveDirectory(oldDest)
  223. }
  224. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  225. if err := client.DeleteFile(oldDest); err != nil {
  226. return err
  227. }
  228. }
  229. if newOk {
  230. if !shouldSendToRemote(message.NewEntry) {
  231. glog.V(2).Infof("skipping updating: %+v", resp)
  232. return nil
  233. }
  234. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  235. if err != nil {
  236. return err
  237. }
  238. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  239. if message.NewEntry.IsDirectory {
  240. return client.WriteDirectory(newDest, message.NewEntry)
  241. }
  242. reader := filer.NewFileReader(filerSource, message.NewEntry)
  243. glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
  244. remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
  245. if writeErr != nil {
  246. return writeErr
  247. }
  248. return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
  249. }
  250. }
  251. return nil
  252. }
  253. return eachEntryFunc, nil
  254. }
  255. func (option *RemoteSyncOptions)findRemoteStorageClient(bucketName string) (remote_storage.RemoteStorageClient, error) {
  256. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  257. remoteStorageMountLocation, isMounted := option.mappings.Mappings[string(bucket)]
  258. if !isMounted {
  259. return nil, fmt.Errorf("%s is not mounted", bucket)
  260. }
  261. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  262. if !hasClient {
  263. return nil, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  264. }
  265. client, err := remote_storage.GetRemoteStorage(remoteConf)
  266. if err != nil {
  267. return nil, err
  268. }
  269. return client, nil
  270. }
  271. func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  272. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  273. if !ok {
  274. return "", nil, nil, false
  275. }
  276. var isMounted bool
  277. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  278. if !isMounted {
  279. glog.Warningf("%s is not mounted", bucket)
  280. return "", nil, nil, false
  281. }
  282. var hasClient bool
  283. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  284. if !hasClient {
  285. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  286. return "", nil, nil, false
  287. }
  288. return bucket, remoteStorageMountLocation, remoteConf, true
  289. }
  290. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  291. if !strings.HasPrefix(dir, bucketsDir+"/") {
  292. return "", false
  293. }
  294. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  295. return util.FullPath(bucketsDir).Child(parts[0]), true
  296. }
  297. func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
  298. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
  299. return err
  300. } else {
  301. option.mappings = mappings
  302. }
  303. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  304. err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  305. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  306. return nil
  307. }
  308. conf := &remote_pb.RemoteConf{}
  309. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  310. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  311. }
  312. option.remoteConfs[conf.Name] = conf
  313. return nil
  314. }, "", false, math.MaxUint32)
  315. return
  316. }