diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index af0a624b1..587174059 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -20,19 +20,21 @@ import ( ) type SyncOptions struct { - isActivePassive *bool - filerA *string - filerB *string - aPath *string - bPath *string - aReplication *string - bReplication *string - aCollection *string - bCollection *string - aTtlSec *int - bTtlSec *int - aDebug *bool - bDebug *bool + isActivePassive *bool + filerA *string + filerB *string + aPath *string + bPath *string + aReplication *string + bReplication *string + aCollection *string + bCollection *string + aTtlSec *int + bTtlSec *int + aDebug *bool + bDebug *bool + aProxyByFiler *bool + bProxyByFiler *bool } var ( @@ -43,7 +45,7 @@ var ( func init() { cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle - syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true") + syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true") syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") @@ -54,6 +56,8 @@ func init() { syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B") syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A") syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B") + syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", true, "read and write file chunks by filer A instead of volume servers") + syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", true, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") @@ -62,8 +66,8 @@ func init() { var cmdFilerSynchronize = &Command{ UsageLine: "filer.sync -a=: -b=:", - Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters", - Long: `continuously synchronize file changes between two active-active or active-passive filers + Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters", + Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content, and write to the other destination. Different from filer.replicate: @@ -86,8 +90,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug) + err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, + *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, + *syncOptions.bDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -98,8 +103,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { if !*syncOptions.isActivePassive { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug) + err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, + *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, + *syncOptions.aDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -113,8 +119,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string, - replicationStr, collection string, ttlSec int, debug bool) error { +func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string, + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error { // read source filer signature sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) @@ -138,9 +144,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath) + filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption) + filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index f19af43b2..4d78d769f 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -138,6 +138,22 @@ func ServerToGrpcAddress(server string) (serverGrpcAddress string) { return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) } +func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { + hostnameAndPort := strings.Split(grpcAddress, ":") + if len(hostnameAndPort) != 2 { + return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress) + } + + grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + if parseErr != nil { + return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) + } + + port := int(grpcPort) - 10000 + + return fmt.Sprintf("%s:%d", hostnameAndPort[0], port) +} + func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d193ff81c..b062adcfe 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st replicatedChunk, e := fs.replicateOneChunk(chunk, path) if e != nil { err = e + return } replicatedChunks[index] = replicatedChunk }(sourceChunk, chunkIndex) @@ -97,6 +98,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if fs.writeChunkByFiler { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) + } glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 9c0e4176f..600ff51f0 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -3,6 +3,7 @@ package filersink import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" @@ -18,14 +19,16 @@ import ( ) type FilerSink struct { - filerSource *source.FilerSource - grpcAddress string - dir string - replication string - collection string - ttlSec int32 - dataCenter string - grpcDialOption grpc.DialOption + filerSource *source.FilerSource + grpcAddress string + dir string + replication string + collection string + ttlSec int32 + dataCenter string + grpcDialOption grpc.DialOption + address string + writeChunkByFiler bool } func init() { @@ -42,26 +45,33 @@ func (fs *FilerSink) GetSinkToDirectory() string { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { return fs.DoInitialize( + "", configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), configuration.GetString(prefix+"replication"), configuration.GetString(prefix+"collection"), configuration.GetInt(prefix+"ttlSec"), - security.LoadClientTLS(util.GetViper(), "grpc.client")) + security.LoadClientTLS(util.GetViper(), "grpc.client"), + false) } func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { fs.filerSource = s } -func (fs *FilerSink) DoInitialize(grpcAddress string, dir string, - replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption) (err error) { +func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, + replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) { + fs.address = address + if fs.address == "" { + fs.address = pb.GrpcAddressToServerAddress(grpcAddress) + } fs.grpcAddress = grpcAddress fs.dir = dir fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) fs.grpcDialOption = grpcDialOption + fs.writeChunkByFiler = writeChunkByFiler return nil } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index ff4f2eb26..3982360b0 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -25,19 +25,28 @@ type FilerSource struct { grpcAddress string grpcDialOption grpc.DialOption Dir string + address string + proxyByFiler bool } func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { return fs.DoInitialize( + "", configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), + false, ) } -func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) { +func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) { + fs.address = address + if fs.address == "" { + fs.address = pb.GrpcAddressToServerAddress(grpcAddress) + } fs.grpcAddress = grpcAddress fs.Dir = dir fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fs.proxyByFiler = readChunkFromFiler return nil } @@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) return } -func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) { +func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { + + if fs.proxyByFiler { + return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId) + } - fileUrls, err := fs.LookupFileId(part) + fileUrls, err := fs.LookupFileId(fileId) if err != nil { return "", nil, nil, err } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 5b93c6d08..fcafe3893 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -3,30 +3,38 @@ package weed_server import ( "github.com/chrislusf/seaweedfs/weed/util" "net/http" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/stats" ) func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { + + start := time.Now() + + // proxy to volume servers + var fileId string + if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") { + fileId = r.RequestURI[len("/?proxyChunkId="):] + } + if fileId != "" { + stats.FilerRequestCounter.WithLabelValues("proxy").Inc() + fs.proxyToVolumeServer(w,r,fileId) + stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds()) + return + } + w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) if r.Header.Get("Origin") != "" { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") } - start := time.Now() switch r.Method { case "GET": - fileId := r.FormValue("proxyToFileId") - if fileId != "" { - stats.FilerRequestCounter.WithLabelValues("proxy").Inc() - fs.proxyToVolumeServer(w,r,fileId) - stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds()) - } else { - stats.FilerRequestCounter.WithLabelValues("get").Inc() - fs.GetOrHeadHandler(w, r, true) - stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) - } + stats.FilerRequestCounter.WithLabelValues("get").Inc() + fs.GetOrHeadHandler(w, r, true) + stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) case "HEAD": stats.FilerRequestCounter.WithLabelValues("head").Inc() fs.GetOrHeadHandler(w, r, false)