|
|
@ -1,9 +1,12 @@ |
|
|
|
package weed_server |
|
|
|
|
|
|
|
import ( |
|
|
|
"fmt" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/golang/protobuf/proto" |
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer2" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
@ -23,48 +26,58 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
|
lastReadTime = time.Unix(0, req.SinceNs) |
|
|
|
} |
|
|
|
|
|
|
|
var readErr error |
|
|
|
for { |
|
|
|
|
|
|
|
lastReadTime, readErr = fs.filer.ReadLogBuffer(lastReadTime, func(dirPath string, eventNotification *filer_pb.EventNotification) error { |
|
|
|
|
|
|
|
// get complete path to the file or directory
|
|
|
|
var entryName string |
|
|
|
if eventNotification.OldEntry != nil { |
|
|
|
entryName = eventNotification.OldEntry.Name |
|
|
|
} else if eventNotification.NewEntry != nil { |
|
|
|
entryName = eventNotification.NewEntry.Name |
|
|
|
} |
|
|
|
|
|
|
|
fullpath := util.Join(dirPath, entryName) |
|
|
|
|
|
|
|
// skip on filer internal meta logs
|
|
|
|
if strings.HasPrefix(fullpath, filer2.SystemLogDir) { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
if !strings.HasPrefix(fullpath, req.PathPrefix) { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
message := &filer_pb.SubscribeMetadataResponse{ |
|
|
|
Directory: dirPath, |
|
|
|
EventNotification: eventNotification, |
|
|
|
} |
|
|
|
if err := stream.Send(message); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification) error { |
|
|
|
|
|
|
|
// get complete path to the file or directory
|
|
|
|
var entryName string |
|
|
|
if eventNotification.OldEntry != nil { |
|
|
|
entryName = eventNotification.OldEntry.Name |
|
|
|
} else if eventNotification.NewEntry != nil { |
|
|
|
entryName = eventNotification.NewEntry.Name |
|
|
|
} |
|
|
|
|
|
|
|
fullpath := util.Join(dirPath, entryName) |
|
|
|
|
|
|
|
// skip on filer internal meta logs
|
|
|
|
if strings.HasPrefix(fullpath, filer2.SystemLogDir) { |
|
|
|
return nil |
|
|
|
}) |
|
|
|
if readErr != nil { |
|
|
|
glog.V(0).Infof("=> client %v: %+v", clientName, readErr) |
|
|
|
return readErr |
|
|
|
} |
|
|
|
|
|
|
|
if !strings.HasPrefix(fullpath, req.PathPrefix) { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
message := &filer_pb.SubscribeMetadataResponse{ |
|
|
|
Directory: dirPath, |
|
|
|
EventNotification: eventNotification, |
|
|
|
} |
|
|
|
if err := stream.Send(message); err != nil { |
|
|
|
glog.V(0).Infof("=> client %v: %+v", clientName, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
_, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { |
|
|
|
fs.listenersLock.Lock() |
|
|
|
fs.listenersCond.Wait() |
|
|
|
fs.listenersLock.Unlock() |
|
|
|
} |
|
|
|
return true |
|
|
|
}, func(logEntry *filer_pb.LogEntry) error { |
|
|
|
event := &filer_pb.SubscribeMetadataResponse{} |
|
|
|
if err := proto.Unmarshal(logEntry.Data, event); err != nil { |
|
|
|
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) |
|
|
|
return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
if err := eachEventNotificationFn(event.Directory, event.EventNotification); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|