|
|
package command
import ( "context" "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/replication" "github.com/seaweedfs/seaweedfs/weed/replication/sink" "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink" "github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/security" statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/grace" "google.golang.org/grpc" "os" "regexp" "strings" "sync/atomic" "time" )
type SyncOptions struct { isActivePassive *bool filerA *string filerB *string aPath *string aExcludePaths *string bPath *string bExcludePaths *string aReplication *string bReplication *string aCollection *string bCollection *string aTtlSec *int bTtlSec *int aDiskType *string bDiskType *string aDebug *bool bDebug *bool aFromTsMs *int64 bFromTsMs *int64 aProxyByFiler *bool bProxyByFiler *bool metricsHttpIp *string metricsHttpPort *int concurrency *int aDoDeleteFiles *bool bDoDeleteFiles *bool clientId int32 clientEpoch atomic.Int32 }
const ( SyncKeyPrefix = "sync." DefaultConcurrencyLimit = 32 )
var ( syncOptions SyncOptions syncCpuProfile *string syncMemProfile *string )
func init() { cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true") syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A") syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B") syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B") syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A") syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B") syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A") syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B") syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers") syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond") syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip") syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A") syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B") syncOptions.clientId = util.RandomInt32() }
var cmdFilerSynchronize = &Command{ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>", Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters", Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content, and write to the other destination. Different from filer.replicate:
* filer.sync only works between two filers. * filer.sync does not need any special message queue setup. * filer.sync supports both active-active and active-passive modes. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute. A fresh sync will start from the earliest metadata logs.
`, }
func runFilerSynchronize(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
filerA := pb.ServerAddress(*syncOptions.filerA) filerB := pb.ServerAddress(*syncOptions.filerB)
// start filer.sync metrics server
go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
// read a filer signature
aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA) if aFilerErr != nil { glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr) return true } // read b filer signature
bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB) if bFilerErr != nil { glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr) return true }
go func() { // a->b
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath)) if initOffsetError != nil { glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) os.Exit(2) } for { syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, syncOptions.clientEpoch.Load(), grpcDialOption, filerA, *syncOptions.aPath, util.StringSplit(*syncOptions.aExcludePaths, ","), *syncOptions.aProxyByFiler, filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug, *syncOptions.concurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) } } }()
if !*syncOptions.isActivePassive { // b->a
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath)) if initOffsetError != nil { glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) os.Exit(2) } go func() { for { syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, syncOptions.clientEpoch.Load(), grpcDialOption, filerB, *syncOptions.bPath, util.StringSplit(*syncOptions.bExcludePaths, ","), *syncOptions.bProxyByFiler, filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug, *syncOptions.concurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) } } }() }
select {}
return true }
// initOffsetFromTsMs Initialize offset
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error { if fromTsMs <= 0 { return nil } // convert to nanosecond
fromTsNs := fromTsMs * 1000_000 // If not successful, exit the program.
setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs) if setOffsetErr != nil { return setOffsetErr } glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB) return nil }
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) if err != nil { return err }
glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
// create filer sink
filerSource := &source.FilerSource{} filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification for _, sig := range message.Signatures { if sig == targetFilerSignature && targetFilerSignature != 0 { fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message) return nil } } return persistEventFn(resp) }
if concurrency < 0 || concurrency > 1024 { glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit) concurrency = DefaultConcurrencyLimit } processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processor.AddSyncJob(resp) return nil }, 3*time.Second, func(counter int64, lastTsNs int64) error { offsetTsNs := processor.processedTsWatermark.Load() if offsetTsNs == 0 { return nil } // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now // collect synchronous offset
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) })
prefix := sourcePath if !strings.HasSuffix(prefix, "/") { prefix = prefix + "/" }
metadataFollowOption := &pb.MetadataFollowOption{ ClientName: clientName, ClientId: clientId, ClientEpoch: clientEpoch, SelfSignature: targetFilerSignature, PathPrefix: prefix, AdditionalPathPrefixes: nil, DirectoriesToWatch: nil, StartTsNs: sourceFilerOffsetTsNs, StopTsNs: 0, EventErrorType: pb.RetryForeverOnError, }
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath(path string) string { // compatible historical version
if path == "/" { return SyncKeyPrefix } else { return SyncKeyPrefix + path } }
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) if err != nil { return err }
if len(resp.Error) != 0 { return errors.New(resp.Error) } if len(resp.Value) < 8 { return nil }
lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
return nil })
return
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
valueBuf := make([]byte, 8) util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{ Key: syncKey, Value: valueBuf, }) if err != nil { return err }
if len(resp.Error) != 0 { return errors.New(resp.Error) }
return nil
})
}
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { // process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification
var sourceOldKey, sourceNewKey util.FullPath if message.OldEntry != nil { sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) } if message.NewEntry != nil { sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) }
if debug { glog.V(0).Infof("received %v", resp) }
if isMultipartUploadDir(resp.Directory + "/") { return nil }
if !strings.HasPrefix(resp.Directory, sourcePath) { return nil } for _, excludePath := range excludePaths { if strings.HasPrefix(resp.Directory, excludePath) { return nil } } if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) { return nil } if dataSink.IsIncremental() { doDeleteFiles = false } // handle deletions
if filer_pb.IsDelete(resp) { if !doDeleteFiles { return nil } if !strings.HasPrefix(string(sourceOldKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) }
// handle new entries
if filer_pb.IsCreate(resp) { if !strings.HasPrefix(string(sourceNewKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { return fmt.Errorf("create entry1 : %v", err) } else { return nil } }
// this is something special?
if filer_pb.IsEmpty(resp) { return nil }
// handle updates
if strings.HasPrefix(string(sourceOldKey), sourcePath) { // old key is in the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) { // new key is also in the watched directory
if doDeleteFiles { oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) if foundExisting { return err }
// not able to find old entry
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { return fmt.Errorf("delete old entry %v: %v", oldKey, err) } } // create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { return fmt.Errorf("create entry2 : %v", err) } else { return nil }
} else { // new key is outside of the watched directory
if doDeleteFiles { key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) } } } else { // old key is outside of the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) { // new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { return fmt.Errorf("create entry3 : %v", err) } else { return nil } } else { // new key is also outside of the watched directory
// skip
} }
return nil } return processEventFn }
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) { if !dataSink.IsIncremental() { key = util.Join(targetPath, string(sourceKey)[len(sourcePath):]) } else { var mTime int64 if message.NewEntry != nil { mTime = message.NewEntry.Attributes.Mtime } else if message.OldEntry != nil { mTime = message.OldEntry.Attributes.Mtime } dateKey := time.Unix(mTime, 0).Format("2006-01-02") key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) }
return escapeKey(key) }
|