|
@ -13,12 +13,12 @@ import ( |
|
|
|
|
|
|
|
|
func TestNewLogBufferFirstBuffer(t *testing.T) { |
|
|
func TestNewLogBufferFirstBuffer(t *testing.T) { |
|
|
flushInterval := time.Second |
|
|
flushInterval := time.Second |
|
|
lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) { |
|
|
|
|
|
|
|
|
lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) { |
|
|
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) |
|
|
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) |
|
|
}, nil, func() { |
|
|
}, nil, func() { |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
startTime := time.Now() |
|
|
|
|
|
|
|
|
startTime := MessagePosition{Time:time.Now()} |
|
|
|
|
|
|
|
|
messageSize := 1024 |
|
|
messageSize := 1024 |
|
|
messageCount := 5000 |
|
|
messageCount := 5000 |
|
@ -31,13 +31,13 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { |
|
|
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { |
|
|
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { |
|
|
// stop if no more messages
|
|
|
// stop if no more messages
|
|
|
return receivedMessageCount < messageCount |
|
|
return receivedMessageCount < messageCount |
|
|
}, func(logEntry *filer_pb.LogEntry) error { |
|
|
|
|
|
|
|
|
}, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { |
|
|
receivedMessageCount++ |
|
|
receivedMessageCount++ |
|
|
if receivedMessageCount >= messageCount { |
|
|
if receivedMessageCount >= messageCount { |
|
|
println("processed all messages") |
|
|
println("processed all messages") |
|
|
return io.EOF |
|
|
|
|
|
|
|
|
return true, io.EOF |
|
|
} |
|
|
} |
|
|
return nil |
|
|
|
|
|
|
|
|
return false,nil |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount) |
|
|
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount) |
|
|