You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

174 lines
5.5 KiB

package replication
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
)
// Replicator applies filer metadata events to a replication sink. Events are
// matched against source.Dir and excludeDirs before being forwarded.
type Replicator struct {
// sink receives the CreateEntry / UpdateEntry / DeleteEntry calls.
sink sink.ReplicationSink
// source is the filer source; its Dir field is the path prefix stripped from
// source keys when they are mapped to sink keys.
source *source.FilerSource
// excludeDirs lists paths whose events are skipped (see isExcluded).
excludeDirs []string
}
// NewReplicator builds a Replicator for dataSink.
//
// A fresh source.FilerSource is initialized from sourceConfig using
// configPrefix and registered on dataSink via SetSourceFiler. The excluded
// directories are read from the configPrefix+"excludeDirectories" setting.
func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator {
	// Named filerSource so the local does not shadow the imported source package.
	filerSource := new(source.FilerSource)
	filerSource.Initialize(sourceConfig, configPrefix)
	dataSink.SetSourceFiler(filerSource)

	replicator := &Replicator{sink: dataSink, source: filerSource}
	replicator.excludeDirs = sourceConfig.GetStringSlice(configPrefix + "excludeDirectories")
	return replicator
}
// Replicate applies one filer metadata event to the configured sink.
//
// key is the source path the event refers to and message carries the old
// and/or new entry. Depending on which entries are present the event becomes a
// delete, a create, or an update/rename on the sink. Events whose paths fail
// the source-directory / exclusion checks are skipped and nil is returned.
// Errors from the sink are returned to the caller; the delete step of a
// rename or of a recreate is wrapped with "delete old entry".
func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_pb.EventNotification) error {
	// Events flagged IsFromOtherCluster are dropped when the sink is named "filer".
	if message.IsFromOtherCluster && r.sink.GetName() == "filer" {
		return nil
	}

	// Work on local copies: cross-boundary renames below rewrite these without
	// touching the message itself.
	oldEntry := message.OldEntry
	newEntry := message.NewEntry
	newParentPath := message.NewParentPath
	oldInSource := util.IsEqualOrUnder(key, r.source.Dir) && !r.isExcluded(key)

	// For rename events (both old and new entry present), check both paths
	// against the source directory. Convert cross-boundary renames to
	// create or delete so the sink stays consistent.
	if oldEntry != nil && newEntry != nil {
		newFullPath, targetParent := metadataEventTarget(key, newEntry, newParentPath)
		newInSource := util.IsEqualOrUnder(newFullPath, r.source.Dir) && !r.isExcluded(newFullPath)
		if !oldInSource && !newInSource {
			return nil
		}
		if !oldInSource {
			// Rename into watched directory: treat as create
			oldEntry = nil
			key = newFullPath
			newParentPath = targetParent
		} else if !newInSource {
			// Rename out of watched directory: treat as delete
			newEntry = nil
			newParentPath = ""
		}
	} else if !oldInSource {
		glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
		return nil
	}

	// Incremental sinks get a date component (YYYY-MM-DD of the entry's mtime)
	// inserted into the sink path; otherwise dateKey stays empty.
	var dateKey string
	if r.sink.IsIncremental() {
		// Prefer the new entry's mtime, otherwise use the old entry's. Guard
		// against a nil Attributes so an entry without attributes yields
		// mTime 0 instead of a nil-pointer panic.
		var mTime int64
		if newEntry != nil {
			if newEntry.Attributes != nil {
				mTime = newEntry.Attributes.Mtime
			}
		} else if oldEntry != nil && oldEntry.Attributes != nil {
			mTime = oldEntry.Attributes.Mtime
		}
		dateKey = time.Unix(mTime, 0).Format("2006-01-02")
	}

	oldSinkKey := r.sourceToSinkKey(key, dateKey)
	glog.V(3).Infof("replicate %s => %s", key, oldSinkKey)

	// By default the entry stays at the same sink key; a rename within the
	// source directory recomputes both the target key and its parent.
	newSinkKey := oldSinkKey
	newSinkParentPath := newParentPath
	if oldEntry != nil && newEntry != nil {
		targetSourceKey, targetSourceParent := metadataEventTarget(key, newEntry, newParentPath)
		newSinkKey = r.sourceToSinkKey(targetSourceKey, dateKey)
		newSinkParentPath = r.sourceToSinkPath(targetSourceParent, dateKey)
	} else if newParentPath != "" && util.IsEqualOrUnder(newParentPath, r.source.Dir) {
		newSinkParentPath = r.sourceToSinkPath(newParentPath, dateKey)
	}

	if oldEntry != nil && newEntry == nil {
		glog.V(4).Infof("deleting %v", oldSinkKey)
		return r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
	}
	if oldEntry == nil && newEntry != nil {
		glog.V(4).Infof("creating %v", oldSinkKey)
		return r.sink.CreateEntry(oldSinkKey, newEntry, message.Signatures)
	}
	if oldEntry == nil && newEntry == nil {
		glog.V(0).Infof("weird message %+v", message)
		return nil
	}

	// Both entries are present from here on. For sinks not named "filer" a
	// change of sink key is carried out as delete-then-create.
	if oldSinkKey != newSinkKey && r.sink.GetName() != "filer" {
		if err := r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures); err != nil {
			return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err)
		}
		glog.V(4).Infof("creating renamed %v", newSinkKey)
		return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures)
	}

	foundExisting, err := r.sink.UpdateEntry(oldSinkKey, oldEntry, newSinkParentPath, newEntry, message.DeleteChunks, message.Signatures)
	if foundExisting {
		glog.V(4).Infof("updated %v", oldSinkKey)
		return err
	}

	// NOTE(review): when foundExisting is false, any error returned by
	// UpdateEntry is discarded and the entry is deleted and recreated instead;
	// confirm that this fallback is intended.
	err = r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures)
	if err != nil {
		return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err)
	}
	glog.V(4).Infof("creating missing %v", newSinkKey)
	return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures)
}
// isExcluded reports whether util.IsEqualOrUnder(path, dir) holds for at
// least one dir in r.excludeDirs. It returns false when the list is empty.
func (r *Replicator) isExcluded(path string) bool {
	for i := 0; i < len(r.excludeDirs); i++ {
		if util.IsEqualOrUnder(path, r.excludeDirs[i]) {
			return true
		}
	}
	return false
}
// sourceToSinkKey maps a source key to a sink key: the first
// len(r.source.Dir) bytes of sourceKey are dropped and the remainder is
// joined onto the sink's target directory and dateKey.
//
// sourceKey must be at least as long as r.source.Dir; a shorter key makes the
// slice expression panic.
func (r *Replicator) sourceToSinkKey(sourceKey, dateKey string) string {
	sinkRoot := r.sink.GetSinkToDirectory()
	relativeKey := sourceKey[len(r.source.Dir):]
	return util.Join(sinkRoot, dateKey, relativeKey)
}
// sourceToSinkPath maps a source directory path to a sink path: the first
// len(r.source.Dir) bytes of sourcePath are dropped and the remainder is
// joined onto the sink's target directory and dateKey.
//
// sourcePath must be at least as long as r.source.Dir; a shorter path makes
// the slice expression panic.
func (r *Replicator) sourceToSinkPath(sourcePath, dateKey string) string {
	sinkRoot := r.sink.GetSinkToDirectory()
	relativePath := sourcePath[len(r.source.Dir):]
	return util.Join(sinkRoot, dateKey, relativePath)
}
// metadataEventTarget computes where an event's new entry lives.
//
// It returns two empty strings when newEntry is nil. Otherwise targetParent is
// newParentPath, or the directory part of key when newParentPath is empty, and
// targetKey is targetParent joined with newEntry.Name.
func metadataEventTarget(key string, newEntry *filer_pb.Entry, newParentPath string) (targetKey, targetParent string) {
	if newEntry == nil {
		return "", ""
	}

	parent := newParentPath
	if parent == "" {
		// No explicit new parent: use the directory part of key.
		parent, _ = util.FullPath(key).DirAndName()
	}

	targetKey = util.Join(parent, newEntry.Name)
	targetParent = parent
	return targetKey, targetParent
}
// ReadFilerSignature calls GetFilerConfiguration on the given filer and
// returns the Signature field of the response.
//
// On failure it returns 0 and the error. An error from the
// GetFilerConfiguration call is wrapped with %w so callers can still inspect
// the underlying cause with errors.Is / errors.As.
func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) {
	readErr = pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return fmt.Errorf("GetFilerConfiguration %s: %w", filer, err)
		}
		filerSignature = resp.Signature
		return nil
	})
	if readErr != nil {
		return 0, readErr
	}
	return filerSignature, nil
}