|  |  | @ -1,8 +1,10 @@ | 
			
		
	
		
			
				
					|  |  |  | package log_buffer | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import ( | 
			
		
	
		
			
				
					|  |  |  | 	"crypto/rand" | 
			
		
	
		
			
				
					|  |  |  | 	"fmt" | 
			
		
	
		
			
				
					|  |  |  | 	"math/rand" | 
			
		
	
		
			
				
					|  |  |  | 	"io" | 
			
		
	
		
			
				
					|  |  |  | 	"sync" | 
			
		
	
		
			
				
					|  |  |  | 	"testing" | 
			
		
	
		
			
				
					|  |  |  | 	"time" | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | @ -10,33 +12,49 @@ import ( | 
			
		
	
		
			
				
					|  |  |  | ) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func TestNewLogBufferFirstBuffer(t *testing.T) { | 
			
		
	
		
			
				
					|  |  |  | 	lb := NewLogBuffer("test", time.Minute, func(startTime, stopTime time.Time, buf []byte) { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	flushInterval := time.Second | 
			
		
	
		
			
				
					|  |  |  | 	lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) { | 
			
		
	
		
			
				
					|  |  |  | 		fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) | 
			
		
	
		
			
				
					|  |  |  | 	}, func() { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	}) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	startTime := time.Now() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	messageSize := 1024 | 
			
		
	
		
			
				
					|  |  |  | 	messageCount := 5000 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	receivedmessageCount := 0 | 
			
		
	
		
			
				
					|  |  |  | 	var wg sync.WaitGroup | 
			
		
	
		
			
				
					|  |  |  | 	wg.Add(1) | 
			
		
	
		
			
				
					|  |  |  | 	go func() { | 
			
		
	
		
			
				
					|  |  |  | 		defer wg.Done() | 
			
		
	
		
			
				
					|  |  |  | 		lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { | 
			
		
	
		
			
				
					|  |  |  | 			// stop if no more messages
 | 
			
		
	
		
			
				
					|  |  |  | 			return true | 
			
		
	
		
			
				
					|  |  |  | 		}, func(logEntry *filer_pb.LogEntry) error { | 
			
		
	
		
			
				
					|  |  |  | 			receivedmessageCount++ | 
			
		
	
		
			
				
					|  |  |  | 			if receivedmessageCount >= messageCount { | 
			
		
	
		
			
				
					|  |  |  | 				println("processed all messages") | 
			
		
	
		
			
				
					|  |  |  | 				return io.EOF | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 			return nil | 
			
		
	
		
			
				
					|  |  |  | 		}) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 		fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedmessageCount) | 
			
		
	
		
			
				
					|  |  |  | 		fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err) | 
			
		
	
		
			
				
					|  |  |  | 		if err != nil && err != io.EOF { | 
			
		
	
		
			
				
					|  |  |  | 			t.Errorf("unexpected error %v", err) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	}() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	var buf = make([]byte, messageSize) | 
			
		
	
		
			
				
					|  |  |  | 	for i := 0; i < messageCount; i++ { | 
			
		
	
		
			
				
					|  |  |  | 		rand.Read(buf) | 
			
		
	
		
			
				
					|  |  |  | 		lb.AddToBuffer(nil, buf, 0) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	receivedmessageCount := 0 | 
			
		
	
		
			
				
					|  |  |  | 	lb.LoopProcessLogData("test", startTime, 0, func() bool { | 
			
		
	
		
			
				
					|  |  |  | 		// stop if no more messages
 | 
			
		
	
		
			
				
					|  |  |  | 		return false | 
			
		
	
		
			
				
					|  |  |  | 	}, func(logEntry *filer_pb.LogEntry) error { | 
			
		
	
		
			
				
					|  |  |  | 		receivedmessageCount++ | 
			
		
	
		
			
				
					|  |  |  | 		return nil | 
			
		
	
		
			
				
					|  |  |  | 	}) | 
			
		
	
		
			
				
					|  |  |  | 	wg.Wait() | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	if receivedmessageCount != messageCount { | 
			
		
	
		
			
				
					|  |  |  | 		fmt.Printf("sent %d received %d\n", messageCount, receivedmessageCount) | 
			
		
	
		
			
				
					|  |  |  | 		t.Errorf("expect %d messages, but got %d", messageCount, receivedmessageCount) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | } |