Browse Source

refactor

pull/1293/head
Chris Lu 5 years ago
parent
commit
2955b96ef1
  1. 6
      weed/filer2/filer.go
  2. 6
      weed/filer2/filer_notify.go
  3. 1
      weed/server/filer_grpc_server_listen.go
  4. 3
      weed/util/log_buffer/log_read.go

6
weed/filer2/filer.go

@ -35,7 +35,7 @@ type Filer struct {
FsyncBuckets []string FsyncBuckets []string
buckets *FilerBuckets buckets *FilerBuckets
Cipher bool Cipher bool
metaLogBuffer *log_buffer.LogBuffer
MetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string metaLogCollection string
metaLogReplication string metaLogReplication string
} }
@ -47,7 +47,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }
f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection f.metaLogCollection = collection
f.metaLogReplication = replication f.metaLogReplication = replication
@ -316,6 +316,6 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
} }
func (f *Filer) Shutdown() { func (f *Filer) Shutdown() {
f.metaLogBuffer.Shutdown()
f.MetaLogBuffer.Shutdown()
f.store.Shutdown() f.store.Shutdown()
} }

6
weed/filer2/filer_notify.go

@ -64,7 +64,7 @@ func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventN
return return
} }
f.metaLogBuffer.AddToBuffer([]byte(dir), data)
f.MetaLogBuffer.AddToBuffer([]byte(dir), data)
} }
@ -83,11 +83,11 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) { func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) {
var bytesBuf *bytes.Buffer var bytesBuf *bytes.Buffer
bytesBuf = f.metaLogBuffer.ReadFromBuffer(lastReadTime)
bytesBuf = f.MetaLogBuffer.ReadFromBuffer(lastReadTime)
if bytesBuf == nil { if bytesBuf == nil {
return return
} }
defer f.metaLogBuffer.ReleaseMeory(bytesBuf)
defer f.MetaLogBuffer.ReleaseMeory(bytesBuf)
buf := bytesBuf.Bytes() buf := bytesBuf.Bytes()
var processedTs int64 var processedTs int64

1
weed/server/filer_grpc_server_listen.go

@ -22,6 +22,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if req.SinceNs > 0 { if req.SinceNs > 0 {
lastReadTime = time.Unix(0, req.SinceNs) lastReadTime = time.Unix(0, req.SinceNs)
} }
var readErr error var readErr error
for { for {

3
weed/util/log_buffer/log_read.go

@ -12,7 +12,8 @@ import (
) )
func (logBuffer *LogBuffer) LoopProcessLogData( func (logBuffer *LogBuffer) LoopProcessLogData(
startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) {
startTreadTime time.Time, waitForDataFn func() bool,
eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) {
// loop through all messages // loop through all messages
var bytesBuf *bytes.Buffer var bytesBuf *bytes.Buffer
lastReadTime := startTreadTime lastReadTime := startTreadTime

Loading…
Cancel
Save