diff --git a/test/postgres/producer.go b/test/postgres/producer.go index 177fa57b0..ec11d5a28 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) type UserEvent struct { @@ -110,16 +111,140 @@ func main() { log.Println(" postgres=> SELECT COUNT(*) FROM user_events;") } +// createSchemaForTopic creates a proper RecordType schema based on topic name +func createSchemaForTopic(topicName string) *schema_pb.RecordType { + switch topicName { + case "user_events": + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "user_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "user_type", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "action", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "status", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "amount", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: false}, + {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "metadata", FieldIndex: 7, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: false}, + }, + } + case "system_logs": + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false}, + {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + }, + } + case "metrics": + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "name", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "value", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true}, + {Name: "tags", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "timestamp", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + }, + } + case "product_views": + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "product_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "user_id", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "category", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "price", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true}, + {Name: "view_count", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: true}, + {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + }, + } + case "application_logs", "error_logs": + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true}, + {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false}, + {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true}, + }, + } + default: + // Default generic schema + return &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "data", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}, IsRequired: true}, + }, + } + } +} + +// convertToRecordValue converts Go structs to RecordValue format +func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { + fields := make(map[string]*schema_pb.Value) + + switch v := data.(type) { + case UserEvent: + fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} + fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}} + fields["user_type"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.UserType}} + fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}} + fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}} + fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} + + case SystemLog: + fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} + fields["level"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Level}} + fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}} + fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}} + fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + + case MetricEntry: + fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} + fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}} + fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}} + fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + + case ProductView: + fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}} + fields["product_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ProductID}} + fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}} + fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}} + fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}} + fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Timestamp.Format(time.RFC3339)}} + + default: + // Fallback to JSON for unknown types + jsonData, err := json.Marshal(data) + if err != nil { + return nil, fmt.Errorf("failed to marshal unknown type: %v", err) + } + fields["data"] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: jsonData}} + } + + return &schema_pb.RecordValue{Fields: fields}, nil +} + func createTopicData(masterAddr, filerAddr, namespace, topicName string, generator func() interface{}, count int) error { + // Create schema based on topic type + recordType := createSchemaForTopic(topicName) + // Create publisher configuration config := &pub_client.PublisherConfiguration{ Topic: topic.NewTopic(namespace, topicName), PartitionCount: 1, Brokers: []string{strings.Replace(masterAddr, ":9333", ":17777", 1)}, // Use broker port PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName), - RecordType: nil, // Use simple byte publishing + RecordType: recordType, // Use structured schema } // Create publisher @@ -133,15 +258,15 @@ func createTopicData(masterAddr, filerAddr, namespace, topicName string, for i := 0; i < count; i++ { data := generator() - // Convert to JSON - jsonData, err := json.Marshal(data) + // Convert struct to RecordValue + recordValue, err := convertToRecordValue(data) if err != nil { - log.Printf("Error marshaling data: %v", err) + log.Printf("Error converting data to RecordValue: %v", err) continue } - // Publish message (RecordType is nil, so use regular Publish) - err = publisher.Publish([]byte(fmt.Sprintf("key-%d", i)), jsonData) + // Publish structured record + err = publisher.PublishRecord([]byte(fmt.Sprintf("key-%d", i)), recordValue) if err != nil { log.Printf("Error publishing message %d: %v", i+1, err) continue diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index c8112738f..2e0d64c85 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -442,7 +442,10 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq switch col := expr.Expr.(type) { case *sqlparser.FuncExpr: // This is an aggregation function - aggSpec, _ := e.parseAggregationFunction(col, expr) + aggSpec, err := e.parseAggregationFunction(col, expr) + if err != nil { + return &QueryResult{Error: err}, err + } if aggSpec != nil { aggregations = append(aggregations, *aggSpec) hasAggregations = true diff --git a/weed/server/postgres/protocol.go b/weed/server/postgres/protocol.go index 25fbb7687..cf5344464 100644 --- a/weed/server/postgres/protocol.go +++ b/weed/server/postgres/protocol.go @@ -103,11 +103,23 @@ func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query s ctx := context.Background() result, err := s.sqlEngine.ExecuteSQL(ctx, cleanQuery) if err != nil { - return s.sendError(session, "42000", err.Error()) + // Send error message but keep connection alive + sendErr := s.sendError(session, "42000", err.Error()) + if sendErr != nil { + return sendErr + } + // Send ReadyForQuery to keep connection alive + return s.sendReadyForQuery(session) } if result.Error != nil { - return s.sendError(session, "42000", result.Error.Error()) + // Send error message but keep connection alive + sendErr := s.sendError(session, "42000", result.Error.Error()) + if sendErr != nil { + return sendErr + } + // Send ReadyForQuery to keep connection alive + return s.sendReadyForQuery(session) } // Send results