Browse Source

start after the known tsns

mq
chrislu 5 days ago
parent
commit
59bf216726
  1. 2
      weed/mq/logstore/read_log_from_disk.go
  2. 2
      weed/mq/logstore/read_parquet_to_log.go

2
weed/mq/logstore/read_log_from_disk.go

@ -36,7 +36,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err) err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
return return
} }
if logEntry.TsNs < starTsNs {
if logEntry.TsNs <= starTsNs {
pos += 4 + int(size) pos += 4 + int(size)
continue continue
} }

2
weed/mq/logstore/read_parquet_to_log.go

@ -73,7 +73,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err) return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err)
} }
processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
if processedTsNs < starTsNs {
if processedTsNs <= starTsNs {
continue continue
} }
if stopTsNs != 0 && processedTsNs >= stopTsNs { if stopTsNs != 0 && processedTsNs >= stopTsNs {

Loading…
Cancel
Save