diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 428589975..014eb19e9 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/queue" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -35,7 +36,7 @@ type Filer struct { DirQueuesPath string buckets *FilerBuckets Cipher bool - metaLogBuffer *LogBuffer + metaLogBuffer *queue.LogBuffer } func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer { @@ -45,7 +46,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, } - f.metaLogBuffer = NewLogBuffer(time.Minute, f.logFlushFunc) + f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc) go f.loopProcessingDeletion() @@ -312,5 +313,6 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { } func (f *Filer) Shutdown() { + f.metaLogBuffer.Shutdown() f.store.Shutdown() } diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 54c3d0c48..e808e45f0 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -3,7 +3,6 @@ package filer2 import ( "fmt" "strings" - "sync" "time" "github.com/golang/protobuf/proto" @@ -78,72 +77,3 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { } } -type LogBuffer struct { - buf []byte - pos int - startTime time.Time - stopTime time.Time - sizeBuf []byte - flushInterval time.Duration - flushFn func(startTime, stopTime time.Time, buf []byte) - sync.Mutex -} - -func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer { - lb := &LogBuffer{ - buf: make([]byte, 4*0124*1024), - sizeBuf: make([]byte, 4), - flushInterval: 2 * time.Second, // flushInterval, - flushFn: flushFn, - } - go lb.loopFlush() - return lb -} - -func (m *LogBuffer) loopFlush() { - for { - m.Lock() - m.flush() - m.Unlock() - time.Sleep(m.flushInterval) - } -} - -func (m *LogBuffer) flush() { - if m.flushFn != nil && m.pos > 0 { - m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos]) - m.pos = 0 - } -} - -func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { - - logEntry := &filer_pb.LogEntry{ - TsNs: ts.UnixNano(), - PartitionKeyHash: util.HashToInt32(key), - Data: data, - } - - logEntryData, _ := proto.Marshal(logEntry) - - size := len(logEntryData) - - m.Lock() - defer m.Unlock() - - if m.pos == 0 { - m.startTime = ts - } - - if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { - m.flush() - m.startTime = ts - } - m.stopTime = ts - - util.Uint32toBytes(m.sizeBuf, uint32(size)) - copy(m.buf[m.pos:m.pos+4], m.sizeBuf) - - copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) - m.pos += size + 4 -} diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go new file mode 100644 index 000000000..d6ccdf2a6 --- /dev/null +++ b/weed/queue/log_buffer.go @@ -0,0 +1,92 @@ +package queue + +import ( + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type LogBuffer struct { + buf []byte + pos int + startTime time.Time + stopTime time.Time + sizeBuf []byte + flushInterval time.Duration + flushFn func(startTime, stopTime time.Time, buf []byte) + isStopping bool + sync.Mutex +} + +func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer { + lb := &LogBuffer{ + buf: make([]byte, 4*0124*1024), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, + } + go lb.loopFlush() + return lb +} + +func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) { + + logEntry := &filer_pb.LogEntry{ + TsNs: ts.UnixNano(), + PartitionKeyHash: util.HashToInt32(key), + Data: data, + } + + logEntryData, _ := proto.Marshal(logEntry) + + size := len(logEntryData) + + m.Lock() + defer m.Unlock() + + if m.pos == 0 { + m.startTime = ts + } + + if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { + m.flush() + m.startTime = ts + } + m.stopTime = ts + + util.Uint32toBytes(m.sizeBuf, uint32(size)) + copy(m.buf[m.pos:m.pos+4], m.sizeBuf) + + copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) + m.pos += size + 4 +} + +func (m *LogBuffer) Shutdown() { + if m.isStopping { + return + } + m.isStopping = true + m.Lock() + m.flush() + m.Unlock() +} + +func (m *LogBuffer) loopFlush() { + for !m.isStopping { + m.Lock() + m.flush() + m.Unlock() + time.Sleep(m.flushInterval) + } +} + +func (m *LogBuffer) flush() { + if m.flushFn != nil && m.pos > 0 { + m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos]) + m.pos = 0 + } +}