diff --git a/test/kafka/persistent_offset_integration_test.go b/test/kafka/persistent_offset_integration_test.go index 097fef2de..7417e6990 100644 --- a/test/kafka/persistent_offset_integration_test.go +++ b/test/kafka/persistent_offset_integration_test.go @@ -233,6 +233,7 @@ func testSMQSubscriberIntegration(t *testing.T, brokers []string) { // Subscribe from offset 0 subscription, err := subscriber.Subscribe(kafkaTopic, kafkaPartition, 0, consumerGroup) require.NoError(t, err) + _ = subscription // Use the subscription variable // Wait for subscription to be active time.Sleep(2 * time.Second) diff --git a/test/kafka/schema_integration_test.go b/test/kafka/schema_integration_test.go index 37f5be450..36b7d1a86 100644 --- a/test/kafka/schema_integration_test.go +++ b/test/kafka/schema_integration_test.go @@ -479,20 +479,22 @@ func TestKafkaGateway_SchemaPerformance(t *testing.T) { t.Fatalf("Failed to create schema manager: %v", err) } - // Create test message + // Create test message using the same schema as registered (with email field) avroSchema := `{ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, - {"name": "name", "type": "string"} + {"name": "name", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": null} ] }` codec, _ := goavro.NewCodec(avroSchema) testData := map[string]interface{}{ - "id": int32(1), - "name": "Performance Test", + "id": int32(1), + "name": "Performance Test", + "email": map[string]interface{}{"string": "perf@example.com"}, } avroBinary, _ := codec.BinaryFromNative(nil, testData) testMsg := schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, avroBinary) diff --git a/test/kafka/schema_smq_integration_test.go b/test/kafka/schema_smq_integration_test.go index 67570947a..60885d70e 100644 --- a/test/kafka/schema_smq_integration_test.go +++ b/test/kafka/schema_smq_integration_test.go @@ -37,12 +37,13 @@ func createTestKafkaHandler(t *testing.T) *protocol.Handler { // Create handler with schema management enabled handler := protocol.NewHandler() - // Enable schema management with mock registry + // Try to enable schema management, but don't fail if registry is not available err := handler.EnableSchemaManagement(schema.ManagerConfig{ RegistryURL: "http://localhost:8081", // Mock registry }) if err != nil { t.Logf("Schema management not enabled (expected in test): %v", err) + // Continue without schema management for basic offset testing } return handler @@ -100,6 +101,16 @@ func testAvroMessageWorkflow(t *testing.T, handler *protocol.Handler) { t.Logf("Assigned Kafka offset: %d", baseOffset) // Step 6: Process the schematized message (simulate what happens in Produce handler) + // Always store the record in the ledger for offset tracking + timestamp := time.Now().UnixNano() + err = ledger.AppendRecord(baseOffset, timestamp, int32(len(confluentMsg))) + if err != nil { + t.Fatalf("Failed to append record to ledger: %v", err) + } + + t.Logf("Stored message in SMQ simulation - Offset: %d, Timestamp: %d, Size: %d", + baseOffset, timestamp, len(confluentMsg)) + if handler.IsSchemaEnabled() { // Parse Confluent envelope envelope, ok := schema.ParseConfluentEnvelope(confluentMsg) @@ -111,15 +122,7 @@ func testAvroMessageWorkflow(t *testing.T, handler *protocol.Handler) { envelope.SchemaID, envelope.Format, len(envelope.Payload)) // This is where the message would be decoded and sent to SMQ - // For now, we'll simulate the SMQ storage - timestamp := time.Now().UnixNano() - err = ledger.AppendRecord(baseOffset, timestamp, int32(len(confluentMsg))) - if err != nil { - t.Fatalf("Failed to append record to ledger: %v", err) - } - - t.Logf("Stored message in SMQ simulation - Offset: %d, Timestamp: %d, Size: %d", - baseOffset, timestamp, len(confluentMsg)) + // Schema processing happens after storage } // Step 7: Verify offset management