You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
7.7 KiB

4 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "reflect"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. metaBackup FilerMetaBackupOptions
  18. )
  19. type FilerMetaBackupOptions struct {
  20. grpcDialOption grpc.DialOption
  21. filerAddress *string
  22. filerDirectory *string
  23. restart *bool
  24. backupFilerConfig *string
  25. store filer.FilerStore
  26. }
  27. func init() {
  28. cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
  29. metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
  30. metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
  31. metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
  32. metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
  33. }
  34. var cmdFilerMetaBackup = &Command{
  35. UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
  36. Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
  37. Long: `continuously backup filer meta data changes.
  38. The backup writes to another filer store specified in a backup_filer.toml.
  39. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
  40. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
  41. `,
  42. }
  43. func runFilerMetaBackup(cmd *Command, args []string) bool {
  44. util.LoadConfiguration("security", false)
  45. metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  46. // load backup_filer.toml
  47. v := viper.New()
  48. v.SetConfigFile(*metaBackup.backupFilerConfig)
  49. if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
  50. glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
  51. " weed scaffold -config=%s -output=.\n\n\n",
  52. *metaBackup.backupFilerConfig, "backup_filer", "filer")
  53. }
  54. if err := metaBackup.initStore(v); err != nil {
  55. glog.V(0).Infof("init backup filer store: %v", err)
  56. return true
  57. }
  58. missingPreviousBackup := false
  59. _, err := metaBackup.getOffset()
  60. if err != nil {
  61. missingPreviousBackup = true
  62. }
  63. if *metaBackup.restart || missingPreviousBackup {
  64. glog.V(0).Infof("traversing metadata tree...")
  65. startTime := time.Now()
  66. if err := metaBackup.traverseMetadata(); err != nil {
  67. glog.Errorf("traverse meta data: %v", err)
  68. return true
  69. }
  70. glog.V(0).Infof("metadata copied up to %v", startTime)
  71. if err := metaBackup.setOffset(startTime); err != nil {
  72. startTime = time.Now()
  73. }
  74. }
  75. for {
  76. err := metaBackup.streamMetadataBackup()
  77. if err != nil {
  78. glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
  79. time.Sleep(1747 * time.Millisecond)
  80. }
  81. }
  82. return true
  83. }
  84. func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
  85. // load configuration for default filer store
  86. hasDefaultStoreConfigured := false
  87. for _, store := range filer.Stores {
  88. if v.GetBool(store.GetName() + ".enabled") {
  89. store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
  90. if err := store.Initialize(v, store.GetName()+"."); err != nil {
  91. glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
  92. }
  93. glog.V(0).Infof("configured filer store to %s", store.GetName())
  94. hasDefaultStoreConfigured = true
  95. metaBackup.store = filer.NewFilerStoreWrapper(store)
  96. break
  97. }
  98. }
  99. if !hasDefaultStoreConfigured {
  100. return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
  101. }
  102. return nil
  103. }
  104. func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
  105. var saveErr error
  106. traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  107. println("+", parentPath.Child(entry.Name))
  108. if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
  109. saveErr = fmt.Errorf("insert entry error: %v\n", err)
  110. return
  111. }
  112. })
  113. if traverseErr != nil {
  114. return fmt.Errorf("traverse: %v", traverseErr)
  115. }
  116. return saveErr
  117. }
  118. var (
  119. MetaBackupKey = []byte("metaBackup")
  120. )
  121. func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
  122. startTime, err := metaBackup.getOffset()
  123. if err != nil {
  124. startTime = time.Now()
  125. }
  126. glog.V(0).Infof("streaming from %v", startTime)
  127. store := metaBackup.store
  128. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  129. ctx := context.Background()
  130. message := resp.EventNotification
  131. if message.OldEntry == nil && message.NewEntry == nil {
  132. return nil
  133. }
  134. if message.OldEntry == nil && message.NewEntry != nil {
  135. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  136. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  137. return store.InsertEntry(ctx, entry)
  138. }
  139. if message.OldEntry != nil && message.NewEntry == nil {
  140. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  141. return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  142. }
  143. if message.OldEntry != nil && message.NewEntry != nil {
  144. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  145. println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  146. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  147. return store.UpdateEntry(ctx, entry)
  148. }
  149. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  150. if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
  151. return err
  152. }
  153. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  154. return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
  155. }
  156. return nil
  157. }
  158. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  159. lastTime := time.Unix(0, lastTsNs)
  160. glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
  161. return metaBackup.setOffset(lastTime)
  162. })
  163. return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup",
  164. *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
  165. }
  166. func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
  167. value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
  168. if err != nil {
  169. return
  170. }
  171. tsNs := util.BytesToUint64(value)
  172. return time.Unix(0, int64(tsNs)), nil
  173. }
  174. func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
  175. valueBuf := make([]byte, 8)
  176. util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
  177. if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
  178. return err
  179. }
  180. return nil
  181. }
  182. var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
  183. func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  184. return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  185. return fn(client)
  186. })
  187. }
  188. func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
  189. return location.Url
  190. }