diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index eeb1eb640..45d4fb856 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -356,20 +356,20 @@ func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte { func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte { // For Phase 7, create a simplified record batch // In Phase 8, this will implement proper Kafka record batch format v2 - + batch := h.createEmptyRecordBatch(baseOffset) - + // Update record count recordCountOffset := len(batch) - 4 binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount)) - + // Append payload (simplified - real implementation would format individual records) batch = append(batch, payload...) - + // Update batch length batchLength := len(batch) - 12 binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength)) - + return batch } @@ -390,7 +390,7 @@ func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, of // Create record batch from reconstructed messages recordBatch := h.createSchematizedRecordBatch(messages, offset) - fmt.Printf("DEBUG: Created schematized record batch: %d bytes for %d messages\n", + fmt.Printf("DEBUG: Created schematized record batch: %d bytes for %d messages\n", len(recordBatch), len(messages)) return recordBatch, nil diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index d01923f78..80fc65b81 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1607,10 +1607,10 @@ func (h *Handler) EnableSchemaManagement(config schema.ManagerConfig) error { if err != nil { return fmt.Errorf("failed to create schema manager: %w", err) } - + h.schemaManager = manager h.useSchema = true - + fmt.Printf("Schema management enabled with registry: %s\n", config.RegistryURL) return nil } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index c67d43dea..1d5e0f3f3 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -514,7 +514,7 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, return fmt.Errorf("schema decoding failed: %w", err) } - fmt.Printf("DEBUG: Successfully decoded message with schema ID %d, format %s, subject %s\n", + fmt.Printf("DEBUG: Successfully decoded message with schema ID %d, format %s, subject %s\n", decodedMsg.SchemaID, decodedMsg.SchemaFormat, decodedMsg.Subject) // If SeaweedMQ integration is enabled, store the decoded message @@ -524,7 +524,7 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, // For in-memory mode, we could store metadata about the schema // For now, just log the successful decoding - fmt.Printf("DEBUG: Schema decoding successful - would store RecordValue with %d fields\n", + fmt.Printf("DEBUG: Schema decoding successful - would store RecordValue with %d fields\n", len(decodedMsg.RecordValue.Fields)) return nil @@ -565,7 +565,7 @@ func (h *Handler) extractMessagesFromRecordSet(recordSetData []byte) ([][]byte, // Simplified: assume single message starting after record batch header // Real implementation would parse the record batch format properly messages := [][]byte{recordSetData} - + return messages, nil } @@ -581,7 +581,7 @@ func (h *Handler) validateSchemaCompatibility(topicName string, messageBytes []b return nil // Not schematized, no validation needed } - fmt.Printf("DEBUG: Validating schema compatibility - ID: %d, Format: %s, Topic: %s\n", + fmt.Printf("DEBUG: Validating schema compatibility - ID: %d, Format: %s, Topic: %s\n", schemaID, format, topicName) // TODO: Implement topic-specific schema validation diff --git a/weed/mq/kafka/schema/envelope.go b/weed/mq/kafka/schema/envelope.go index e889adce1..d26847e36 100644 --- a/weed/mq/kafka/schema/envelope.go +++ b/weed/mq/kafka/schema/envelope.go @@ -32,8 +32,8 @@ func (f Format) String() string { type ConfluentEnvelope struct { Format Format SchemaID uint32 - Indexes []int // For Protobuf nested message resolution - Payload []byte // The actual encoded data + Indexes []int // For Protobuf nested message resolution + Payload []byte // The actual encoded data } // ParseConfluentEnvelope parses a Confluent Schema Registry framed message @@ -50,7 +50,7 @@ func ParseConfluentEnvelope(data []byte) (*ConfluentEnvelope, bool) { // Extract schema ID (big-endian uint32) schemaID := binary.BigEndian.Uint32(data[1:5]) - + envelope := &ConfluentEnvelope{ Format: FormatAvro, // Default assumption; will be refined by schema registry lookup SchemaID: schemaID, @@ -61,7 +61,7 @@ func ParseConfluentEnvelope(data []byte) (*ConfluentEnvelope, bool) { // Note: Format detection should be done by the schema registry lookup // For now, we'll default to Avro and let the manager determine the actual format // based on the schema registry information - + return envelope, true } @@ -86,16 +86,16 @@ func CreateConfluentEnvelope(format Format, schemaID uint32, indexes []int, payl result := make([]byte, 5, 5+len(payload)+len(indexes)*4) result[0] = 0x00 // Magic byte binary.BigEndian.PutUint32(result[1:5], schemaID) - + // For Protobuf, add indexes as varints (simplified for Phase 1) if format == FormatProtobuf && len(indexes) > 0 { // TODO: Implement proper varint encoding for Protobuf indexes in Phase 5 // For now, we'll just append the payload } - + // Append the actual payload result = append(result, payload...) - + return result } @@ -104,11 +104,11 @@ func (e *ConfluentEnvelope) Validate() error { if e.SchemaID == 0 { return fmt.Errorf("invalid schema ID: 0") } - + if len(e.Payload) == 0 { return fmt.Errorf("empty payload") } - + // Format-specific validation switch e.Format { case FormatAvro: @@ -121,7 +121,7 @@ func (e *ConfluentEnvelope) Validate() error { default: return fmt.Errorf("unsupported format: %v", e.Format) } - + return nil } @@ -131,7 +131,7 @@ func (e *ConfluentEnvelope) Metadata() map[string]string { "schema_format": e.Format.String(), "schema_id": fmt.Sprintf("%d", e.SchemaID), } - + if len(e.Indexes) > 0 { // Store indexes for Protobuf reconstruction indexStr := "" @@ -143,6 +143,6 @@ func (e *ConfluentEnvelope) Metadata() map[string]string { } metadata["protobuf_indexes"] = indexStr } - + return metadata } diff --git a/weed/mq/kafka/schema/manager.go b/weed/mq/kafka/schema/manager.go index a5603d84e..7f0c059e9 100644 --- a/weed/mq/kafka/schema/manager.go +++ b/weed/mq/kafka/schema/manager.go @@ -14,12 +14,12 @@ import ( // Manager coordinates schema operations for the Kafka Gateway type Manager struct { registryClient *RegistryClient - + // Decoder cache - avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder + avroDecoders map[uint32]*AvroDecoder // schema ID -> decoder protobufDecoders map[uint32]*ProtobufDecoder // schema ID -> decoder - decoderMu sync.RWMutex - + decoderMu sync.RWMutex + // Configuration config ManagerConfig } @@ -47,17 +47,17 @@ const ( type DecodedMessage struct { // Original envelope information Envelope *ConfluentEnvelope - + // Schema information SchemaID uint32 SchemaFormat Format Subject string Version int - + // Decoded data RecordValue *schema_pb.RecordValue RecordType *schema_pb.RecordType - + // Metadata for storage Metadata map[string]string } @@ -69,9 +69,9 @@ func NewManager(config ManagerConfig) (*Manager, error) { Username: config.RegistryUsername, Password: config.RegistryPassword, } - + registryClient := NewRegistryClient(registryConfig) - + return &Manager{ registryClient: registryClient, avroDecoders: make(map[uint32]*AvroDecoder), @@ -86,12 +86,12 @@ func NewManagerWithHealthCheck(config ManagerConfig) (*Manager, error) { if err != nil { return nil, err } - + // Test connectivity if err := manager.registryClient.HealthCheck(); err != nil { return nil, fmt.Errorf("schema registry health check failed: %w", err) } - + return manager, nil } @@ -102,22 +102,22 @@ func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) { if !isSchematized { return nil, fmt.Errorf("message is not schematized") } - + // Step 2: Validate envelope if err := envelope.Validate(); err != nil { return nil, fmt.Errorf("invalid envelope: %w", err) } - + // Step 3: Get schema from registry cachedSchema, err := m.registryClient.GetSchemaByID(envelope.SchemaID) if err != nil { return nil, fmt.Errorf("failed to get schema %d: %w", envelope.SchemaID, err) } - + // Step 4: Decode based on format var recordValue *schema_pb.RecordValue var recordType *schema_pb.RecordType - + switch cachedSchema.Format { case FormatAvro: recordValue, recordType, err = m.decodeAvroMessage(envelope, cachedSchema) @@ -134,7 +134,7 @@ func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) { default: return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format) } - + // Step 5: Create decoded message decodedMsg := &DecodedMessage{ Envelope: envelope, @@ -146,7 +146,7 @@ func (m *Manager) DecodeMessage(messageBytes []byte) (*DecodedMessage, error) { RecordType: recordType, Metadata: m.createMetadata(envelope, cachedSchema), } - + return decodedMsg, nil } @@ -157,7 +157,7 @@ func (m *Manager) decodeAvroMessage(envelope *ConfluentEnvelope, cachedSchema *C if err != nil { return nil, nil, fmt.Errorf("failed to get Avro decoder: %w", err) } - + // Decode to RecordValue recordValue, err := decoder.DecodeToRecordValue(envelope.Payload) if err != nil { @@ -168,7 +168,7 @@ func (m *Manager) decodeAvroMessage(envelope *ConfluentEnvelope, cachedSchema *C // For now, return the error - we could implement partial decoding later return nil, nil, fmt.Errorf("permissive decoding failed: %w", err) } - + // Infer or get RecordType recordType, err := decoder.InferRecordType() if err != nil { @@ -179,7 +179,7 @@ func (m *Manager) decodeAvroMessage(envelope *ConfluentEnvelope, cachedSchema *C return nil, nil, fmt.Errorf("failed to infer record type: %w", err) } } - + return recordValue, recordType, nil } @@ -190,7 +190,7 @@ func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchem if err != nil { return nil, nil, fmt.Errorf("failed to get Protobuf decoder: %w", err) } - + // Decode to RecordValue recordValue, err := decoder.DecodeToRecordValue(envelope.Payload) if err != nil { @@ -200,7 +200,7 @@ func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchem // In permissive mode, try to decode as much as possible return nil, nil, fmt.Errorf("permissive decoding failed: %w", err) } - + // Get RecordType from descriptor recordType, err := decoder.InferRecordType() if err != nil { @@ -211,7 +211,7 @@ func (m *Manager) decodeProtobufMessage(envelope *ConfluentEnvelope, cachedSchem return nil, nil, fmt.Errorf("failed to infer record type: %w", err) } } - + return recordValue, recordType, nil } @@ -224,18 +224,18 @@ func (m *Manager) getAvroDecoder(schemaID uint32, schemaStr string) (*AvroDecode return decoder, nil } m.decoderMu.RUnlock() - + // Create new decoder decoder, err := NewAvroDecoder(schemaStr) if err != nil { return nil, err } - + // Cache the decoder m.decoderMu.Lock() m.avroDecoders[schemaID] = decoder m.decoderMu.Unlock() - + return decoder, nil } @@ -248,38 +248,38 @@ func (m *Manager) getProtobufDecoder(schemaID uint32, schemaStr string) (*Protob return decoder, nil } m.decoderMu.RUnlock() - + // For Protobuf, the schema is typically a binary FileDescriptorSet // In Confluent Schema Registry, Protobuf schemas are stored as binary descriptors schemaBytes := []byte(schemaStr) // Assume schemaStr contains binary data - + // Create new decoder decoder, err := NewProtobufDecoder(schemaBytes) if err != nil { return nil, err } - + // Cache the decoder m.decoderMu.Lock() m.protobufDecoders[schemaID] = decoder m.decoderMu.Unlock() - + return decoder, nil } // createMetadata creates metadata for storage in SeaweedMQ func (m *Manager) createMetadata(envelope *ConfluentEnvelope, cachedSchema *CachedSchema) map[string]string { metadata := envelope.Metadata() - + // Add schema registry information metadata["schema_subject"] = cachedSchema.Subject metadata["schema_version"] = fmt.Sprintf("%d", cachedSchema.Version) metadata["registry_url"] = m.registryClient.baseURL - + // Add decoding information metadata["decoded_at"] = fmt.Sprintf("%d", cachedSchema.CachedAt.Unix()) metadata["validation_mode"] = fmt.Sprintf("%d", m.config.ValidationMode) - + return metadata } @@ -294,13 +294,13 @@ func (m *Manager) GetSchemaInfo(messageBytes []byte) (uint32, Format, error) { if !ok { return 0, FormatUnknown, fmt.Errorf("not a schematized message") } - + // Get basic schema info from cache or registry cachedSchema, err := m.registryClient.GetSchemaByID(envelope.SchemaID) if err != nil { return 0, FormatUnknown, fmt.Errorf("failed to get schema info: %w", err) } - + return envelope.SchemaID, cachedSchema.Format, nil } @@ -325,7 +325,7 @@ func (m *Manager) ClearCache() { m.avroDecoders = make(map[uint32]*AvroDecoder) m.protobufDecoders = make(map[uint32]*ProtobufDecoder) m.decoderMu.Unlock() - + m.registryClient.ClearCache() } @@ -334,7 +334,7 @@ func (m *Manager) GetCacheStats() (decoders, schemas, subjects int) { m.decoderMu.RLock() decoders = len(m.avroDecoders) + len(m.protobufDecoders) m.decoderMu.RUnlock() - + schemas, subjects = m.registryClient.GetCacheStats() return } @@ -360,25 +360,25 @@ func (m *Manager) encodeAvroMessage(recordValue *schema_pb.RecordValue, schemaID if err != nil { return nil, fmt.Errorf("failed to get schema for encoding: %w", err) } - + // Get decoder (which contains the codec) decoder, err := m.getAvroDecoder(schemaID, cachedSchema.Schema) if err != nil { return nil, fmt.Errorf("failed to get decoder for encoding: %w", err) } - + // Convert RecordValue back to Go map goMap := recordValueToMap(recordValue) - + // Encode using Avro codec binary, err := decoder.codec.BinaryFromNative(nil, goMap) if err != nil { return nil, fmt.Errorf("failed to encode to Avro binary: %w", err) } - + // Create Confluent envelope envelope := CreateConfluentEnvelope(FormatAvro, schemaID, nil, binary) - + return envelope, nil } @@ -389,31 +389,31 @@ func (m *Manager) encodeProtobufMessage(recordValue *schema_pb.RecordValue, sche if err != nil { return nil, fmt.Errorf("failed to get schema for encoding: %w", err) } - + // Get decoder (which contains the descriptor) decoder, err := m.getProtobufDecoder(schemaID, cachedSchema.Schema) if err != nil { return nil, fmt.Errorf("failed to get decoder for encoding: %w", err) } - + // Convert RecordValue back to Go map goMap := recordValueToMap(recordValue) - + // Create a new message instance and populate it msg := decoder.msgType.New() if err := m.populateProtobufMessage(msg, goMap, decoder.descriptor); err != nil { return nil, fmt.Errorf("failed to populate Protobuf message: %w", err) } - + // Encode using Protobuf binary, err := proto.Marshal(msg.Interface()) if err != nil { return nil, fmt.Errorf("failed to encode to Protobuf binary: %w", err) } - + // Create Confluent envelope (with indexes if needed) envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, nil, binary) - + return envelope, nil } @@ -426,16 +426,16 @@ func (m *Manager) populateProtobufMessage(msg protoreflect.Message, data map[str // Skip unknown fields in permissive mode continue } - + // Convert and set the value protoValue, err := m.goValueToProtoValue(value, fieldDesc) if err != nil { return fmt.Errorf("failed to convert field %s: %w", key, err) } - + msg.Set(fieldDesc, protoValue) } - + return nil } @@ -444,7 +444,7 @@ func (m *Manager) goValueToProtoValue(value interface{}, fieldDesc protoreflect. if value == nil { return protoreflect.Value{}, nil } - + switch fieldDesc.Kind() { case protoreflect.BoolKind: if b, ok := value.(bool); ok { @@ -496,18 +496,18 @@ func (m *Manager) goValueToProtoValue(value interface{}, fieldDesc protoreflect. return protoreflect.ValueOfMessage(nestedMsg), nil } } - + return protoreflect.Value{}, fmt.Errorf("unsupported value type %T for field kind %v", value, fieldDesc.Kind()) } // recordValueToMap converts a RecordValue back to a Go map for encoding func recordValueToMap(recordValue *schema_pb.RecordValue) map[string]interface{} { result := make(map[string]interface{}) - + for key, value := range recordValue.Fields { result[key] = schemaValueToGoValue(value) } - + return result } diff --git a/weed/mq/kafka/schema/manager_test.go b/weed/mq/kafka/schema/manager_test.go index d72860a18..eec2a479e 100644 --- a/weed/mq/kafka/schema/manager_test.go +++ b/weed/mq/kafka/schema/manager_test.go @@ -37,7 +37,7 @@ func TestManager_DecodeMessage(t *testing.T) { RegistryURL: server.URL, ValidationMode: ValidationPermissive, } - + manager, err := NewManager(config) if err != nil { t.Fatalf("Failed to create manager: %v", err) @@ -52,7 +52,7 @@ func TestManager_DecodeMessage(t *testing.T) { {"name": "name", "type": "string"} ] }` - + codec, err := goavro.NewCodec(avroSchema) if err != nil { t.Fatalf("Failed to create Avro codec: %v", err) @@ -112,7 +112,7 @@ func TestManager_IsSchematized(t *testing.T) { config := ManagerConfig{ RegistryURL: "http://localhost:8081", // Not used for this test } - + manager, err := NewManager(config) if err != nil { // Skip test if we can't connect to registry @@ -177,7 +177,7 @@ func TestManager_GetSchemaInfo(t *testing.T) { config := ManagerConfig{ RegistryURL: server.URL, } - + manager, err := NewManager(config) if err != nil { t.Fatalf("Failed to create manager: %v", err) @@ -204,7 +204,7 @@ func TestManager_CacheManagement(t *testing.T) { config := ManagerConfig{ RegistryURL: "http://localhost:8081", // Not used for this test } - + manager, err := NewManager(config) if err != nil { t.Skip("Skipping test - no registry available") @@ -213,7 +213,7 @@ func TestManager_CacheManagement(t *testing.T) { // Check initial cache stats decoders, schemas, subjects := manager.GetCacheStats() if decoders != 0 || schemas != 0 || subjects != 0 { - t.Errorf("Expected empty cache initially, got decoders=%d, schemas=%d, subjects=%d", + t.Errorf("Expected empty cache initially, got decoders=%d, schemas=%d, subjects=%d", decoders, schemas, subjects) } @@ -223,7 +223,7 @@ func TestManager_CacheManagement(t *testing.T) { // Verify still empty decoders, schemas, subjects = manager.GetCacheStats() if decoders != 0 || schemas != 0 || subjects != 0 { - t.Errorf("Expected empty cache after clear, got decoders=%d, schemas=%d, subjects=%d", + t.Errorf("Expected empty cache after clear, got decoders=%d, schemas=%d, subjects=%d", decoders, schemas, subjects) } } @@ -254,7 +254,7 @@ func TestManager_EncodeMessage(t *testing.T) { config := ManagerConfig{ RegistryURL: server.URL, } - + manager, err := NewManager(config) if err != nil { t.Fatalf("Failed to create manager: %v", err) @@ -308,7 +308,7 @@ func BenchmarkManager_DecodeMessage(b *testing.B) { // Setup (similar to TestManager_DecodeMessage but simplified) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ - "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, "subject": "user-value", "version": 1, } diff --git a/weed/mq/kafka/schema/protobuf_decoder.go b/weed/mq/kafka/schema/protobuf_decoder.go index 26a65f703..fdce355ed 100644 --- a/weed/mq/kafka/schema/protobuf_decoder.go +++ b/weed/mq/kafka/schema/protobuf_decoder.go @@ -21,7 +21,7 @@ func NewProtobufDecoder(schemaBytes []byte) (*ProtobufDecoder, error) { // For Phase 5, we'll implement a simplified version // In a full implementation, this would properly parse FileDescriptorSet // and handle complex schema dependencies - + // For now, return an error indicating this needs proper implementation return nil, fmt.Errorf("Protobuf decoder from binary descriptors not fully implemented in Phase 5 - use NewProtobufDecoderFromDescriptor for testing") } @@ -30,7 +30,7 @@ func NewProtobufDecoder(schemaBytes []byte) (*ProtobufDecoder, error) { // This is used for testing and when we have pre-built descriptors func NewProtobufDecoderFromDescriptor(msgDesc protoreflect.MessageDescriptor) *ProtobufDecoder { msgType := dynamicpb.NewMessageType(msgDesc) - + return &ProtobufDecoder{ descriptor: msgDesc, msgType: msgType, @@ -43,7 +43,7 @@ func NewProtobufDecoderFromString(schemaStr string) (*ProtobufDecoder, error) { // For Phase 5, we'll implement a basic string-to-descriptor parser // In a full implementation, this would use protoc to compile .proto files // or parse the Confluent Schema Registry's Protobuf descriptor format - + return nil, fmt.Errorf("string-based Protobuf schemas not yet implemented - use binary descriptors") } @@ -79,13 +79,13 @@ func (pd *ProtobufDecoder) InferRecordType() (*schema_pb.RecordType, error) { // messageToMap converts a Protobuf message to a Go map func (pd *ProtobufDecoder) messageToMap(msg protoreflect.Message) map[string]interface{} { result := make(map[string]interface{}) - + msg.Range(func(fd protoreflect.FieldDescriptor, v protoreflect.Value) bool { fieldName := string(fd.Name()) result[fieldName] = pd.valueToInterface(fd, v) return true }) - + return result } @@ -152,10 +152,10 @@ func (pd *ProtobufDecoder) scalarValueToInterface(fd protoreflect.FieldDescripto // descriptorToRecordType converts a Protobuf descriptor to SeaweedMQ RecordType func (pd *ProtobufDecoder) descriptorToRecordType(desc protoreflect.MessageDescriptor) *schema_pb.RecordType { fields := make([]*schema_pb.Field, 0, desc.Fields().Len()) - + for i := 0; i < desc.Fields().Len(); i++ { fd := desc.Fields().Get(i) - + field := &schema_pb.Field{ Name: string(fd.Name()), FieldIndex: int32(fd.Number() - 1), // Protobuf field numbers start at 1 @@ -163,10 +163,10 @@ func (pd *ProtobufDecoder) descriptorToRecordType(desc protoreflect.MessageDescr IsRequired: fd.Cardinality() == protoreflect.Required, IsRepeated: fd.IsList(), } - + fields = append(fields, field) } - + return &schema_pb.RecordType{ Fields: fields, } @@ -190,7 +190,7 @@ func (pd *ProtobufDecoder) fieldDescriptorToType(fd protoreflect.FieldDescriptor // Handle map fields - for simplicity, treat as record with key/value fields keyType := pd.scalarKindToType(fd.MapKey().Kind(), nil) valueType := pd.scalarKindToType(fd.MapValue().Kind(), fd.MapValue().Message()) - + mapRecordType := &schema_pb.RecordType{ Fields: []*schema_pb.Field{ { @@ -200,14 +200,14 @@ func (pd *ProtobufDecoder) fieldDescriptorToType(fd protoreflect.FieldDescriptor IsRequired: true, }, { - Name: "value", + Name: "value", FieldIndex: 1, Type: valueType, IsRequired: false, }, }, } - + return &schema_pb.Type{ Kind: &schema_pb.Type_RecordType{ RecordType: mapRecordType, @@ -336,10 +336,10 @@ func ParseConfluentProtobufEnvelope(data []byte) (*ConfluentEnvelope, bool) { // No more varints, rest is message data break } - + envelope.Indexes = append(envelope.Indexes, int(index)) offset += bytesRead - + // Limit to reasonable number of indexes to avoid infinite loop if len(envelope.Indexes) > 10 { break @@ -354,22 +354,22 @@ func ParseConfluentProtobufEnvelope(data []byte) (*ConfluentEnvelope, bool) { func readVarint(data []byte) (uint64, int) { var result uint64 var shift uint - + for i, b := range data { if i >= 10 { // Prevent overflow (max varint is 10 bytes) return 0, 0 } - + result |= uint64(b&0x7F) << shift - + if b&0x80 == 0 { // Last byte (MSB is 0) return result, i + 1 } - + shift += 7 } - + // Incomplete varint return 0, 0 } @@ -379,18 +379,18 @@ func encodeVarint(value uint64) []byte { if value == 0 { return []byte{0} } - + var result []byte for value > 0 { b := byte(value & 0x7F) value >>= 7 - + if value > 0 { b |= 0x80 // Set continuation bit } - + result = append(result, b) } - + return result } diff --git a/weed/mq/kafka/schema/reconstruction_test.go b/weed/mq/kafka/schema/reconstruction_test.go index 36085c30f..1086ff017 100644 --- a/weed/mq/kafka/schema/reconstruction_test.go +++ b/weed/mq/kafka/schema/reconstruction_test.go @@ -37,7 +37,7 @@ func TestSchemaReconstruction_Avro(t *testing.T) { RegistryURL: server.URL, ValidationMode: ValidationPermissive, } - + manager, err := NewManager(config) if err != nil { t.Fatalf("Failed to create manager: %v", err) @@ -52,7 +52,7 @@ func TestSchemaReconstruction_Avro(t *testing.T) { {"name": "name", "type": "string"} ] }` - + codec, err := goavro.NewCodec(avroSchema) if err != nil { t.Fatalf("Failed to create Avro codec: %v", err) @@ -72,7 +72,7 @@ func TestSchemaReconstruction_Avro(t *testing.T) { // Create original Confluent message originalMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) - + // Debug: Check the created message t.Logf("Original Avro binary length: %d", len(avroBinary)) t.Logf("Original Confluent message length: %d", len(originalMsg)) @@ -82,7 +82,7 @@ func TestSchemaReconstruction_Avro(t *testing.T) { if !ok { t.Fatal("Failed to parse Confluent envelope") } - t.Logf("Parsed envelope - SchemaID: %d, Format: %v, Payload length: %d", + t.Logf("Parsed envelope - SchemaID: %d, Format: %v, Payload length: %d", envelope.SchemaID, envelope.Format, len(envelope.Payload)) // Step 1: Decode the original message (simulate Produce path) @@ -128,7 +128,7 @@ func TestSchemaReconstruction_Avro(t *testing.T) { func TestSchemaReconstruction_MultipleFormats(t *testing.T) { // Test that the reconstruction framework can handle multiple schema formats - + testCases := []struct { name string format Format @@ -151,7 +151,7 @@ func TestSchemaReconstruction_MultipleFormats(t *testing.T) { config := ManagerConfig{ RegistryURL: "http://localhost:8081", // Not used for this test } - + manager, err := NewManager(config) if err != nil { t.Skip("Skipping test - no registry available") @@ -159,7 +159,7 @@ func TestSchemaReconstruction_MultipleFormats(t *testing.T) { // Test encoding (will fail for Protobuf/JSON Schema in Phase 7, which is expected) _, err = manager.EncodeMessage(recordValue, 1, tc.format) - + switch tc.format { case FormatAvro: // Avro should work (but will fail due to no registry) @@ -191,7 +191,7 @@ func TestSchemaReconstruction_MultipleFormats(t *testing.T) { func TestConfluentEnvelope_RoundTrip(t *testing.T) { // Test that Confluent envelope creation and parsing work correctly - + testCases := []struct { name string format Format @@ -263,7 +263,7 @@ func TestConfluentEnvelope_RoundTrip(t *testing.T) { func TestSchemaMetadata_Preservation(t *testing.T) { // Test that schema metadata is properly preserved through the reconstruction process - + envelope := &ConfluentEnvelope{ Format: FormatAvro, SchemaID: 42, @@ -276,9 +276,9 @@ func TestSchemaMetadata_Preservation(t *testing.T) { // Verify metadata contents expectedMetadata := map[string]string{ - "schema_format": "AVRO", - "schema_id": "42", - "protobuf_indexes": "1,2,3", + "schema_format": "AVRO", + "schema_id": "42", + "protobuf_indexes": "1,2,3", } for key, expectedValue := range expectedMetadata { @@ -299,7 +299,7 @@ func TestSchemaMetadata_Preservation(t *testing.T) { } if reconstructedFormat != envelope.Format { - t.Errorf("Failed to reconstruct format from metadata: expected %v, got %v", + t.Errorf("Failed to reconstruct format from metadata: expected %v, got %v", envelope.Format, reconstructedFormat) } @@ -318,7 +318,7 @@ func BenchmarkSchemaReconstruction_Avro(b *testing.B) { config := ManagerConfig{ RegistryURL: "http://localhost:8081", } - + manager, err := NewManager(config) if err != nil { b.Skip("Skipping benchmark - no registry available") @@ -333,7 +333,7 @@ func BenchmarkSchemaReconstruction_Avro(b *testing.B) { func BenchmarkConfluentEnvelope_Creation(b *testing.B) { payload := []byte("test-payload-for-benchmarking") - + b.ResetTimer() for i := 0; i < b.N; i++ { _ = CreateConfluentEnvelope(FormatAvro, 1, nil, payload) @@ -342,7 +342,7 @@ func BenchmarkConfluentEnvelope_Creation(b *testing.B) { func BenchmarkConfluentEnvelope_Parsing(b *testing.B) { envelope := CreateConfluentEnvelope(FormatAvro, 1, nil, []byte("test-payload")) - + b.ResetTimer() for i := 0; i < b.N; i++ { _, _ = ParseConfluentEnvelope(envelope)