|
|
@ -78,7 +78,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic |
|
|
recordType = schema.NewRecordTypeBuilder(recordType). |
|
|
recordType = schema.NewRecordTypeBuilder(recordType). |
|
|
WithField(SW_COLUMN_NAME_TS, schema.TypeInt64). |
|
|
WithField(SW_COLUMN_NAME_TS, schema.TypeInt64). |
|
|
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). |
|
|
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). |
|
|
WithField(SW_COLUMN_NAME_INDEX, schema.TypeInt64). |
|
|
|
|
|
|
|
|
WithField(SW_COLUMN_NAME_OFFSET, schema.TypeInt64). |
|
|
RecordTypeEnd() |
|
|
RecordTypeEnd() |
|
|
|
|
|
|
|
|
parquetLevels, err := schema.ToParquetLevels(recordType) |
|
|
parquetLevels, err := schema.ToParquetLevels(recordType) |
|
|
@ -124,8 +124,8 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic |
|
|
|
|
|
|
|
|
// Get offset from parquet, default to 0 if not present (backward compatibility)
|
|
|
// Get offset from parquet, default to 0 if not present (backward compatibility)
|
|
|
var offset int64 = 0 |
|
|
var offset int64 = 0 |
|
|
if indexValue, exists := recordValue.Fields[SW_COLUMN_NAME_INDEX]; exists { |
|
|
|
|
|
offset = indexValue.GetInt64Value() |
|
|
|
|
|
|
|
|
if offsetValue, exists := recordValue.Fields[SW_COLUMN_NAME_OFFSET]; exists { |
|
|
|
|
|
offset = offsetValue.GetInt64Value() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
|