package schema import ( "fmt" parquet "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) { switch fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: var parquetValue parquet.Value parquetValue, err = toParquetValue(fieldValue) if err != nil { return } rowBuilder.Add(levels.startColumnIndex, parquetValue) // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue) case *schema_pb.Type_ListType: rowBuilder.Next(levels.startColumnIndex) // fmt.Printf("rowBuilder.Next %d\n", columnIndex) elementType := fieldType.GetListType().ElementType for _, value := range fieldValue.GetListValue().Values { if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil { return } } } return } func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error { parquetLevels, err := ToParquetLevels(recordType) if err != nil { return err } visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) { return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue) } fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}} fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}} return doVisitValue(fieldType, parquetLevels, fieldValue, visitor) } // typeValueVisitor is a function that is called for each value in a schema_pb.Value // Find the column index. // intended to be used in RowBuilder.Add(columnIndex, value) type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) // endIndex is exclusive // same logic as RowBuilder.configure in row_builder.go func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) { switch fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: return visitor(fieldType, levels, fieldValue) case *schema_pb.Type_ListType: return visitor(fieldType, levels, fieldValue) case *schema_pb.Type_RecordType: for _, field := range fieldType.GetRecordType().Fields { fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name] if !found { // TODO check this if no such field found continue } fieldLevels := levels.levels[field.Name] err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor) if err != nil { return } } return } return } func toParquetValue(value *schema_pb.Value) (parquet.Value, error) { switch value.Kind.(type) { case *schema_pb.Value_BoolValue: return parquet.BooleanValue(value.GetBoolValue()), nil case *schema_pb.Value_Int32Value: return parquet.Int32Value(value.GetInt32Value()), nil case *schema_pb.Value_Int64Value: return parquet.Int64Value(value.GetInt64Value()), nil case *schema_pb.Value_FloatValue: return parquet.FloatValue(value.GetFloatValue()), nil case *schema_pb.Value_DoubleValue: return parquet.DoubleValue(value.GetDoubleValue()), nil case *schema_pb.Value_BytesValue: return parquet.ByteArrayValue(value.GetBytesValue()), nil case *schema_pb.Value_StringValue: return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil default: return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind) } }