|
|
@ -2,7 +2,6 @@ package weed_server |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
@ -12,6 +11,12 @@ import ( |
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
// MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
|
|
|
|
MaxUnsyncedEvents = 1e3 |
|
|
|
) |
|
|
|
|
|
|
|
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { |
|
|
@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
|
lastReadTime := time.Unix(0, req.SinceNs) |
|
|
|
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
|
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) |
|
|
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) |
|
|
|
|
|
|
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) |
|
|
|
|
|
|
@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq |
|
|
|
lastReadTime := time.Unix(0, req.SinceNs) |
|
|
|
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
|
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) |
|
|
|
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) |
|
|
|
|
|
|
|
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) |
|
|
|
|
|
|
@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { |
|
|
|
func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { |
|
|
|
filtered := 0 |
|
|
|
|
|
|
|
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { |
|
|
|
defer func() { |
|
|
|
if filtered > MaxUnsyncedEvents { |
|
|
|
if err := stream.Send(&filer_pb.SubscribeMetadataResponse{ |
|
|
|
EventNotification: &filer_pb.EventNotification{}, |
|
|
|
TsNs: tsNs, |
|
|
|
}); err == nil { |
|
|
|
filtered = 0 |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
filtered++ |
|
|
|
foundSelf := false |
|
|
|
for _, sig := range eventNotification.Signatures { |
|
|
|
if sig == clientSignature && clientSignature != 0 { |
|
|
|
if sig == req.Signature && req.Signature != 0 { |
|
|
|
return nil |
|
|
|
} |
|
|
|
if sig == fs.filer.Signature { |
|
|
@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe |
|
|
|
glog.V(0).Infof("=> client %v: %+v", clientName, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
filtered = 0 |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|