From 57949f9959e3b9fcf4438641a8e499a706da0271 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 18 Apr 2024 22:41:12 -0700
Subject: [PATCH] support list type

---
 weed/mq/schema/to_parquet_schema.go | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/weed/mq/schema/to_parquet_schema.go b/weed/mq/schema/to_parquet_schema.go
index f39692714..a5f3481e3 100644
--- a/weed/mq/schema/to_parquet_schema.go
+++ b/weed/mq/schema/to_parquet_schema.go
@@ -15,23 +15,29 @@ func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parqu
 	return parquet.NewSchema(topicName, rootNode), nil
 }
 
-func toParquetFieldType(field *schema_pb.Field) (parquet.Node, error) {
-	var (
-		dataType parquet.Node
-		err      error
-	)
-	switch field.Type.Kind.(type) {
+func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
+	switch fieldType.Kind.(type) {
 	case *schema_pb.Type_ScalarType:
-		dataType, err = toParquetFieldTypeScalar(field.Type.GetScalarType())
+		dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
 	case *schema_pb.Type_RecordType:
-		dataType, err = toParquetFieldTypeRecord(field.Type.GetRecordType())
+		dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
+	case *schema_pb.Type_ListType:
+		dataType, err = toParquetFieldTypeList(fieldType.GetListType())
 	default:
-		return nil, fmt.Errorf("unknown field type: %T", field.Type.Kind)
+		return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
 	}
 
 	return dataType, err
 }
 
+func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
+	elementType, err := toParquetFieldType(listType.ElementType)
+	if err != nil {
+		return nil, err
+	}
+	return parquet.List(elementType), nil
+}
+
 func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
 	switch scalarType {
 	case schema_pb.ScalarType_BOOLEAN:
@@ -55,7 +61,7 @@ func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, er
 func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
 	recordNode := parquet.Group{}
 	for _, field := range recordType.Fields {
-		parquetFieldType, err := toParquetFieldType(field)
+		parquetFieldType, err := toParquetFieldType(field.Type)
 		if err != nil {
 			return nil, err
 		}