|
|
@ -23,24 +23,24 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { |
|
|
|
messageSize := 1024 |
|
|
|
messageCount := 5000 |
|
|
|
|
|
|
|
receivedmessageCount := 0 |
|
|
|
receivedMessageCount := 0 |
|
|
|
var wg sync.WaitGroup |
|
|
|
wg.Add(1) |
|
|
|
go func() { |
|
|
|
defer wg.Done() |
|
|
|
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { |
|
|
|
// stop if no more messages
|
|
|
|
return true |
|
|
|
return receivedMessageCount < messageCount |
|
|
|
}, func(logEntry *filer_pb.LogEntry) error { |
|
|
|
receivedmessageCount++ |
|
|
|
if receivedmessageCount >= messageCount { |
|
|
|
receivedMessageCount++ |
|
|
|
if receivedMessageCount >= messageCount { |
|
|
|
println("processed all messages") |
|
|
|
return io.EOF |
|
|
|
} |
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedmessageCount) |
|
|
|
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount) |
|
|
|
fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err) |
|
|
|
if err != nil && err != io.EOF { |
|
|
|
t.Errorf("unexpected error %v", err) |
|
|
@ -54,7 +54,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
if receivedmessageCount != messageCount { |
|
|
|
t.Errorf("expect %d messages, but got %d", messageCount, receivedmessageCount) |
|
|
|
if receivedMessageCount != messageCount { |
|
|
|
t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount) |
|
|
|
} |
|
|
|
} |