diff --git a/test/postgres/producer.go b/test/postgres/producer.go index 0e19ef258..c8e210d89 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "math/big" "math/rand" "os" "strconv" @@ -23,14 +24,16 @@ import ( ) type UserEvent struct { - ID int64 `json:"id"` - UserID int64 `json:"user_id"` - UserType string `json:"user_type"` - Action string `json:"action"` - Status string `json:"status"` - Amount float64 `json:"amount,omitempty"` - Timestamp time.Time `json:"timestamp"` - Metadata string `json:"metadata,omitempty"` + ID int64 `json:"id"` + UserID int64 `json:"user_id"` + UserType string `json:"user_type"` + Action string `json:"action"` + Status string `json:"status"` + Amount float64 `json:"amount,omitempty"` + PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL + BirthDate time.Time `json:"birth_date"` // Will be converted to DATE + Timestamp time.Time `json:"timestamp"` + Metadata string `json:"metadata,omitempty"` } type SystemLog struct { @@ -189,6 +192,26 @@ func createSchemaForTopic(topicName string) *schema_pb.RecordType { } } +// convertToDecimal converts a string to decimal format for Parquet logical type +func convertToDecimal(value string) ([]byte, int32, int32) { + // Parse the decimal string using big.Rat for precision + rat := new(big.Rat) + if _, success := rat.SetString(value); !success { + return nil, 0, 0 + } + + // Convert to a fixed scale (e.g., 4 decimal places) + scale := int32(4) + precision := int32(18) // Total digits + + // Scale the rational number to integer representation + multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) + scaled := new(big.Int).Mul(rat.Num(), multiplier) + scaled.Div(scaled, rat.Denom()) + + return scaled.Bytes(), precision, scale +} + // convertToRecordValue converts Go structs to RecordValue format func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields := make(map[string]*schema_pb.Value) @@ -201,7 +224,27 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}} fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}} fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}} - fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + + // Convert precise amount to DECIMAL logical type + if v.PreciseAmount != "" { + if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil { + fields["precise_amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DecimalValue{DecimalValue: &schema_pb.DecimalValue{ + Value: decimal, + Precision: precision, + Scale: scale, + }}} + } + } + + // Convert birth date to DATE logical type + fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{ + DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch + }}} + + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: v.Timestamp.UnixMicro(), + IsUtc: true, + }}} fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} case SystemLog: @@ -210,14 +253,20 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}} fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}} fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} - fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: v.Timestamp.UnixMicro(), + IsUtc: true, + }}} case MetricEntry: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}} fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}} fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} - fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: v.Timestamp.UnixMicro(), + IsUtc: true, + }}} case ProductView: fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} @@ -226,7 +275,10 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}} fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}} fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} - fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: v.Timestamp.UnixMicro(), + IsUtc: true, + }}} default: // Fallback to JSON for unknown types @@ -384,15 +436,26 @@ func generateUserEvent() interface{} { actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"} statuses := []string{"active", "inactive", "pending", "completed", "failed"} + // Generate a birth date between 1970 and 2005 (18+ years old) + birthYear := 1970 + rand.Intn(35) + birthMonth := 1 + rand.Intn(12) + birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues + birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC) + + // Generate a precise amount as a string with 4 decimal places + preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000) + return UserEvent{ - ID: rand.Int63n(1000000) + 1, - UserID: rand.Int63n(10000) + 1, - UserType: userTypes[rand.Intn(len(userTypes))], - Action: actions[rand.Intn(len(actions))], - Status: statuses[rand.Intn(len(statuses))], - Amount: rand.Float64() * 1000, - Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second), - Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)), + ID: rand.Int63n(1000000) + 1, + UserID: rand.Int63n(10000) + 1, + UserType: userTypes[rand.Intn(len(userTypes))], + Action: actions[rand.Intn(len(actions))], + Status: statuses[rand.Intn(len(statuses))], + Amount: rand.Float64() * 1000, + PreciseAmount: preciseAmount, + BirthDate: birthDate, + Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second), + Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)), } }