You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

237 lines
7.8 KiB

4 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "reflect"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. metaBackup FilerMetaBackupOptions
  18. )
  19. type FilerMetaBackupOptions struct {
  20. grpcDialOption grpc.DialOption
  21. filerAddress *string
  22. filerDirectory *string
  23. restart *bool
  24. backupFilerConfig *string
  25. store filer.FilerStore
  26. clientId int32
  27. }
  28. func init() {
  29. cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
  30. metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
  31. metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
  32. metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
  33. metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
  34. metaBackup.clientId = util.RandomInt32()
  35. }
  36. var cmdFilerMetaBackup = &Command{
  37. UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
  38. Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
  39. Long: `continuously backup filer meta data changes.
  40. The backup writes to another filer store specified in a backup_filer.toml.
  41. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
  42. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
  43. `,
  44. }
  45. func runFilerMetaBackup(cmd *Command, args []string) bool {
  46. util.LoadConfiguration("security", false)
  47. metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  48. // load backup_filer.toml
  49. v := viper.New()
  50. v.SetConfigFile(*metaBackup.backupFilerConfig)
  51. if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
  52. glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
  53. " weed scaffold -config=%s -output=.\n\n\n",
  54. *metaBackup.backupFilerConfig, "backup_filer", "filer")
  55. }
  56. if err := metaBackup.initStore(v); err != nil {
  57. glog.V(0).Infof("init backup filer store: %v", err)
  58. return true
  59. }
  60. missingPreviousBackup := false
  61. _, err := metaBackup.getOffset()
  62. if err != nil {
  63. missingPreviousBackup = true
  64. }
  65. if *metaBackup.restart || missingPreviousBackup {
  66. glog.V(0).Infof("traversing metadata tree...")
  67. startTime := time.Now()
  68. if err := metaBackup.traverseMetadata(); err != nil {
  69. glog.Errorf("traverse meta data: %v", err)
  70. return true
  71. }
  72. glog.V(0).Infof("metadata copied up to %v", startTime)
  73. if err := metaBackup.setOffset(startTime); err != nil {
  74. startTime = time.Now()
  75. }
  76. }
  77. for {
  78. err := metaBackup.streamMetadataBackup()
  79. if err != nil {
  80. glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
  81. time.Sleep(1747 * time.Millisecond)
  82. }
  83. }
  84. return true
  85. }
  86. func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
  87. // load configuration for default filer store
  88. hasDefaultStoreConfigured := false
  89. for _, store := range filer.Stores {
  90. if v.GetBool(store.GetName() + ".enabled") {
  91. store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
  92. if err := store.Initialize(v, store.GetName()+"."); err != nil {
  93. glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
  94. }
  95. glog.V(0).Infof("configured filer store to %s", store.GetName())
  96. hasDefaultStoreConfigured = true
  97. metaBackup.store = filer.NewFilerStoreWrapper(store)
  98. break
  99. }
  100. }
  101. if !hasDefaultStoreConfigured {
  102. return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
  103. }
  104. return nil
  105. }
  106. func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
  107. var saveErr error
  108. traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  109. println("+", parentPath.Child(entry.Name))
  110. if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
  111. saveErr = fmt.Errorf("insert entry error: %v\n", err)
  112. return
  113. }
  114. })
  115. if traverseErr != nil {
  116. return fmt.Errorf("traverse: %v", traverseErr)
  117. }
  118. return saveErr
  119. }
  120. var (
  121. MetaBackupKey = []byte("metaBackup")
  122. )
  123. func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
  124. startTime, err := metaBackup.getOffset()
  125. if err != nil {
  126. startTime = time.Now()
  127. }
  128. glog.V(0).Infof("streaming from %v", startTime)
  129. store := metaBackup.store
  130. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  131. ctx := context.Background()
  132. message := resp.EventNotification
  133. if message.OldEntry == nil && message.NewEntry == nil {
  134. return nil
  135. }
  136. if message.OldEntry == nil && message.NewEntry != nil {
  137. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  138. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  139. return store.InsertEntry(ctx, entry)
  140. }
  141. if message.OldEntry != nil && message.NewEntry == nil {
  142. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  143. return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  144. }
  145. if message.OldEntry != nil && message.NewEntry != nil {
  146. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  147. println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  148. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  149. return store.UpdateEntry(ctx, entry)
  150. }
  151. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  152. if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
  153. return err
  154. }
  155. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  156. return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
  157. }
  158. return nil
  159. }
  160. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  161. lastTime := time.Unix(0, lastTsNs)
  162. glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
  163. return metaBackup.setOffset(lastTime)
  164. })
  165. return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId,
  166. *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
  167. }
  168. func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
  169. value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
  170. if err != nil {
  171. return
  172. }
  173. tsNs := util.BytesToUint64(value)
  174. return time.Unix(0, int64(tsNs)), nil
  175. }
  176. func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
  177. valueBuf := make([]byte, 8)
  178. util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
  179. if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
  180. return err
  181. }
  182. return nil
  183. }
  184. var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
  185. func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  186. return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  187. return fn(client)
  188. })
  189. }
  190. func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
  191. return location.Url
  192. }