|
|
@ -5,19 +5,28 @@ import ( |
|
|
|
"fmt" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
|
"google.golang.org/grpc" |
|
|
|
"io" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
type EventErrorType int |
|
|
|
|
|
|
|
const ( |
|
|
|
TrivialOnError EventErrorType = iota |
|
|
|
FatalOnError |
|
|
|
RetryForeverOnError |
|
|
|
) |
|
|
|
|
|
|
|
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error |
|
|
|
|
|
|
|
func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, |
|
|
|
pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32, |
|
|
|
processEventFn ProcessMetadataFunc, fatalOnError bool) error { |
|
|
|
processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { |
|
|
|
|
|
|
|
err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, |
|
|
|
pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) |
|
|
|
pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("subscribing filer meta change: %v", err) |
|
|
|
} |
|
|
@ -26,10 +35,10 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, |
|
|
|
|
|
|
|
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, |
|
|
|
clientName string, clientId int32, pathPrefix string, lastTsNs *int64, untilTsNs int64, selfSignature int32, |
|
|
|
processEventFn ProcessMetadataFunc, fatalOnError bool) error { |
|
|
|
processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { |
|
|
|
|
|
|
|
err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, |
|
|
|
pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) |
|
|
|
pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("subscribing filer meta change: %v", err) |
|
|
|
} |
|
|
@ -38,7 +47,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, |
|
|
|
} |
|
|
|
|
|
|
|
func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, |
|
|
|
processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
return func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
defer cancel() |
|
|
@ -65,9 +74,19 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix str |
|
|
|
} |
|
|
|
|
|
|
|
if err := processEventFn(resp); err != nil { |
|
|
|
if fatalOnError { |
|
|
|
switch eventErrorType { |
|
|
|
case TrivialOnError: |
|
|
|
glog.Errorf("process %v: %v", resp, err) |
|
|
|
case FatalOnError: |
|
|
|
glog.Fatalf("process %v: %v", resp, err) |
|
|
|
} else { |
|
|
|
case RetryForeverOnError: |
|
|
|
util.RetryForever("followMetaUpdates", func() error { |
|
|
|
return processEventFn(resp) |
|
|
|
}, func(err error) bool { |
|
|
|
glog.Errorf("process %v: %v", resp, err) |
|
|
|
return true |
|
|
|
}) |
|
|
|
default: |
|
|
|
glog.Errorf("process %v: %v", resp, err) |
|
|
|
} |
|
|
|
} |
|
|
|