You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
8.2 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "io"
  10. "reflect"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. var (
  18. metaBackup FilerMetaBackupOptions
  19. )
  20. type FilerMetaBackupOptions struct {
  21. grpcDialOption grpc.DialOption
  22. filerAddress *string
  23. filerDirectory *string
  24. restart *bool
  25. backupFilerConfig *string
  26. store filer.FilerStore
  27. }
  28. func init() {
  29. cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
  30. metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
  31. metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
  32. metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
  33. metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
  34. }
  35. var cmdFilerMetaBackup = &Command{
  36. UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
  37. Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
  38. Long: `continuously backup filer meta data changes.
  39. The backup writes to another filer store specified in a backup_filer.toml.
  40. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
  41. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
  42. `,
  43. }
  44. func runFilerMetaBackup(cmd *Command, args []string) bool {
  45. metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  46. // load backup_filer.toml
  47. v := viper.New()
  48. v.SetConfigFile(*metaBackup.backupFilerConfig)
  49. if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
  50. glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
  51. " weed scaffold -config=%s -output=.\n\n\n",
  52. *metaBackup.backupFilerConfig, "backup_filer", "filer")
  53. }
  54. if err := metaBackup.initStore(v); err != nil {
  55. glog.V(0).Infof("init backup filer store: %v", err)
  56. return true
  57. }
  58. missingPreviousBackup := false
  59. _, err := metaBackup.getOffset()
  60. if err != nil {
  61. missingPreviousBackup = true
  62. }
  63. if *metaBackup.restart || missingPreviousBackup {
  64. glog.V(0).Infof("traversing metadata tree...")
  65. startTime := time.Now()
  66. if err := metaBackup.traverseMetadata(); err != nil {
  67. glog.Errorf("traverse meta data: %v", err)
  68. return true
  69. }
  70. glog.V(0).Infof("metadata copied up to %v", startTime)
  71. if err := metaBackup.setOffset(startTime); err != nil {
  72. startTime = time.Now()
  73. }
  74. }
  75. for {
  76. err := metaBackup.streamMetadataBackup()
  77. if err != nil {
  78. glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
  79. time.Sleep(1747 * time.Millisecond)
  80. }
  81. }
  82. return true
  83. }
  84. func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
  85. // load configuration for default filer store
  86. hasDefaultStoreConfigured := false
  87. for _, store := range filer.Stores {
  88. if v.GetBool(store.GetName() + ".enabled") {
  89. store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
  90. if err := store.Initialize(v, store.GetName()+"."); err != nil {
  91. glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
  92. }
  93. glog.V(0).Infof("configured filer store to %s", store.GetName())
  94. hasDefaultStoreConfigured = true
  95. metaBackup.store = filer.NewFilerStoreWrapper(store)
  96. break
  97. }
  98. }
  99. if !hasDefaultStoreConfigured {
  100. return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
  101. }
  102. return nil
  103. }
  104. func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
  105. var saveErr error
  106. traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  107. println("+", parentPath.Child(entry.Name))
  108. if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
  109. saveErr = fmt.Errorf("insert entry error: %v\n", err)
  110. return
  111. }
  112. })
  113. if traverseErr != nil {
  114. return fmt.Errorf("traverse: %v", traverseErr)
  115. }
  116. return saveErr
  117. }
  118. var (
  119. MetaBackupKey = []byte("metaBackup")
  120. )
  121. func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
  122. startTime, err := metaBackup.getOffset()
  123. if err != nil {
  124. startTime = time.Now()
  125. }
  126. glog.V(0).Infof("streaming from %v", startTime)
  127. store := metaBackup.store
  128. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  129. ctx := context.Background()
  130. message := resp.EventNotification
  131. if message.OldEntry == nil && message.NewEntry == nil {
  132. return nil
  133. }
  134. if message.OldEntry == nil && message.NewEntry != nil {
  135. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  136. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  137. return store.InsertEntry(ctx, entry)
  138. }
  139. if message.OldEntry != nil && message.NewEntry == nil {
  140. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  141. return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  142. }
  143. if message.OldEntry != nil && message.NewEntry != nil {
  144. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  145. println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  146. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  147. return store.UpdateEntry(ctx, entry)
  148. }
  149. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  150. if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
  151. return err
  152. }
  153. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  154. return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
  155. }
  156. return nil
  157. }
  158. tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  159. ctx, cancel := context.WithCancel(context.Background())
  160. defer cancel()
  161. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  162. ClientName: "meta_backup",
  163. PathPrefix: *metaBackup.filerDirectory,
  164. SinceNs: startTime.UnixNano(),
  165. })
  166. if err != nil {
  167. return fmt.Errorf("listen: %v", err)
  168. }
  169. var counter int64
  170. var lastWriteTime time.Time
  171. for {
  172. resp, listenErr := stream.Recv()
  173. if listenErr == io.EOF {
  174. return nil
  175. }
  176. if listenErr != nil {
  177. return listenErr
  178. }
  179. if err = eachEntryFunc(resp); err != nil {
  180. return err
  181. }
  182. counter++
  183. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  184. glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  185. counter = 0
  186. lastWriteTime = time.Now()
  187. if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
  188. return err2
  189. }
  190. }
  191. }
  192. })
  193. return tailErr
  194. }
  195. func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
  196. value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
  197. if err != nil {
  198. return
  199. }
  200. tsNs := util.BytesToUint64(value)
  201. return time.Unix(0, int64(tsNs)), nil
  202. }
  203. func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
  204. valueBuf := make([]byte, 8)
  205. util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
  206. if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
  207. return err
  208. }
  209. return nil
  210. }
  211. var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
  212. func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  213. return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  214. return fn(client)
  215. })
  216. }
  217. func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
  218. return location.Url
  219. }