
go fmt

pull/5637/head
chrislu committed 10 months ago
commit 1b4484bf0a
31 changed files:

1. weed/filer/reader_cache.go (20 changed lines)
2. weed/mq/broker/broker_grpc_sub.go (2 changed lines)
3. weed/mq/broker/broker_topic_partition_read_write.go (8 changed lines)
4. weed/mq/broker/broker_write.go (26 changed lines)
5. weed/mq/client/cmd/weed_pub/publisher.go (4 changed lines)
6. weed/mq/client/pub_client/publish.go (1 changed line)
7. weed/mq/client/pub_client/publisher.go (2 changed lines)
8. weed/mq/pub_balancer/balancer.go (6 changed lines)
9. weed/mq/pub_balancer/broker_stats.go (12 changed lines)
10. weed/mq/sub_coordinator/consumer_group.go (19 changed lines)
11. weed/mq/sub_coordinator/coordinator.go (6 changed lines)
12. weed/mq/topic/local_manager.go (4 changed lines)
13. weed/mq/topic/local_partition.go (1 changed line)
14. weed/pb/filer_pb/filer.pb.go (4 changed lines)
15. weed/pb/filer_pb/filer_grpc.pb.go (2 changed lines)
16. weed/pb/iam_pb/iam.pb.go (4 changed lines)
17. weed/pb/iam_pb/iam_grpc.pb.go (2 changed lines)
18. weed/pb/master_pb/master.pb.go (4 changed lines)
19. weed/pb/master_pb/master_grpc.pb.go (2 changed lines)
20. weed/pb/mount_pb/mount.pb.go (4 changed lines)
21. weed/pb/mount_pb/mount_grpc.pb.go (2 changed lines)
22. weed/pb/mq_pb/mq_grpc.pb.go (2 changed lines)
23. weed/pb/remote_pb/remote.pb.go (4 changed lines)
24. weed/pb/s3_pb/s3.pb.go (4 changed lines)
25. weed/pb/s3_pb/s3_grpc.pb.go (2 changed lines)
26. weed/pb/volume_server_pb/volume_server.pb.go (4 changed lines)
27. weed/pb/volume_server_pb/volume_server_grpc.pb.go (2 changed lines)
28. weed/util/buffered_queue/buffered_queue.go (14 changed lines)
29. weed/util/log_buffer/log_buffer.go (56 changed lines)
30. weed/util/log_buffer/log_read.go (2 changed lines)
31. weed/util/log_buffer/sealed_buffer.go (8 changed lines)
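
All of the hand-written Go files in this list change only in whitespace: gofmt aligns struct fields and literal values into columns, adds spaces around operators, and normalizes "func(" spacing. As a minimal, hypothetical illustration of that rewrite (not code from this commit):

	// gofmt_demo.go — a hypothetical file, not part of this commit. Running
	// `gofmt -w .` (or `go fmt ./...` at the module root) produces exactly the
	// kind of column alignment and operator spacing seen in the hunks below.
	package main

	import "fmt"

	type config struct {
		name          string
		flushInterval int
	}

	func main() {
		c := config{name: "demo", flushInterval: 5}
		if c.flushInterval != 0 {
			fmt.Println(c)
		}
	}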

weed/filer/reader_cache.go (20 changed lines)

@@ -23,16 +23,16 @@ type ReaderCache struct {
type SingleChunkCacher struct {
completedTimeNew int64
sync.Mutex
parent *ReaderCache
chunkFileId string
data []byte
err error
cipherKey []byte
isGzipped bool
chunkSize int
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
parent *ReaderCache
chunkFileId string
data []byte
err error
cipherKey []byte
isGzipped bool
chunkSize int
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {

weed/mq/broker/broker_grpc_sub.go (2 changed lines)

@@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
}()
var startPosition log_buffer.MessagePosition
if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
offset := req.GetInit().GetPartitionOffset()
if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)

weed/mq/broker/broker_topic_partition_read_write.go (8 changed lines)

@@ -26,7 +26,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
startTime, stopTime = startTime.UTC(), stopTime.UTC()
targetFile := fmt.Sprintf("%s/%s",partitionDir, startTime.Format(topic.TIME_FORMAT))
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
// TODO append block with more metadata
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
return b.MasterClient.LookupFileId(fileId)
}
eachChunkFn := func (buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
@@ -99,7 +99,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
if chunk.Size == 0 {
continue
}
if chunk.IsChunkManifest{
if chunk.IsChunkManifest {
glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
return
}
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
if entry.IsDirectory {
return nil
}
if stopTsNs!=0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
isDone = true
return nil
}

weed/mq/broker/broker_write.go (26 changed lines)

@@ -24,14 +24,14 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
var offset int64 = 0
if err == filer_pb.ErrNotFound {
entry = &filer_pb.Entry{
Name: name,
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
FileMode: uint32(os.FileMode(0644)),
Uid: uint32(os.Getuid()),
Gid: uint32(os.Getgid()),
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
FileMode: uint32(os.FileMode(0644)),
Uid: uint32(os.Getuid()),
Gid: uint32(os.Getgid()),
},
}
} else if err != nil {
@@ -45,11 +45,11 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
// update the entry
return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: entry,
})
return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: entry,
})
})
}
func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
@@ -63,11 +63,11 @@ func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fi
Collection: "topics",
// TtlSec: wfs.option.TtlSec,
// DiskType: string(wfs.option.DiskType),
DataCenter: b.option.DataCenter,
Path: targetFile,
DataCenter: b.option.DataCenter,
Path: targetFile,
},
&operation.UploadOption{
Cipher: b.option.Cipher,
Cipher: b.option.Cipher,
},
func(host, fileId string) string {
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)

weed/mq/client/cmd/weed_pub/publisher.go (4 changed lines)

@@ -12,8 +12,8 @@ import (
)
var (
messageCount = flag.Int("n", 1000, "message count")
concurrency = flag.Int("c", 4, "concurrent publishers")
messageCount = flag.Int("n", 1000, "message count")
concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
namespace = flag.String("ns", "test", "namespace")

weed/mq/client/pub_client/publish.go (1 changed line)

@@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {

weed/mq/client/pub_client/publisher.go (2 changed lines)

@@ -16,7 +16,7 @@ type PublisherConfiguration struct {
Topic topic.Topic
CreateTopic bool
CreateTopicPartitionCount int32
Brokers []string
Brokers []string
}
type PublishClient struct {

weed/mq/pub_balancer/balancer.go (6 changed lines)

@@ -31,10 +31,10 @@ const (
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
OnAddBroker func(broker string, brokerStats *BrokerStats)
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
OnAddBroker func(broker string, brokerStats *BrokerStats)
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
func NewBalancer() *Balancer {

weed/mq/pub_balancer/broker_stats.go (12 changed lines)

@@ -39,11 +39,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
Partition: topic.Partition{
RangeStart: topicPartitionStats.Partition.RangeStart,
RangeStop: topicPartitionStats.Partition.RangeStop,
RingSize: topicPartitionStats.Partition.RingSize,
RangeStop: topicPartitionStats.Partition.RangeStop,
RingSize: topicPartitionStats.Partition.RingSize,
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
@@ -66,11 +66,11 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
Partition: topic.Partition{
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
RingSize: partition.RingSize,
RangeStop: partition.RangeStop,
RingSize: partition.RingSize,
UnixTimeNs: partition.UnixTimeNs,
},
},

weed/mq/sub_coordinator/consumer_group.go (19 changed lines)

@@ -16,12 +16,12 @@ type ConsumerGroupInstance struct {
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
}
type ConsumerGroup struct {
topic topic.Topic
topic topic.Topic
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
pubBalancer *pub_balancer.Balancer
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
pubBalancer *pub_balancer.Balancer
}
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup {
@@ -40,13 +40,13 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance)
cg.onConsumerGroupInstanceChange("add consumer instance " + consumerGroupInstance)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance)
cg.onConsumerGroupInstanceChange("remove consumer instance " + consumerGroupInstance)
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
@@ -107,9 +107,9 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
for i, partitionSlot := range partitionSlots {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
Partition: &mq_pb.Partition{
RangeStop: partitionSlot.RangeStop,
RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize,
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: partitionSlot.UnixTimeNs,
},
Broker: partitionSlot.Broker,
@@ -126,5 +126,4 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
consumerGroupInstance.ResponseChan <- response
}
}

weed/mq/sub_coordinator/coordinator.go (6 changed lines)

@@ -31,7 +31,7 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName)
if tcg == nil && createIfMissing{
if tcg == nil && createIfMissing {
tcg = &TopicConsumerGroups{
ConsumerGroups: cmap.New[*ConsumerGroup](),
}
@@ -56,14 +56,14 @@ func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string,
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {
cg = NewConsumerGroup(topic, c.balancer)
if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){
if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) {
cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
}
}
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance)
if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){
if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) {
cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
}
}

weed/mq/topic/local_manager.go (4 changed lines)

@@ -88,7 +88,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{
Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
Partition: localPartition.Partition,
}
stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
@@ -96,7 +96,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
Namespace: string(localTopic.Namespace),
Name: localTopic.Name,
},
Partition: localPartition.Partition.ToPbPartition(),
Partition: localPartition.Partition.ToPbPartition(),
ConsumerCount: localPartition.ConsumerCount,
}
// fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)

weed/mq/topic/local_partition.go (1 changed line)

@@ -22,6 +22,7 @@ type LocalPartition struct {
}
var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
return &LocalPartition{
Partition: partition,

weed/pb/filer_pb/filer.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: filer.proto
package filer_pb
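
The weed/pb/*.pb.go and *_grpc.pb.go hunks in this commit are all of the same kind: only the "protoc-gen-go" and "protoc" version lines in the generated-code header comment change, which is what happens when the .proto files are regenerated with a different local protobuf toolchain; no message or service code is affected. A rough, hypothetical sketch of such a regeneration hook, assuming protoc and its Go plugins are installed (SeaweedFS's actual generation scripts may differ):

	// generate.go — illustrative only; placed next to filer.proto, running
	// `go generate ./...` would rebuild filer.pb.go and filer_grpc.pb.go.
	//
	//go:generate protoc filer.proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative
	package filer_pb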

weed/pb/filer_pb/filer_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: filer.proto
package filer_pb

weed/pb/iam_pb/iam.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: iam.proto
package iam_pb

weed/pb/iam_pb/iam_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: iam.proto
package iam_pb

weed/pb/master_pb/master.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: master.proto
package master_pb

weed/pb/master_pb/master_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: master.proto
package master_pb

weed/pb/mount_pb/mount.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: mount.proto
package mount_pb

weed/pb/mount_pb/mount_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: mount.proto
package mount_pb

weed/pb/mq_pb/mq_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: mq.proto
package mq_pb

weed/pb/remote_pb/remote.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: remote.proto
package remote_pb

weed/pb/s3_pb/s3.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: s3.proto
package s3_pb

weed/pb/s3_pb/s3_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: s3.proto
package s3_pb

weed/pb/volume_server_pb/volume_server.pb.go (4 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.2
// protoc-gen-go v1.31.0
// protoc v4.25.3
// source: volume_server.proto
package volume_server_pb

weed/pb/volume_server_pb/volume_server_grpc.pb.go (2 changed lines)

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.2
// - protoc v4.25.3
// source: volume_server.proto
package volume_server_pb

weed/util/buffered_queue/buffered_queue.go (14 changed lines)

@@ -32,12 +32,12 @@ func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
bq := &BufferedQueue[T]{
chunkSize: chunkSize,
head: chunk,
tail: chunk,
last: chunk,
count: 0,
mutex: sync.Mutex{},
chunkSize: chunkSize,
head: chunk,
tail: chunk,
last: chunk,
count: 0,
mutex: sync.Mutex{},
}
bq.waitCond = sync.NewCond(&bq.mutex)
return bq
@@ -87,7 +87,7 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count <= 0 && !q.isClosed {
for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
if q.count <= 0 && q.isClosed {
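
The Dequeue hunk above is whitespace-only, but it sits in the queue's blocking wait loop. As a minimal, self-contained sketch of that condition-variable pattern (illustrative names and an int payload, not the project's generic BufferedQueue[T]):

	package main

	import (
		"fmt"
		"sync"
	)

	type queue struct {
		mu       sync.Mutex
		cond     *sync.Cond
		items    []int
		isClosed bool
	}

	func newQueue() *queue {
		q := &queue{}
		q.cond = sync.NewCond(&q.mu)
		return q
	}

	func (q *queue) Enqueue(v int) {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.items = append(q.items, v)
		q.cond.Signal() // wake one waiting consumer
	}

	func (q *queue) Close() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.isClosed = true
		q.cond.Broadcast() // wake all waiters so they can observe the close
	}

	func (q *queue) Dequeue() (int, bool) {
		q.mu.Lock()
		defer q.mu.Unlock()
		// Wait in a loop: each wakeup must re-check emptiness and closed state.
		for len(q.items) == 0 && !q.isClosed {
			q.cond.Wait()
		}
		if len(q.items) == 0 { // closed and drained
			return 0, false
		}
		v := q.items[0]
		q.items = q.items[1:]
		return v, true
	}

	func main() {
		q := newQueue()
		go func() {
			q.Enqueue(42)
			q.Close()
		}()
		for {
			v, ok := q.Dequeue()
			if !ok {
				return
			}
			fmt.Println(v)
		}
	}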

weed/util/log_buffer/log_buffer.go (56 changed lines)

@@ -27,39 +27,39 @@ type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
name string
prevBuffers *SealedBuffers
buf []byte
batchIndex int64
idx []int
pos int
startTime time.Time
stopTime time.Time
lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
name string
prevBuffers *SealedBuffers
buf []byte
batchIndex int64
idx []int
pos int
startTime time.Time
stopTime time.Time
lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
isStopping *atomic.Bool
flushChan chan *dataToFlush
lastTsNs int64
isStopping *atomic.Bool
flushChan chan *dataToFlush
lastTsNs int64
sync.RWMutex
}
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
name: name,
prevBuffers: newSealedBuffers(PreviousBufferCount),
buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
name: name,
prevBuffers: newSealedBuffers(PreviousBufferCount),
buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
}
go lb.loopFlush()
go lb.loopInterval()
@@ -199,10 +199,10 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
return nil
}
func (logBuffer *LogBuffer) GetEarliestTime() time.Time{
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
BatchIndex: logBuffer.batchIndex,
@@ -241,8 +241,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
if tsMemory.IsZero() { // case 2.2
println("2.2 no data")
return nil, -2,nil
} else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3
return nil, -2, nil
} else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
if !logBuffer.lastFlushTime.IsZero() {
glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime)
return nil, -2, ResumeFromDiskError
@@ -273,7 +273,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
}
lastTs := lastReadPosition.UnixNano()

weed/util/log_buffer/log_read.go (2 changed lines)

@@ -18,7 +18,7 @@ var (
)
type MessagePosition struct {
time.Time // this is the timestamp of the message
time.Time // this is the timestamp of the message
BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
}

weed/util/log_buffer/sealed_buffer.go (8 changed lines)

@@ -6,10 +6,10 @@ import (
)
type MemBuffer struct {
buf []byte
size int
startTime time.Time
stopTime time.Time
buf []byte
size int
startTime time.Time
stopTime time.Time
batchIndex int64
}
