diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 008fd33a7..eba2a044a 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -110,7 +110,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c2158e7eb..a6d94670a 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -56,7 +56,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { m.Lock() defer func() { @@ -68,20 +68,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { // need to put the timestamp inside the lock var ts time.Time - if eventTsNs == 0 { + if processingTsNs == 0 { ts = time.Now() - eventTsNs = ts.UnixNano() + processingTsNs = ts.UnixNano() } else { - ts = time.Unix(0, eventTsNs) + ts = time.Unix(0, processingTsNs) } - if m.lastTsNs >= eventTsNs { + if m.lastTsNs >= processingTsNs { // this is unlikely to happen, but just in case - eventTsNs = m.lastTsNs + 1 - ts = time.Unix(0, eventTsNs) + processingTsNs = m.lastTsNs + 1 + ts = time.Unix(0, processingTsNs) } - m.lastTsNs = eventTsNs + m.lastTsNs = processingTsNs logEntry := &filer_pb.LogEntry{ - TsNs: eventTsNs, + TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, }