
read meta logs by timestamp

pass in event ts when moving logs
meta aggregator reads in-memory logs only
pull/1444/head
Chris Lu, 4 years ago
commit b69cb74c03
1. weed/filer2/filer.go (7 changed lines)
2. weed/filer2/filer_notify.go (4 changed lines)
3. weed/filer2/meta_aggregator.go (8 changed lines)
4. weed/messaging/broker/broker_grpc_server_publish.go (2 changed lines)
5. weed/util/log_buffer/log_buffer.go (21 changed lines)
6. weed/util/log_buffer/log_buffer_test.go (2 changed lines)
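
The heart of the change is the new eventTsNs parameter on LogBuffer.AddToBuffer: callers that know when the event actually happened pass that timestamp through, and callers that don't pass 0 to keep the old "stamp with time.Now()" behavior. Condensed from the hunks below:

    // Filer meta events: keep the event's own timestamp.
    f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)

    // Message broker: keep the publisher-supplied event time.
    tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)

    // No event timestamp available (e.g. the tests): 0 falls back to time.Now().
    lb.AddToBuffer(nil, buf, 0)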

weed/filer2/filer.go (7 changed lines)

@@ -16,7 +16,10 @@ import (
     "github.com/chrislusf/seaweedfs/weed/wdclient"
 )
 
-const PaginationSize = 1024 * 256
+const (
+    LogFlushInterval = time.Minute
+    PaginationSize   = 1024 * 256
+)
 
 var (
     OS_UID = uint32(os.Getuid())
@@ -47,7 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
         GrpcDialOption: grpcDialOption,
         Signature:      util.RandomInt32(),
     }
-    f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+    f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
     f.metaLogCollection = collection
     f.metaLogReplication = replication

weed/filer2/filer_notify.go (4 changed lines)

@@ -68,7 +68,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
         return
     }
 
-    f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data)
+    f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
 
 }
@@ -119,7 +119,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
         if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
             chunkedFileReader.Close()
             if err == io.EOF {
-                break
+                continue
             }
             return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
         }
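
The second hunk changes how ReadPersistedLogBuffer treats io.EOF: reaching the end of one persisted hour-minute log file now moves on to the next file instead of aborting the whole scan. A minimal sketch of that loop shape, assuming hypothetical openLogFile/decodeEntries helpers in place of the real chunked-reader and ReadEachLogEntry plumbing:

    package logscan // sketch only, not the actual SeaweedFS package

    import (
        "fmt"
        "io"
    )

    // readAllLogFiles mirrors the post-commit loop shape of Filer.ReadPersistedLogBuffer:
    // io.EOF from one log file means "this file is done", not "stop reading".
    func readAllLogFiles(
        logFiles []string,
        startTsNs int64,
        openLogFile func(name string) (io.ReadCloser, error), // hypothetical: stands in for the chunked file reader
        decodeEntries func(r io.Reader, startTsNs int64) (int64, error), // hypothetical: stands in for ReadEachLogEntry
    ) (lastTsNs int64, err error) {
        for _, name := range logFiles {
            r, openErr := openLogFile(name)
            if openErr != nil {
                return lastTsNs, openErr
            }
            lastTsNs, err = decodeEntries(r, startTsNs)
            r.Close()
            if err == io.EOF {
                continue // this file is exhausted; try the next one
            }
            if err != nil {
                return lastTsNs, fmt.Errorf("reading %s: %v", name, err)
            }
        }
        return lastTsNs, nil
    }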

weed/filer2/meta_aggregator.go (8 changed lines)

@@ -25,13 +25,15 @@ type MetaAggregator struct {
     ListenersCond *sync.Cond
 }
 
+// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
+// The old data comes from what each LocalMetadata persisted on disk.
 func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
     t := &MetaAggregator{
         filers:         filers,
         grpcDialOption: grpcDialOption,
     }
     t.ListenersCond = sync.NewCond(&t.ListenersLock)
-    t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() {
+    t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
         t.ListenersCond.Broadcast()
     })
     return t
@@ -48,7 +50,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
     var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
     lastPersistTime := time.Now()
     changesSinceLastPersist := 0
-    lastTsNs := int64(0)
+    lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
 
     MaxChangeLimit := 100
@@ -88,7 +90,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
         }
         dir := event.Directory
         // println("received meta change", dir, "size", len(data))
-        ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
+        ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
         if maybeReplicateMetadataChange != nil {
             maybeReplicateMetadataChange(event)
         }
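
Two behaviors fall out of this file. The new doc comment spells out that MetaAggregator only aggregates "on the fly" and never re-persists what it pulls from peer filers; and starting lastTsNs at time.Now().Add(-LogFlushInterval) means each subscription only asks a peer for roughly the last flush interval of changes, i.e. what should still be in that peer's in-memory log buffer. Older history has to come from the logs each filer persisted locally. A hedged sketch of the split a metadata reader ends up with (ReadPersistedLogBuffer is the real filer method touched above; readInMemory is a hypothetical stand-in for tailing the in-memory buffer):

    package metasketch // sketch only, assumptions noted inline

    import "time"

    // LogFlushInterval mirrors the constant introduced in weed/filer2/filer.go.
    const LogFlushInterval = time.Minute

    // catchUpThenTail sketches how a metadata consumer splits history between
    // the on-disk logs (older data) and the in-memory log buffer (recent events
    // that have not been flushed yet).
    func catchUpThenTail(
        startTime time.Time,
        readPersisted func(start time.Time) (lastTsNs int64, err error), // stands in for Filer.ReadPersistedLogBuffer
        readInMemory func(sinceTsNs int64) error, // hypothetical: tail the in-memory LogBuffer
    ) error {
        // 1) Replay everything already flushed to disk, starting at startTime.
        lastTsNs, err := readPersisted(startTime)
        if err != nil {
            return err
        }
        // 2) Tail the in-memory buffer from where the persisted logs left off.
        //    The aggregator keeps only this in-memory tail; it never re-persists
        //    what it receives from peer filers.
        return readInMemory(lastTsNs)
    }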

weed/messaging/broker/broker_grpc_server_publish.go (2 changed lines)

@@ -85,7 +85,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
             continue
         }
 
-        tl.logBuffer.AddToBuffer(in.Data.Key, data)
+        tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
 
         if in.Data.IsClose {
             // println("server received closing")
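
For the broker this means a published message is stamped with the publisher-supplied EventTimeNs rather than its arrival time at the broker; a zero EventTimeNs still falls back to the broker's clock (see the log_buffer hunk below). A sketch of the client side filling that field; only Key, EventTimeNs, and IsClose are visible in this diff, so the PublishRequest/Message type names, the Value field, the import path, and the send callback are assumptions:

    package publishsketch // sketch only; see the assumptions noted above

    import (
        "time"

        "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" // assumed import path
    )

    // publishWithEventTime fills Data.EventTimeNs so the broker's log buffer orders
    // the entry by when the event happened rather than when it reached the broker.
    func publishWithEventTime(send func(*messaging_pb.PublishRequest) error, key, payload []byte) error {
        return send(&messaging_pb.PublishRequest{ // assumed wrapper type behind "in"
            Data: &messaging_pb.Message{ // assumed message type behind in.Data
                EventTimeNs: time.Now().UnixNano(), // or the original event's timestamp; 0 lets the broker stamp it
                Key:         key,
                Value:       payload, // assumed field name, not shown in this diff
            },
        })
    }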

weed/util/log_buffer/log_buffer.go (21 changed lines)

@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
     return lb
 }
 
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
 
     m.Lock()
     defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
     }()
 
     // need to put the timestamp inside the lock
-    ts := time.Now()
-    tsNs := ts.UnixNano()
-    if m.lastTsNs >= tsNs {
+    var ts time.Time
+    if eventTsNs == 0 {
+        ts = time.Now()
+        eventTsNs = ts.UnixNano()
+    } else {
+        ts = time.Unix(0, eventTsNs)
+    }
+    if m.lastTsNs >= eventTsNs {
         // this is unlikely to happen, but just in case
-        tsNs = m.lastTsNs + 1
-        ts = time.Unix(0, tsNs)
+        eventTsNs = m.lastTsNs + 1
+        ts = time.Unix(0, eventTsNs)
     }
-    m.lastTsNs = tsNs
+    m.lastTsNs = eventTsNs
     logEntry := &filer_pb.LogEntry{
-        TsNs:             tsNs,
+        TsNs:             eventTsNs,
         PartitionKeyHash: util.HashToInt32(partitionKey),
         Data:             data,
     }
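
A short usage sketch of the new signature, assuming only what this hunk shows: an explicit eventTsNs stamps the entry with the caller's event time, 0 falls back to time.Now(), and a timestamp that is not strictly newer than the previous entry is nudged to lastTsNs+1 so TsNs stays strictly increasing within the buffer:

    package main

    import (
        "time"

        "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
    )

    func main() {
        // A nil flush function and a no-op notify function are enough to exercise
        // AddToBuffer, mirroring how the meta aggregator constructs its buffer.
        lb := log_buffer.NewLogBuffer(time.Minute, nil, func() {})

        eventTs := time.Now().Add(-30 * time.Second).UnixNano()
        lb.AddToBuffer([]byte("dir-a"), []byte("event 1"), eventTs) // stored with the caller's timestamp
        lb.AddToBuffer([]byte("dir-a"), []byte("event 2"), eventTs) // not newer than the last entry: bumped to lastTsNs+1
        lb.AddToBuffer([]byte("dir-b"), []byte("event 3"), 0)       // 0 means "stamp with time.Now()"
    }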

weed/util/log_buffer/log_buffer_test.go (2 changed lines)

@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
     var buf = make([]byte, messageSize)
     for i := 0; i < messageCount; i++ {
         rand.Read(buf)
-        lb.AddToBuffer(nil, buf)
+        lb.AddToBuffer(nil, buf, 0)
     }
 
     receivedmessageCount := 0
