You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

258 lines
8.3 KiB

6 months ago
6 months ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "reflect"
  10. "strings"
  11. "time"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/security"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. var (
  18. metaBackup FilerMetaBackupOptions
  19. )
  20. type FilerMetaBackupOptions struct {
  21. grpcDialOption grpc.DialOption
  22. filerAddress *string
  23. filerDirectory *string
  24. restart *bool
  25. backupFilerConfig *string
  26. store filer.FilerStore
  27. clientId int32
  28. clientEpoch int32
  29. }
  30. func init() {
  31. cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
  32. metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
  33. metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
  34. metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
  35. metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
  36. metaBackup.clientId = util.RandomInt32()
  37. }
  38. var cmdFilerMetaBackup = &Command{
  39. UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
  40. Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
  41. Long: `continuously backup filer meta data changes.
  42. The backup writes to another filer store specified in a backup_filer.toml.
  43. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
  44. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
  45. `,
  46. }
  47. func runFilerMetaBackup(cmd *Command, args []string) bool {
  48. util.LoadSecurityConfiguration()
  49. metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  50. // load backup_filer.toml
  51. v := viper.New()
  52. v.SetConfigFile(*metaBackup.backupFilerConfig)
  53. if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
  54. glog.Fatalf("Failed to load %s file: %v\nPlease use this command to generate the a %s.toml file\n"+
  55. " weed scaffold -config=%s -output=.\n\n\n",
  56. *metaBackup.backupFilerConfig, err, "backup_filer", "filer")
  57. }
  58. if err := metaBackup.initStore(v); err != nil {
  59. glog.V(0).Infof("init backup filer store: %v", err)
  60. return true
  61. }
  62. missingPreviousBackup := false
  63. _, err := metaBackup.getOffset()
  64. if err != nil {
  65. missingPreviousBackup = true
  66. }
  67. if *metaBackup.restart || missingPreviousBackup {
  68. glog.V(0).Infof("traversing metadata tree...")
  69. startTime := time.Now()
  70. if err := metaBackup.traverseMetadata(); err != nil {
  71. glog.Errorf("traverse meta data: %v", err)
  72. return true
  73. }
  74. glog.V(0).Infof("metadata copied up to %v", startTime)
  75. if err := metaBackup.setOffset(startTime); err != nil {
  76. startTime = time.Now()
  77. }
  78. }
  79. for {
  80. err := metaBackup.streamMetadataBackup()
  81. if err != nil {
  82. glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
  83. time.Sleep(1747 * time.Millisecond)
  84. }
  85. }
  86. return true
  87. }
  88. func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
  89. // load configuration for default filer store
  90. hasDefaultStoreConfigured := false
  91. for _, store := range filer.Stores {
  92. if v.GetBool(store.GetName() + ".enabled") {
  93. store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
  94. if err := store.Initialize(v, store.GetName()+"."); err != nil {
  95. glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
  96. }
  97. glog.V(0).Infof("configured filer store to %s", store.GetName())
  98. hasDefaultStoreConfigured = true
  99. metaBackup.store = filer.NewFilerStoreWrapper(store)
  100. break
  101. }
  102. }
  103. if !hasDefaultStoreConfigured {
  104. return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
  105. }
  106. return nil
  107. }
  108. func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
  109. var saveErr error
  110. traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  111. println("+", parentPath.Child(entry.Name))
  112. if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
  113. saveErr = fmt.Errorf("insert entry error: %v\n", err)
  114. return
  115. }
  116. })
  117. if traverseErr != nil {
  118. return fmt.Errorf("traverse: %v", traverseErr)
  119. }
  120. return saveErr
  121. }
  122. var (
  123. MetaBackupKey = []byte("metaBackup")
  124. )
  125. func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
  126. startTime, err := metaBackup.getOffset()
  127. if err != nil {
  128. startTime = time.Now()
  129. }
  130. glog.V(0).Infof("streaming from %v", startTime)
  131. store := metaBackup.store
  132. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  133. ctx := context.Background()
  134. message := resp.EventNotification
  135. if filer_pb.IsEmpty(resp) {
  136. return nil
  137. } else if filer_pb.IsCreate(resp) {
  138. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  139. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  140. return store.InsertEntry(ctx, entry)
  141. } else if filer_pb.IsDelete(resp) {
  142. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  143. return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  144. } else if filer_pb.IsUpdate(resp) {
  145. println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  146. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  147. return store.UpdateEntry(ctx, entry)
  148. } else {
  149. // renaming
  150. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  151. if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
  152. return err
  153. }
  154. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  155. return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
  156. }
  157. return nil
  158. }
  159. processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
  160. lastTime := time.Unix(0, lastTsNs)
  161. glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
  162. return metaBackup.setOffset(lastTime)
  163. })
  164. metaBackup.clientEpoch++
  165. prefix := *metaBackup.filerDirectory
  166. if !strings.HasSuffix(prefix, "/") {
  167. prefix = prefix + "/"
  168. }
  169. metadataFollowOption := &pb.MetadataFollowOption{
  170. ClientName: "meta_backup",
  171. ClientId: metaBackup.clientId,
  172. ClientEpoch: metaBackup.clientEpoch,
  173. SelfSignature: 0,
  174. PathPrefix: prefix,
  175. AdditionalPathPrefixes: nil,
  176. DirectoriesToWatch: nil,
  177. StartTsNs: startTime.UnixNano(),
  178. StopTsNs: 0,
  179. EventErrorType: pb.RetryForeverOnError,
  180. }
  181. return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  182. }
  183. func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
  184. value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
  185. if err != nil {
  186. return
  187. }
  188. tsNs := util.BytesToUint64(value)
  189. return time.Unix(0, int64(tsNs)), nil
  190. }
  191. func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
  192. valueBuf := make([]byte, 8)
  193. util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
  194. if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
  195. return err
  196. }
  197. return nil
  198. }
  199. var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
  200. func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  201. return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  202. return fn(client)
  203. })
  204. }
  205. func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
  206. return location.Url
  207. }
  208. func (metaBackup *FilerMetaBackupOptions) GetDataCenter() string {
  209. return ""
  210. }