You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
8.3 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/spf13/viper"
  8. "google.golang.org/grpc"
  9. "io"
  10. "reflect"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. var (
  18. metaBackup FilerMetaBackupOptions
  19. )
  20. type FilerMetaBackupOptions struct {
  21. grpcDialOption grpc.DialOption
  22. filerAddress *string
  23. filerDirectory *string
  24. restart *bool
  25. backupFilerConfig *string
  26. store filer.FilerStore
  27. }
  28. func init() {
  29. cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
  30. metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
  31. metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
  32. metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
  33. metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
  34. }
  35. var cmdFilerMetaBackup = &Command{
  36. UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
  37. Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
  38. Long: `continuously backup filer meta data changes.
  39. The backup writes to another filer store specified in a backup_filer.toml.
  40. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
  41. weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
  42. `,
  43. }
  44. func runFilerMetaBackup(cmd *Command, args []string) bool {
  45. util.LoadConfiguration("security", false)
  46. metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  47. // load backup_filer.toml
  48. v := viper.New()
  49. v.SetConfigFile(*metaBackup.backupFilerConfig)
  50. if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
  51. glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
  52. " weed scaffold -config=%s -output=.\n\n\n",
  53. *metaBackup.backupFilerConfig, "backup_filer", "filer")
  54. }
  55. if err := metaBackup.initStore(v); err != nil {
  56. glog.V(0).Infof("init backup filer store: %v", err)
  57. return true
  58. }
  59. missingPreviousBackup := false
  60. _, err := metaBackup.getOffset()
  61. if err != nil {
  62. missingPreviousBackup = true
  63. }
  64. if *metaBackup.restart || missingPreviousBackup {
  65. glog.V(0).Infof("traversing metadata tree...")
  66. startTime := time.Now()
  67. if err := metaBackup.traverseMetadata(); err != nil {
  68. glog.Errorf("traverse meta data: %v", err)
  69. return true
  70. }
  71. glog.V(0).Infof("metadata copied up to %v", startTime)
  72. if err := metaBackup.setOffset(startTime); err != nil {
  73. startTime = time.Now()
  74. }
  75. }
  76. for {
  77. err := metaBackup.streamMetadataBackup()
  78. if err != nil {
  79. glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
  80. time.Sleep(1747 * time.Millisecond)
  81. }
  82. }
  83. return true
  84. }
  85. func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
  86. // load configuration for default filer store
  87. hasDefaultStoreConfigured := false
  88. for _, store := range filer.Stores {
  89. if v.GetBool(store.GetName() + ".enabled") {
  90. store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
  91. if err := store.Initialize(v, store.GetName()+"."); err != nil {
  92. glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
  93. }
  94. glog.V(0).Infof("configured filer store to %s", store.GetName())
  95. hasDefaultStoreConfigured = true
  96. metaBackup.store = filer.NewFilerStoreWrapper(store)
  97. break
  98. }
  99. }
  100. if !hasDefaultStoreConfigured {
  101. return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
  102. }
  103. return nil
  104. }
  105. func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
  106. var saveErr error
  107. traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  108. println("+", parentPath.Child(entry.Name))
  109. if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
  110. saveErr = fmt.Errorf("insert entry error: %v\n", err)
  111. return
  112. }
  113. })
  114. if traverseErr != nil {
  115. return fmt.Errorf("traverse: %v", traverseErr)
  116. }
  117. return saveErr
  118. }
  119. var (
  120. MetaBackupKey = []byte("metaBackup")
  121. )
  122. func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
  123. startTime, err := metaBackup.getOffset()
  124. if err != nil {
  125. startTime = time.Now()
  126. }
  127. glog.V(0).Infof("streaming from %v", startTime)
  128. store := metaBackup.store
  129. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  130. ctx := context.Background()
  131. message := resp.EventNotification
  132. if message.OldEntry == nil && message.NewEntry == nil {
  133. return nil
  134. }
  135. if message.OldEntry == nil && message.NewEntry != nil {
  136. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  137. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  138. return store.InsertEntry(ctx, entry)
  139. }
  140. if message.OldEntry != nil && message.NewEntry == nil {
  141. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  142. return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  143. }
  144. if message.OldEntry != nil && message.NewEntry != nil {
  145. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  146. println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  147. entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
  148. return store.UpdateEntry(ctx, entry)
  149. }
  150. println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
  151. if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
  152. return err
  153. }
  154. println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
  155. return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
  156. }
  157. return nil
  158. }
  159. tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  160. ctx, cancel := context.WithCancel(context.Background())
  161. defer cancel()
  162. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  163. ClientName: "meta_backup",
  164. PathPrefix: *metaBackup.filerDirectory,
  165. SinceNs: startTime.UnixNano(),
  166. })
  167. if err != nil {
  168. return fmt.Errorf("listen: %v", err)
  169. }
  170. var counter int64
  171. var lastWriteTime time.Time
  172. for {
  173. resp, listenErr := stream.Recv()
  174. if listenErr == io.EOF {
  175. return nil
  176. }
  177. if listenErr != nil {
  178. return listenErr
  179. }
  180. if err = eachEntryFunc(resp); err != nil {
  181. return err
  182. }
  183. counter++
  184. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  185. glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  186. counter = 0
  187. lastWriteTime = time.Now()
  188. if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
  189. return err2
  190. }
  191. }
  192. }
  193. })
  194. return tailErr
  195. }
  196. func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
  197. value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
  198. if err != nil {
  199. return
  200. }
  201. tsNs := util.BytesToUint64(value)
  202. return time.Unix(0, int64(tsNs)), nil
  203. }
  204. func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
  205. valueBuf := make([]byte, 8)
  206. util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
  207. if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
  208. return err
  209. }
  210. return nil
  211. }
  212. var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
  213. func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  214. return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  215. return fn(client)
  216. })
  217. }
  218. func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
  219. return location.Url
  220. }