You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							258 lines
						
					
					
						
							8.3 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							258 lines
						
					
					
						
							8.3 KiB
						
					
					
				| package command | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/spf13/viper" | |
| 	"google.golang.org/grpc" | |
| 	"reflect" | |
| 	"strings" | |
| 	"time" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/security" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| ) | |
| 
 | |
| var ( | |
| 	metaBackup FilerMetaBackupOptions | |
| ) | |
| 
 | |
| type FilerMetaBackupOptions struct { | |
| 	grpcDialOption    grpc.DialOption | |
| 	filerAddress      *string | |
| 	filerDirectory    *string | |
| 	restart           *bool | |
| 	backupFilerConfig *string | |
| 
 | |
| 	store       filer.FilerStore | |
| 	clientId    int32 | |
| 	clientEpoch int32 | |
| } | |
| 
 | |
| func init() { | |
| 	cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle | |
| 	metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port") | |
| 	metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer") | |
| 	metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup") | |
| 	metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store") | |
| 	metaBackup.clientId = util.RandomInt32() | |
| } | |
| 
 | |
| var cmdFilerMetaBackup = &Command{ | |
| 	UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml", | |
| 	Short:     "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml", | |
| 	Long: `continuously backup filer meta data changes.  | |
| The backup writes to another filer store specified in a backup_filer.toml. | |
|  | |
| 	weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" | |
| 	weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart | |
|  | |
|   `, | |
| } | |
| 
 | |
| func runFilerMetaBackup(cmd *Command, args []string) bool { | |
| 
 | |
| 	util.LoadSecurityConfiguration() | |
| 	metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") | |
| 
 | |
| 	// load backup_filer.toml | |
| 	v := viper.New() | |
| 	v.SetConfigFile(*metaBackup.backupFilerConfig) | |
| 
 | |
| 	if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file | |
| 		glog.Fatalf("Failed to load %s file: %v\nPlease use this command to generate the a %s.toml file\n"+ | |
| 			"    weed scaffold -config=%s -output=.\n\n\n", | |
| 			*metaBackup.backupFilerConfig, err, "backup_filer", "filer") | |
| 	} | |
| 
 | |
| 	if err := metaBackup.initStore(v); err != nil { | |
| 		glog.V(0).Infof("init backup filer store: %v", err) | |
| 		return true | |
| 	} | |
| 
 | |
| 	missingPreviousBackup := false | |
| 	_, err := metaBackup.getOffset() | |
| 	if err != nil { | |
| 		missingPreviousBackup = true | |
| 	} | |
| 
 | |
| 	if *metaBackup.restart || missingPreviousBackup { | |
| 		glog.V(0).Infof("traversing metadata tree...") | |
| 		startTime := time.Now() | |
| 		if err := metaBackup.traverseMetadata(); err != nil { | |
| 			glog.Errorf("traverse meta data: %v", err) | |
| 			return true | |
| 		} | |
| 		glog.V(0).Infof("metadata copied up to %v", startTime) | |
| 		if err := metaBackup.setOffset(startTime); err != nil { | |
| 			startTime = time.Now() | |
| 		} | |
| 	} | |
| 
 | |
| 	for { | |
| 		err := metaBackup.streamMetadataBackup() | |
| 		if err != nil { | |
| 			glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err) | |
| 			time.Sleep(1747 * time.Millisecond) | |
| 		} | |
| 	} | |
| 
 | |
| 	return true | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error { | |
| 	// load configuration for default filer store | |
| 	hasDefaultStoreConfigured := false | |
| 	for _, store := range filer.Stores { | |
| 		if v.GetBool(store.GetName() + ".enabled") { | |
| 			store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore) | |
| 			if err := store.Initialize(v, store.GetName()+"."); err != nil { | |
| 				glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) | |
| 			} | |
| 			glog.V(0).Infof("configured filer store to %s", store.GetName()) | |
| 			hasDefaultStoreConfigured = true | |
| 			metaBackup.store = filer.NewFilerStoreWrapper(store) | |
| 			break | |
| 		} | |
| 	} | |
| 	if !hasDefaultStoreConfigured { | |
| 		return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed()) | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) { | |
| 	var saveErr error | |
| 
 | |
| 	traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) { | |
| 
 | |
| 		println("+", parentPath.Child(entry.Name)) | |
| 		if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil { | |
| 			saveErr = fmt.Errorf("insert entry error: %v\n", err) | |
| 			return | |
| 		} | |
| 
 | |
| 	}) | |
| 
 | |
