Browse Source

Fix 6181/6182 (#6183)

* set larger buf size for LogBuffer

* jump to next day when no more entry found

* Update weed/filer/filer_notify_read.go

---------

Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
pull/6188/head
Bruce 2 months ago
committed by GitHub
parent
commit
0060a2cf9c
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 22
      weed/filer/filer_notify_read.go
  2. 19
      weed/server/filer_grpc_server_sub_meta.go
  3. 6
      weed/util/log_buffer/log_buffer.go
  4. 13
      weed/util/time.go

22
weed/filer/filer_notify_read.go

@ -4,15 +4,16 @@ import (
"container/heap"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/protobuf/proto"
"io"
"math"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@ -39,6 +40,19 @@ func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePositi
}
func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) {
startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "")
if listDayErr != nil {
return false, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
if len(dayEntries) == 0 {
return false, nil
}
return true, nil
}
// ----------
type LogEntryItem struct {
Entry *filer_pb.LogEntry

19
weed/server/filer_grpc_server_sub_meta.go

@ -2,11 +2,12 @@ package weed_server
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"strings"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs)
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
position := log_buffer.NewMessagePosition(nextDayTs, -2)
found, err := fs.filer.HasPersistedLogFiles(position)
if err != nil {
return fmt.Errorf("checking persisted log files: %v", err)
}
if found {
lastReadTime = position
}
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
return false
}
return true
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {

6
weed/util/log_buffer/log_buffer.go

@ -2,7 +2,6 @@ package log_buffer
import (
"bytes"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
"sync/atomic"
"time"
@ -11,11 +10,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const BufferSize = 4 * 1024 * 1024
const PreviousBufferCount = 3
const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 32
type dataToFlush struct {
startTime time.Time

13
weed/util/time.go

@ -0,0 +1,13 @@
package util
import (
"time"
)
func GetNextDayTsNano(curTs int64) int64 {
curTime := time.Unix(0, curTs)
nextDay := curTime.AddDate(0, 0, 1).Truncate(24 * time.Hour)
nextDayNano := nextDay.UnixNano()
return nextDayNano
}
Loading…
Cancel
Save