Browse Source

Merge branch 'master' into mq-subscribe

pull/5637/head
chrislu 1 year ago
parent
commit
fbb4917e2f
  1. 4
      k8s/charts/seaweedfs/Chart.yaml
  2. 10
      weed/command/filer_sync.go
  3. 2
      weed/util/constants.go

4
k8s/charts/seaweedfs/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1 apiVersion: v1
description: SeaweedFS description: SeaweedFS
name: seaweedfs name: seaweedfs
appVersion: "3.60"
version: 3.60.1
appVersion: "3.61"
version: 3.61.0

10
weed/command/filer_sync.go

@ -47,6 +47,8 @@ type SyncOptions struct {
metricsHttpIp *string metricsHttpIp *string
metricsHttpPort *int metricsHttpPort *int
concurrency *int concurrency *int
aDoDeleteFiles *bool
bDoDeleteFiles *bool
clientId int32 clientId int32
clientEpoch int32 clientEpoch int32
} }
@ -90,6 +92,8 @@ func init() {
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip") syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
syncOptions.clientId = util.RandomInt32() syncOptions.clientId = util.RandomInt32()
} }
@ -164,6 +168,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.bDiskType, *syncOptions.bDiskType,
*syncOptions.bDebug, *syncOptions.bDebug,
*syncOptions.concurrency, *syncOptions.concurrency,
*syncOptions.bDoDeleteFiles,
aFilerSignature, aFilerSignature,
bFilerSignature) bFilerSignature)
if err != nil { if err != nil {
@ -201,6 +206,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.aDiskType, *syncOptions.aDiskType,
*syncOptions.aDebug, *syncOptions.aDebug,
*syncOptions.concurrency, *syncOptions.concurrency,
*syncOptions.aDoDeleteFiles,
bFilerSignature, bFilerSignature,
aFilerSignature) aFilerSignature)
if err != nil { if err != nil {
@ -233,7 +239,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
} }
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error {
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now // if first time, start from now
// if has previously synced, resume from that point of time // if has previously synced, resume from that point of time
@ -251,7 +257,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource) filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, true, debug)
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification message := resp.EventNotification

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
) )
var ( var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.60)
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.61)
VERSION = sizeLimit + " " + VERSION_NUMBER VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = "" COMMIT = ""
) )

Loading…
Cancel
Save