Browse Source

filer.sync: replicate outside of either cluster, only need to see filers

pull/1759/head
Chris Lu 4 years ago
parent
commit
80b8692688
  1. 28
      weed/command/filer_sync.go
  2. 16
      weed/pb/grpc_client_server.go
  3. 4
      weed/replication/sink/filersink/fetch_write.go
  4. 16
      weed/replication/sink/filersink/filer_sink.go
  5. 19
      weed/replication/source/filer_source.go
  6. 24
      weed/server/filer_server_handlers.go

28
weed/command/filer_sync.go

@ -33,6 +33,8 @@ type SyncOptions struct {
bTtlSec *int
aDebug *bool
bDebug *bool
aProxyByFiler *bool
bProxyByFiler *bool
}
var (
@ -43,7 +45,7 @@ var (
func init() {
cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true")
syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
@ -54,6 +56,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", true, "read and write file chunks by filer A instead of volume servers")
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", true, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
@ -62,8 +66,8 @@ func init() {
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters",
Long: `continuously synchronize file changes between two active-active or active-passive filers
Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
@ -86,8 +90,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler,
*syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler,
*syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@ -98,8 +103,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler,
*syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler,
*syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@ -113,8 +119,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
replicationStr, collection string, ttlSec int, debug bool) error {
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error {
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
@ -138,9 +144,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {

16
weed/pb/grpc_client_server.go

@ -138,6 +138,22 @@ func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
}
func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
hostnameAndPort := strings.Split(grpcAddress, ":")
if len(hostnameAndPort) != 2 {
return fmt.Sprintf("unexpected grpcAddress: %s", grpcAddress)
}
grpcPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
if parseErr != nil {
return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
}
port := int(grpcPort) - 10000
return fmt.Sprintf("%s:%d", hostnameAndPort[0], port)
}
func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master)

4
weed/replication/sink/filersink/fetch_write.go

@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
replicatedChunk, e := fs.replicateOneChunk(chunk, path)
if e != nil {
err = e
return
}
replicatedChunks[index] = replicatedChunk
}(sourceChunk, chunkIndex)
@ -97,6 +98,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if fs.writeChunkByFiler {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
}
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)

16
weed/replication/sink/filersink/filer_sink.go

@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@ -26,6 +27,8 @@ type FilerSink struct {
ttlSec int32
dataCenter string
grpcDialOption grpc.DialOption
address string
writeChunkByFiler bool
}
func init() {
@ -42,26 +45,33 @@ func (fs *FilerSink) GetSinkToDirectory() string {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
security.LoadClientTLS(util.GetViper(), "grpc.client"))
security.LoadClientTLS(util.GetViper(), "grpc.client"),
false)
}
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
fs.filerSource = s
}
func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption) (err error) {
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
fs.address = address
if fs.address == "" {
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
}
fs.grpcAddress = grpcAddress
fs.dir = dir
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.grpcDialOption = grpcDialOption
fs.writeChunkByFiler = writeChunkByFiler
return nil
}

19
weed/replication/source/filer_source.go

@ -25,19 +25,28 @@ type FilerSource struct {
grpcAddress string
grpcDialOption grpc.DialOption
Dir string
address string
proxyByFiler bool
}
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
false,
)
}
func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) {
func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
fs.address = address
if fs.address == "" {
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
}
fs.grpcAddress = grpcAddress
fs.Dir = dir
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
fs.proxyByFiler = readChunkFromFiler
return nil
}
@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return
}
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
if fs.proxyByFiler {
return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId)
}
fileUrls, err := fs.LookupFileId(part)
fileUrls, err := fs.LookupFileId(fileId)
if err != nil {
return "", nil, nil, err
}

24
weed/server/filer_server_handlers.go

@ -3,30 +3,38 @@ package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// proxy to volume servers
var fileId string
if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
fileId = r.RequestURI[len("/?proxyChunkId="):]
}
if fileId != "" {
stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
fs.proxyToVolumeServer(w,r,fileId)
stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
return
}
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
if r.Header.Get("Origin") != "" {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}
start := time.Now()
switch r.Method {
case "GET":
fileId := r.FormValue("proxyToFileId")
if fileId != "" {
stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
fs.proxyToVolumeServer(w,r,fileId)
stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
} else {
stats.FilerRequestCounter.WithLabelValues("get").Inc()
fs.GetOrHeadHandler(w, r, true)
stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
}
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)

Loading…
Cancel
Save