| 
						
						
							
								
							
						
						
					 | 
				
				 | 
				
					@ -5,19 +5,28 @@ import ( | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"fmt" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"github.com/chrislusf/seaweedfs/weed/glog" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"github.com/chrislusf/seaweedfs/weed/util" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"google.golang.org/grpc" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"io" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						"time" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					type EventErrorType int | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					const ( | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						TrivialOnError EventErrorType = iota | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						FatalOnError | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						RetryForeverOnError | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, fatalOnError bool) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						if err != nil { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							return fmt.Errorf("subscribing filer meta change: %v", err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						} | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				
					@ -26,10 +35,10 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						clientName string, clientId int32, pathPrefix string, lastTsNs *int64, untilTsNs int64, selfSignature int32, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, fatalOnError bool) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						if err != nil { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							return fmt.Errorf("subscribing filer meta change: %v", err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						} | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				
					@ -38,7 +47,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
						return func(client filer_pb.SeaweedFilerClient) error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							ctx, cancel := context.WithCancel(context.Background()) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
							defer cancel() | 
				
			
			
		
	
	
		
			
				
					| 
						
							
								
							
						
						
							
								
							
						
						
					 | 
				
				 | 
				
					@ -65,9 +74,19 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix str | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
								} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
								if err := processEventFn(resp); err != nil { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									if fatalOnError { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									switch eventErrorType { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									case TrivialOnError: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										glog.Errorf("process %v: %v", resp, err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									case FatalOnError: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										glog.Fatalf("process %v: %v", resp, err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									} else { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									case RetryForeverOnError: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										util.RetryForever("followMetaUpdates", func() error { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
											return processEventFn(resp) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										}, func(err error) bool { | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
											glog.Errorf("process %v: %v", resp, err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
											return true | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									default: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
										glog.Errorf("process %v: %v", resp, err) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
									} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				
								} | 
				
			
			
		
	
	
		
			
				
					| 
						
							
								
							
						
						
						
					 | 
				
				 | 
				
					
  |