| 	if traverseErr != nil { | |
| 		return fmt.Errorf("traverse: %v", traverseErr) | |
| 	} | |
| 	return saveErr | |
| } | |
| 
 | |
| var ( | |
| 	MetaBackupKey = []byte("metaBackup") | |
| ) | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { | |
| 
 | |
| 	startTime, err := metaBackup.getOffset() | |
| 	if err != nil { | |
| 		startTime = time.Now() | |
| 	} | |
| 	glog.V(0).Infof("streaming from %v", startTime) | |
| 
 | |
| 	store := metaBackup.store | |
| 
 | |
| 	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { | |
| 
 | |
| 		ctx := context.Background() | |
| 		message := resp.EventNotification | |
| 
 | |
| 		if filer_pb.IsEmpty(resp) { | |
| 			return nil | |
| 		} else if filer_pb.IsCreate(resp) { | |
| 			println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) | |
| 			entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) | |
| 			return store.InsertEntry(ctx, entry) | |
| 		} else if filer_pb.IsDelete(resp) { | |
| 			println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) | |
| 			return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)) | |
| 		} else if filer_pb.IsUpdate(resp) { | |
| 			println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) | |
| 			entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) | |
| 			return store.UpdateEntry(ctx, entry) | |
| 		} else { | |
| 			// renaming | |
| 			println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) | |
| 			if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil { | |
| 				return err | |
| 			} | |
| 			println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) | |
| 			return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry)) | |
| 		} | |
| 
 | |
| 		return nil | |
| 	} | |
| 
 | |
| 	processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { | |
| 		lastTime := time.Unix(0, lastTsNs) | |
| 		glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3)) | |
| 		return metaBackup.setOffset(lastTime) | |
| 	}) | |
| 
 | |
| 	metaBackup.clientEpoch++ | |
| 
 | |
| 	prefix := *metaBackup.filerDirectory | |
| 	if !strings.HasSuffix(prefix, "/") { | |
| 		prefix = prefix + "/" | |
| 	} | |
| 	metadataFollowOption := &pb.MetadataFollowOption{ | |
| 		ClientName:             "meta_backup", | |
| 		ClientId:               metaBackup.clientId, | |
| 		ClientEpoch:            metaBackup.clientEpoch, | |
| 		SelfSignature:          0, | |
| 		PathPrefix:             prefix, | |
| 		AdditionalPathPrefixes: nil, | |
| 		DirectoriesToWatch:     nil, | |
| 		StartTsNs:              startTime.UnixNano(), | |
| 		StopTsNs:               0, | |
| 		EventErrorType:         pb.RetryForeverOnError, | |
| 	} | |
| 
 | |
| 	return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset) | |
| 
 | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { | |
| 	value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey) | |
| 	if err != nil { | |
| 		return | |
| 	} | |
| 	tsNs := util.BytesToUint64(value) | |
| 
 | |
| 	return time.Unix(0, int64(tsNs)), nil | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error { | |
| 	valueBuf := make([]byte, 8) | |
| 	util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano())) | |
| 
 | |
| 	if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil { | |
| 		return err | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { | |
| 
 | |
| 	return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { | |
| 		return fn(client) | |
| 	}) | |
| 
 | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string { | |
| 	return location.Url | |
| } | |
| 
 | |
| func (metaBackup *FilerMetaBackupOptions) GetDataCenter() string { | |
| 	return "" | |
| }
 |