@ -269,7 +269,8 @@ type LogFileQueueIterator struct {
masterClient * wdclient . MasterClient
masterClient * wdclient . MasterClient
startTsNs int64
startTsNs int64
stopTsNs int64
stopTsNs int64
currentFileIterator * LogFileIterator
pendingEntries [ ] * filer_pb . LogEntry
pendingIndex int
}
}
func newLogFileQueueIterator ( masterClient * wdclient . MasterClient , q * util . Queue [ * LogFileEntry ] , startTsNs , stopTsNs int64 ) * LogFileQueueIterator {
func newLogFileQueueIterator ( masterClient * wdclient . MasterClient , q * util . Queue [ * LogFileEntry ] , startTsNs , stopTsNs int64 ) * LogFileQueueIterator {
@ -284,13 +285,17 @@ func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[
// getNext will return io.EOF when done
// getNext will return io.EOF when done
func ( iter * LogFileQueueIterator ) getNext ( v * OrderedLogVisitor ) ( logEntry * filer_pb . LogEntry , err error ) {
func ( iter * LogFileQueueIterator ) getNext ( v * OrderedLogVisitor ) ( logEntry * filer_pb . LogEntry , err error ) {
for {
for {
if iter . currentFileIterator != nil {
logEntry , err = iter . currentFileIterator . getNext ( )
if err != io . EOF {
return
}
// return pending entries first
if iter . pendingIndex < len ( iter . pendingEntries ) {
logEntry = iter . pendingEntries [ iter . pendingIndex ]
iter . pendingIndex ++
return logEntry , nil
}
}
// now either iter.currentFileIterator is nil or err is io.EOF
// reset for next file
iter . pendingEntries = nil
iter . pendingIndex = 0
// read entries from next file
if iter . q . Len ( ) == 0 {
if iter . q . Len ( ) == 0 {
return nil , io . EOF
return nil , io . EOF
}
}
@ -313,7 +318,38 @@ func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer
if next != nil && next . TsNs <= iter . startTsNs {
if next != nil && next . TsNs <= iter . startTsNs {
continue
continue
}
}
iter . currentFileIterator = newLogFileIterator ( iter . masterClient , t . FileEntry , iter . startTsNs , iter . stopTsNs )
// read all entries from this file
iter . pendingEntries , err = iter . readFileEntries ( t . FileEntry )
if err != nil {
return nil , err
}
}
}
// readFileEntries reads all log entries from a single file
func ( iter * LogFileQueueIterator ) readFileEntries ( fileEntry * Entry ) ( entries [ ] * filer_pb . LogEntry , err error ) {
fileIterator := newLogFileIterator ( iter . masterClient , fileEntry , iter . startTsNs , iter . stopTsNs )
defer func ( ) {
if closeErr := fileIterator . Close ( ) ; closeErr != nil && err == nil {
err = closeErr
}
} ( )
for {
logEntry , err := fileIterator . getNext ( )
if err == io . EOF {
return entries , nil
}
if err != nil {
if isChunkNotFoundError ( err ) {
// Volume or chunk was deleted, skip the rest of this log file
glog . Warningf ( "skipping rest of %s: %v" , fileIterator . filePath , err )
return entries , nil
}
return nil , err
}
entries = append ( entries , logEntry )
}
}
}
}
@ -324,6 +360,7 @@ type LogFileIterator struct {
sizeBuf [ ] byte
sizeBuf [ ] byte
startTsNs int64
startTsNs int64
stopTsNs int64
stopTsNs int64
filePath string
}
}
func newLogFileIterator ( masterClient * wdclient . MasterClient , fileEntry * Entry , startTsNs , stopTsNs int64 ) * LogFileIterator {
func newLogFileIterator ( masterClient * wdclient . MasterClient , fileEntry * Entry , startTsNs , stopTsNs int64 ) * LogFileIterator {
@ -332,7 +369,15 @@ func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, s
sizeBuf : make ( [ ] byte , 4 ) ,
sizeBuf : make ( [ ] byte , 4 ) ,
startTsNs : startTsNs ,
startTsNs : startTsNs ,
stopTsNs : stopTsNs ,
stopTsNs : stopTsNs ,
filePath : string ( fileEntry . FullPath ) ,
}
}
}
func ( iter * LogFileIterator ) Close ( ) error {
if r , ok := iter . r . ( io . Closer ) ; ok {
return r . Close ( )
}
return nil
}
}
// getNext will return io.EOF when done
// getNext will return io.EOF when done