diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index ac7ae4e4c..e5356472a 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -1,7 +1,6 @@ package filer2 import ( - "bytes" "fmt" "strings" "time" @@ -79,51 +78,3 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { glog.V(0).Infof("log write failed %s: %v", targetFile, err) } } - -func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) { - - var bytesBuf *bytes.Buffer - bytesBuf = f.MetaLogBuffer.ReadFromBuffer(lastReadTime) - if bytesBuf == nil { - return - } - defer f.MetaLogBuffer.ReleaseMeory(bytesBuf) - buf := bytesBuf.Bytes() - var processedTs int64 - - for pos := 0; pos+4 < len(buf); { - - size := util.BytesToUint32(buf[pos : pos+4]) - entryData := buf[pos+4 : pos+4+int(size)] - - logEntry := &filer_pb.LogEntry{} - err = proto.Unmarshal(entryData, logEntry) - if err != nil { - glog.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err) - return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err) - } - - event := &filer_pb.SubscribeMetadataResponse{} - err = proto.Unmarshal(logEntry.Data, event) - if err != nil { - glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - } - - err = eachEventFn(event.Directory, event.EventNotification) - - processedTs = logEntry.TsNs - - if err != nil { - newLastReadTime = time.Unix(0, processedTs) - return - } - - pos += 4 + int(size) - - } - - newLastReadTime = time.Unix(0, processedTs) - return - -} diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index e3de57145..6dd423007 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -1,9 +1,12 @@ package weed_server import ( + "fmt" "strings" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -23,48 +26,58 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, req.SinceNs) } - var readErr error - for { - - lastReadTime, readErr = fs.filer.ReadLogBuffer(lastReadTime, func(dirPath string, eventNotification *filer_pb.EventNotification) error { - - // get complete path to the file or directory - var entryName string - if eventNotification.OldEntry != nil { - entryName = eventNotification.OldEntry.Name - } else if eventNotification.NewEntry != nil { - entryName = eventNotification.NewEntry.Name - } - - fullpath := util.Join(dirPath, entryName) - - // skip on filer internal meta logs - if strings.HasPrefix(fullpath, filer2.SystemLogDir) { - return nil - } - - if !strings.HasPrefix(fullpath, req.PathPrefix) { - return nil - } - - message := &filer_pb.SubscribeMetadataResponse{ - Directory: dirPath, - EventNotification: eventNotification, - } - if err := stream.Send(message); err != nil { - return err - } + eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification) error { + + // get complete path to the file or directory + var entryName string + if eventNotification.OldEntry != nil { + entryName = eventNotification.OldEntry.Name + } else if eventNotification.NewEntry != nil { + entryName = eventNotification.NewEntry.Name + } + + fullpath := util.Join(dirPath, entryName) + + // skip on filer internal meta logs + if strings.HasPrefix(fullpath, filer2.SystemLogDir) { return nil - }) - if readErr != nil { - glog.V(0).Infof("=> client %v: %+v", clientName, readErr) - return readErr } + if !strings.HasPrefix(fullpath, req.PathPrefix) { + return nil + } + + message := &filer_pb.SubscribeMetadataResponse{ + Directory: dirPath, + EventNotification: eventNotification, + } + if err := stream.Send(message); err != nil { + glog.V(0).Infof("=> client %v: %+v", clientName, err) + return err + } + return nil + } + + _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() - } + return true + }, func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification); err != nil { + return err + } + + return nil + }) + + return err }