Browse Source

refactoring

pull/1287/head
Chris Lu 5 years ago
parent
commit
2fc2eb74dd
  1. 6
      weed/filer2/filer.go
  2. 70
      weed/filer2/filer_notify.go
  3. 92
      weed/queue/log_buffer.go

6
weed/filer2/filer.go

@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/queue"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
) )
@ -35,7 +36,7 @@ type Filer struct {
DirQueuesPath string DirQueuesPath string
buckets *FilerBuckets buckets *FilerBuckets
Cipher bool Cipher bool
metaLogBuffer *LogBuffer
metaLogBuffer *queue.LogBuffer
} }
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer { func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer {
@ -45,7 +46,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }
f.metaLogBuffer = NewLogBuffer(time.Minute, f.logFlushFunc)
f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc)
go f.loopProcessingDeletion() go f.loopProcessingDeletion()
@ -312,5 +313,6 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
} }
func (f *Filer) Shutdown() { func (f *Filer) Shutdown() {
f.metaLogBuffer.Shutdown()
f.store.Shutdown() f.store.Shutdown()
} }

70
weed/filer2/filer_notify.go

@ -3,7 +3,6 @@ package filer2
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -78,72 +77,3 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
} }
} }
type LogBuffer struct {
buf []byte
pos int
startTime time.Time
stopTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
sync.Mutex
}
func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
lb := &LogBuffer{
buf: make([]byte, 4*0124*1024),
sizeBuf: make([]byte, 4),
flushInterval: 2 * time.Second, // flushInterval,
flushFn: flushFn,
}
go lb.loopFlush()
return lb
}
func (m *LogBuffer) loopFlush() {
for {
m.Lock()
m.flush()
m.Unlock()
time.Sleep(m.flushInterval)
}
}
func (m *LogBuffer) flush() {
if m.flushFn != nil && m.pos > 0 {
m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
m.pos = 0
}
}
func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(),
PartitionKeyHash: util.HashToInt32(key),
Data: data,
}
logEntryData, _ := proto.Marshal(logEntry)
size := len(logEntryData)
m.Lock()
defer m.Unlock()
if m.pos == 0 {
m.startTime = ts
}
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
m.flush()
m.startTime = ts
}
m.stopTime = ts
util.Uint32toBytes(m.sizeBuf, uint32(size))
copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
m.pos += size + 4
}

92
weed/queue/log_buffer.go

@ -0,0 +1,92 @@
package queue
import (
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
type LogBuffer struct {
buf []byte
pos int
startTime time.Time
stopTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
isStopping bool
sync.Mutex
}
func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
lb := &LogBuffer{
buf: make([]byte, 4*0124*1024),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
}
go lb.loopFlush()
return lb
}
func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(),
PartitionKeyHash: util.HashToInt32(key),
Data: data,
}
logEntryData, _ := proto.Marshal(logEntry)
size := len(logEntryData)
m.Lock()
defer m.Unlock()
if m.pos == 0 {
m.startTime = ts
}
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
m.flush()
m.startTime = ts
}
m.stopTime = ts
util.Uint32toBytes(m.sizeBuf, uint32(size))
copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
m.pos += size + 4
}
func (m *LogBuffer) Shutdown() {
if m.isStopping {
return
}
m.isStopping = true
m.Lock()
m.flush()
m.Unlock()
}
func (m *LogBuffer) loopFlush() {
for !m.isStopping {
m.Lock()
m.flush()
m.Unlock()
time.Sleep(m.flushInterval)
}
}
func (m *LogBuffer) flush() {
if m.flushFn != nil && m.pos > 0 {
m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
m.pos = 0
}
}
Loading…
Cancel
Save