diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index 0c2eb38c3..b674c3aeb 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -260,7 +260,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		if endIdx > len(session.consumedRecords) {
 			endIdx = len(session.consumedRecords)
 		}
-		glog.V(2).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
+		glog.V(2).Infof("[FETCH] Returning %d cached records for %s at offset %d (cache: %d-%d)",
 			endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 		session.mu.Unlock()
 		return session.consumedRecords[startIdx:endIdx], nil
@@ -317,12 +317,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		// Check if the session was already recreated at (or before) the requested offset
 		if existingOffset <= requestedOffset {
 			bc.subscribersLock.Unlock()
-			glog.V(2).Infof("[FETCH] ✓ Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
+			glog.V(2).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
 			// Re-acquire the existing session and continue
 			return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
 		}
 
-		glog.V(2).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
+		glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
 
 		// Session still needs recreation - close it
 		if existingSession.Stream != nil {
@@ -388,7 +388,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	bc.subscribers[key] = newSession
 	bc.subscribersLock.Unlock()
 
-	glog.V(2).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
+	glog.V(2).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
 
 	// Read from fresh subscriber
 	glog.V(2).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
@@ -586,7 +586,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			if result.err != nil {
 				glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
 				// Update session offset before returning
-				glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
+				glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (error case, read %d records)",
 					session.Key(), session.StartOffset, currentOffset, len(records))
 				session.StartOffset = currentOffset
 				return records, nil
@@ -612,7 +612,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 			// Timeout - return what we have
 			glog.V(2).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
 			// CRITICAL: Update session offset so next fetch knows where we left off
-			glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
+			glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (timeout case, read %d records)",
 				session.Key(), session.StartOffset, currentOffset, len(records))
 			session.StartOffset = currentOffset
 			return records, nil
@@ -621,7 +621,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 
 	glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
 	// Update session offset after successful read
-	glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
+	glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (success case, read %d records)",
 		session.Key(), session.StartOffset, currentOffset, len(records))
 	session.StartOffset = currentOffset
 
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index 802351528..a8ea803f1 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -871,373 +871,12 @@ func encodeVarint(value int64) []byte {
 	return buf
 }
 
-// reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue
-func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) {
-	// Only reconstruct if schema management is enabled
-	if !h.IsSchemaEnabled() {
-		return nil, fmt.Errorf("schema management not enabled")
-	}
-
-	// Extract schema information from metadata
-	schemaIDStr, exists := metadata["schema_id"]
-	if !exists {
-		return nil, fmt.Errorf("no schema ID in metadata")
-	}
-
-	var schemaID uint32
-	if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
-		return nil, fmt.Errorf("invalid schema ID: %w", err)
-	}
-
-	formatStr, exists := metadata["schema_format"]
-	if !exists {
-		return nil, fmt.Errorf("no schema format in metadata")
-	}
-
-	var format schema.Format
-	switch formatStr {
-	case "AVRO":
-		format = schema.FormatAvro
-	case "PROTOBUF":
-		format = schema.FormatProtobuf
-	case "JSON_SCHEMA":
-		format = schema.FormatJSONSchema
-	default:
-		return nil, fmt.Errorf("unsupported schema format: %s", formatStr)
-	}
-
-	// Use schema manager to encode back to original format
-	return h.schemaManager.EncodeMessage(recordValue, schemaID, format)
-}
-
 // SchematizedRecord holds both key and value for schematized messages
 type SchematizedRecord struct {
 	Key   []byte
 	Value []byte
 }
 
-// fetchSchematizedRecords fetches and reconstructs schematized records from SeaweedMQ
-func (h *Handler) fetchSchematizedRecords(topicName string, partitionID int32, offset int64, maxBytes int32) ([]*SchematizedRecord, error) {
-	glog.Infof("fetchSchematizedRecords: topic=%s partition=%d offset=%d maxBytes=%d", topicName, partitionID, offset, maxBytes)
-
-	// Only proceed when schema feature is toggled on
-	if !h.useSchema {
-		glog.Infof("fetchSchematizedRecords EARLY RETURN: useSchema=false")
-		return []*SchematizedRecord{}, nil
-	}
-
-	// Check if SeaweedMQ handler is available when schema feature is in use
-	if h.seaweedMQHandler == nil {
-		glog.Infof("fetchSchematizedRecords ERROR: seaweedMQHandler is nil")
-		return nil, fmt.Errorf("SeaweedMQ handler not available")
-	}
-
-	// If schema management isn't fully configured, return empty instead of error
-	if !h.IsSchemaEnabled() {
-		glog.Infof("fetchSchematizedRecords EARLY RETURN: IsSchemaEnabled()=false")
-		return []*SchematizedRecord{}, nil
-	}
-
-	// Fetch stored records from SeaweedMQ
-	maxRecords := 100 // Reasonable batch size limit
-	glog.Infof("fetchSchematizedRecords: calling GetStoredRecords maxRecords=%d", maxRecords)
-	smqRecords, err := h.seaweedMQHandler.GetStoredRecords(context.Background(), topicName, partitionID, offset, maxRecords)
-	if err != nil {
-		glog.Infof("fetchSchematizedRecords ERROR: GetStoredRecords failed: %v", err)
-		return nil, fmt.Errorf("failed to fetch SMQ records: %w", err)
-	}
-
-	glog.Infof("fetchSchematizedRecords: GetStoredRecords returned %d records", len(smqRecords))
-	if len(smqRecords) == 0 {
-		return []*SchematizedRecord{}, nil
-	}
-
-	var reconstructedRecords []*SchematizedRecord
-	totalBytes := int32(0)
-
-	for _, smqRecord := range smqRecords {
-		// Check if we've exceeded maxBytes limit
-		if maxBytes > 0 && totalBytes >= maxBytes {
-			break
-		}
-
-		// Try to reconstruct the schematized message value
-		reconstructedValue, err := h.reconstructSchematizedMessageFromSMQ(smqRecord)
-		if err != nil {
-			// Log error but continue with other messages
-			Error("Failed to reconstruct schematized message at offset %d: %v", smqRecord.GetOffset(), err)
-			continue
-		}
-
-		if reconstructedValue != nil {
-			// Create SchematizedRecord with both key and reconstructed value
-			record := &SchematizedRecord{
-				Key:   smqRecord.GetKey(),  // Preserve the original key
-				Value: reconstructedValue,  // Use the reconstructed value
-			}
-			reconstructedRecords = append(reconstructedRecords, record)
-			totalBytes += int32(len(record.Key) + len(record.Value))
-		}
-	}
-
-	return reconstructedRecords, nil
-}
-
-// reconstructSchematizedMessageFromSMQ reconstructs a schematized message from an SMQRecord
-func (h *Handler) reconstructSchematizedMessageFromSMQ(smqRecord integration.SMQRecord) ([]byte, error) {
-	// Get the stored value (should be a serialized RecordValue)
-	valueBytes := smqRecord.GetValue()
-	if len(valueBytes) == 0 {
-		return nil, fmt.Errorf("empty value in SMQ record")
-	}
-
-	// Try to unmarshal as RecordValue
-	recordValue := &schema_pb.RecordValue{}
-	if err := proto.Unmarshal(valueBytes, recordValue); err != nil {
-		// If it's not a RecordValue, it might be a regular Kafka message
-		// Return it as-is (non-schematized)
-		return valueBytes, nil
-	}
-
-	// Extract schema metadata from the RecordValue fields
-	metadata := h.extractSchemaMetadataFromRecord(recordValue)
-	if len(metadata) == 0 {
-		// No schema metadata found, treat as regular message
-		return valueBytes, nil
-	}
-
-	// Remove Kafka metadata fields to get the original message content
-	originalRecord := h.removeKafkaMetadataFields(recordValue)
-
-	// Reconstruct the original Confluent envelope
-	return h.reconstructSchematizedMessage(originalRecord, metadata)
-}
-
-// extractSchemaMetadataFromRecord extracts schema metadata from RecordValue fields
-func (h *Handler) extractSchemaMetadataFromRecord(recordValue *schema_pb.RecordValue) map[string]string {
-	metadata := make(map[string]string)
-
-	// Look for schema metadata fields in the record
-	if schemaIDField := recordValue.Fields["_schema_id"]; schemaIDField != nil {
-		if schemaIDValue := schemaIDField.GetStringValue(); schemaIDValue != "" {
-			metadata["schema_id"] = schemaIDValue
-		}
-	}
-
-	if schemaFormatField := recordValue.Fields["_schema_format"]; schemaFormatField != nil {
-		if schemaFormatValue := schemaFormatField.GetStringValue(); schemaFormatValue != "" {
-			metadata["schema_format"] = schemaFormatValue
-		}
-	}
-
-	if schemaSubjectField := recordValue.Fields["_schema_subject"]; schemaSubjectField != nil {
-		if schemaSubjectValue := schemaSubjectField.GetStringValue(); schemaSubjectValue != "" {
-			metadata["schema_subject"] = schemaSubjectValue
-		}
-	}
-
-	if schemaVersionField := recordValue.Fields["_schema_version"]; schemaVersionField != nil {
-		if schemaVersionValue := schemaVersionField.GetStringValue(); schemaVersionValue != "" {
-			metadata["schema_version"] = schemaVersionValue
-		}
-	}
-
-	return metadata
-}
-
-// removeKafkaMetadataFields removes Kafka and schema metadata fields from RecordValue
-func (h *Handler) removeKafkaMetadataFields(recordValue *schema_pb.RecordValue) *schema_pb.RecordValue {
-	originalRecord := &schema_pb.RecordValue{
-		Fields: make(map[string]*schema_pb.Value),
-	}
-
-	// Copy all fields except metadata fields
-	for key, value := range recordValue.Fields {
-		if !h.isMetadataField(key) {
-			originalRecord.Fields[key] = value
-		}
-	}
-
-	return originalRecord
-}
-
-// isMetadataField checks if a field is a metadata field that should be excluded from the original message
-func (h *Handler) isMetadataField(fieldName string) bool {
-	return fieldName == "_kafka_offset" ||
-		fieldName == "_kafka_partition" ||
-		fieldName == "_kafka_timestamp" ||
-		fieldName == "_schema_id" ||
-		fieldName == "_schema_format" ||
-		fieldName == "_schema_subject" ||
-		fieldName == "_schema_version"
-}
-
-// createSchematizedRecordBatch creates a Kafka record batch from reconstructed schematized messages
-func (h *Handler) createSchematizedRecordBatch(records []*SchematizedRecord, baseOffset int64) []byte {
-	if len(records) == 0 {
-		// Return empty record batch
-		return h.createEmptyRecordBatch(baseOffset)
-	}
-
-	// Create individual record entries for the batch
-	var recordsData []byte
-	currentTimestamp := time.Now().UnixMilli()
-
-	for i, record := range records {
-		// Create a record entry (Kafka record format v2) with both key and value
-		recordEntry := h.createRecordEntry(record.Key, record.Value, int32(i), currentTimestamp)
-		recordsData = append(recordsData, recordEntry...)
-	}
-
-	// Apply compression if the data is large enough to benefit
-	enableCompression := len(recordsData) > 100
-	var compressionType compression.CompressionCodec = compression.None
-	var finalRecordsData []byte
-
-	if enableCompression {
-		compressed, err := compression.Compress(compression.Gzip, recordsData)
-		if err == nil && len(compressed) < len(recordsData) {
-			finalRecordsData = compressed
-			compressionType = compression.Gzip
-		} else {
-			finalRecordsData = recordsData
-		}
-	} else {
-		finalRecordsData = recordsData
-	}
-
-	// Create the record batch with proper compression and CRC
-	batch, err := h.createRecordBatchWithCompressionAndCRC(baseOffset, finalRecordsData, compressionType, int32(len(records)), currentTimestamp)
-	if err != nil {
-		// Fallback to simple batch creation
-		return h.createRecordBatchWithPayload(baseOffset, int32(len(records)), finalRecordsData)
-	}
-
-	return batch
-}
-
-// createRecordEntry creates a single record entry in Kafka record format v2
-func (h *Handler) createRecordEntry(messageKey []byte, messageData []byte, offsetDelta int32, timestamp int64) []byte {
-	// Record format v2:
-	// - length (varint)
-	// - attributes (int8)
-	// - timestamp delta (varint)
-	// - offset delta (varint)
-	// - key length (varint) + key
-	// - value length (varint) + value
-	// - headers count (varint) + headers
-
-	var record []byte
-
-	// Attributes (1 byte) - no special attributes
-	record = append(record, 0)
-
-	// Timestamp delta (varint) - 0 for now (all messages have same timestamp)
-	record = append(record, encodeVarint(0)...)
-
-	// Offset delta (varint)
-	record = append(record, encodeVarint(int64(offsetDelta))...)
-
-	// Key length (varint) + key
-	if messageKey == nil || len(messageKey) == 0 {
-		record = append(record, encodeVarint(-1)...) // -1 indicates null key
-	} else {
-		record = append(record, encodeVarint(int64(len(messageKey)))...)
-		record = append(record, messageKey...)
-	}
-
-	// Value length (varint) + value
-	record = append(record, encodeVarint(int64(len(messageData)))...)
-	record = append(record, messageData...)
-
-	// Headers count (varint) - no headers
-	record = append(record, encodeVarint(0)...)
-
-	// Prepend the total record length (varint)
-	recordLength := encodeVarint(int64(len(record)))
-	return append(recordLength, record...)
-}
-
-// createRecordBatchWithCompressionAndCRC creates a Kafka record batch with proper compression and CRC
-func (h *Handler) createRecordBatchWithCompressionAndCRC(baseOffset int64, recordsData []byte, compressionType compression.CompressionCodec, recordCount int32, baseTimestampMs int64) ([]byte, error) {
-	// Create record batch header
-	// Validate size to prevent overflow
-	const maxBatchSize = 1 << 30 // 1 GB limit
-	if len(recordsData) > maxBatchSize-61 {
-		return nil, fmt.Errorf("records data too large: %d bytes", len(recordsData))
-	}
-	batch := make([]byte, 0, len(recordsData)+61) // 61 bytes for header
-
-	// Base offset (8 bytes)
-	baseOffsetBytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
-	batch = append(batch, baseOffsetBytes...)
-
-	// Batch length placeholder (4 bytes) - will be filled later
-	batchLengthPos := len(batch)
-	batch = append(batch, 0, 0, 0, 0)
-
-	// Partition leader epoch (4 bytes)
-	batch = append(batch, 0, 0, 0, 0)
-
-	// Magic byte (1 byte) - version 2
-	batch = append(batch, 2)
-
-	// CRC placeholder (4 bytes) - will be calculated later
-	crcPos := len(batch)
-	batch = append(batch, 0, 0, 0, 0)
-
-	// Attributes (2 bytes) - compression type and other flags
-	attributes := int16(compressionType) // Set compression type in lower 3 bits
-	attributesBytes := make([]byte, 2)
-	binary.BigEndian.PutUint16(attributesBytes, uint16(attributes))
-	batch = append(batch, attributesBytes...)
-
-	// Last offset delta (4 bytes)
-	lastOffsetDelta := uint32(recordCount - 1)
-	lastOffsetDeltaBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
-	batch = append(batch, lastOffsetDeltaBytes...)
-
-	// First timestamp (8 bytes) - use the same timestamp used to build record entries
-	firstTimestampBytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(firstTimestampBytes, uint64(baseTimestampMs))
-	batch = append(batch, firstTimestampBytes...)
-
-	// Max timestamp (8 bytes) - same as first for simplicity
-	batch = append(batch, firstTimestampBytes...)
-
-	// Producer ID (8 bytes) - -1 for non-transactional
-	batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
-
-	// Producer epoch (2 bytes) - -1 for non-transactional
-	batch = append(batch, 0xFF, 0xFF)
-
-	// Base sequence (4 bytes) - -1 for non-transactional
-	batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
-
-	// Record count (4 bytes)
-	recordCountBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(recordCountBytes, uint32(recordCount))
-	batch = append(batch, recordCountBytes...)
-
-	// Records payload (compressed or uncompressed)
-	batch = append(batch, recordsData...)
-
-	// Calculate and set batch length (excluding base offset and batch length fields)
-	batchLength := len(batch) - 12 // 8 bytes base offset + 4 bytes batch length
-	binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], uint32(batchLength))
-
-	// Calculate and set CRC32 over attributes..end (exclude CRC field itself)
-	// Kafka uses Castagnoli (CRC-32C) algorithm. CRC covers ONLY from attributes offset (byte 21) onwards.
-	// See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
-	crcData := batch[crcPos+4:] // Skip CRC field itself (bytes 17..20) and include the rest
-	crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
-	binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
-
-	return batch, nil
-}
-
 // createEmptyRecordBatch creates an empty Kafka record batch using the new parser
 func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte {
 	// Use the new record batch creation function with no compression
@@ -1307,47 +946,6 @@ func (h *Handler) createEmptyRecordBatchManual(baseOffset int64) []byte {
 	return batch
 }
 
-// createRecordBatchWithPayload creates a record batch with the given payload
-func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte {
-	// For Phase 7, create a simplified record batch
-	// In Phase 8, this will implement proper Kafka record batch format v2
-
-	batch := h.createEmptyRecordBatch(baseOffset)
-
-	// Update record count
-	recordCountOffset := len(batch) - 4
-	binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount))
-
-	// Append payload (simplified - real implementation would format individual records)
-	batch = append(batch, payload...)
-
-	// Update batch length
-	batchLength := len(batch) - 12
-	binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength))
-
-	return batch
-}
-
-// handleSchematizedFetch handles fetch requests for topics with schematized messages
-func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, offset int64, maxBytes int32) ([]byte, error) {
-	// Check if this topic uses schema management
-	if !h.IsSchemaEnabled() {
-		// Fall back to regular fetch handling
-		return nil, fmt.Errorf("schema management not enabled")
-	}
-
-	// Fetch schematized records from SeaweedMQ
-	records, err := h.fetchSchematizedRecords(topicName, partitionID, offset, maxBytes)
-	if err != nil {
-		return nil, fmt.Errorf("failed to fetch schematized records: %w", err)
-	}
-
-	// Create record batch from reconstructed records
-	recordBatch := h.createSchematizedRecordBatch(records, offset)
-
-	return recordBatch, nil
-}
-
 // isSchematizedTopic checks if a topic uses schema management
 func (h *Handler) isSchematizedTopic(topicName string) bool {
 	// System topics (_schemas, __consumer_offsets, etc.) should NEVER use schema encoding
@@ -1593,62 +1191,6 @@ func (h *Handler) getTopicSchemaConfig(topicName string) (*TopicSchemaConfig, er
 	return config, nil
 }
 
-// decodeRecordValueToKafkaKey decodes a key RecordValue back to the original Kafka key bytes
-func (h *Handler) decodeRecordValueToKafkaKey(topicName string, keyRecordValueBytes []byte) []byte {
-	if keyRecordValueBytes == nil {
-		return nil
-	}
-
-	// Try to get topic schema config
-	schemaConfig, err := h.getTopicSchemaConfig(topicName)
-	if err != nil || !schemaConfig.HasKeySchema {
-		// No key schema config available, return raw bytes
-		return keyRecordValueBytes
-	}
-
-	// Try to unmarshal as RecordValue
-	recordValue := &schema_pb.RecordValue{}
-	if err := proto.Unmarshal(keyRecordValueBytes, recordValue); err != nil {
-		// If it's not a RecordValue, return the raw bytes
-		return keyRecordValueBytes
-	}
-
-	// If key schema management is enabled, re-encode the RecordValue to Confluent format
-	if h.IsSchemaEnabled() {
-		if encodedKey, err := h.encodeKeyRecordValueToConfluentFormat(topicName, recordValue); err == nil {
-			return encodedKey
-		}
-	}
-
-	// Fallback: convert RecordValue to JSON
-	return h.recordValueToJSON(recordValue)
-}
-
-// encodeKeyRecordValueToConfluentFormat re-encodes a key RecordValue back to Confluent format
-func (h *Handler) encodeKeyRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
-	if recordValue == nil {
-		return nil, fmt.Errorf("key RecordValue is nil")
-	}
-
-	// Get schema configuration from topic config
-	schemaConfig, err := h.getTopicSchemaConfig(topicName)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get topic schema config: %w", err)
-	}
-
-	if !schemaConfig.HasKeySchema {
-		return nil, fmt.Errorf("no key schema configured for topic: %s", topicName)
-	}
-
-	// Use schema manager to encode RecordValue back to original format
-	encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.KeySchemaID, schemaConfig.KeySchemaFormat)
-	if err != nil {
-		return nil, fmt.Errorf("failed to encode key RecordValue: %w", err)
-	}
-
-	return encodedBytes, nil
-}
-
 // recordValueToJSON converts a RecordValue to JSON bytes (fallback)
 func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
 	if recordValue == nil || recordValue.Fields == nil {
@@ -1685,92 +1227,3 @@ func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
 
 	return []byte(jsonStr)
 }
-
-// fetchPartitionData fetches data for a single partition (called concurrently)
-func (h *Handler) fetchPartitionData(
-	ctx context.Context,
-	topicName string,
-	partition FetchPartition,
-	apiVersion uint16,
-	isSchematizedTopic bool,
-) *partitionFetchResult {
-	startTime := time.Now()
-	result := &partitionFetchResult{}
-
-	// Get the actual high water mark from SeaweedMQ
-	highWaterMark, err := h.seaweedMQHandler.GetLatestOffset(topicName, partition.PartitionID)
-	if err != nil {
-		highWaterMark = 0
-	}
-	result.highWaterMark = highWaterMark
-
-	// Check if topic exists
-	if !h.seaweedMQHandler.TopicExists(topicName) {
-		if isSystemTopic(topicName) {
-			// Auto-create system topics
-			if err := h.createTopicWithSchemaSupport(topicName, 1); err != nil {
-				result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
-				result.fetchDuration = time.Since(startTime)
-				return result
-			}
-		} else {
-			result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
-			result.fetchDuration = time.Since(startTime)
-			return result
-		}
-	}
-
-	// Normalize special fetch offsets
-	effectiveFetchOffset := partition.FetchOffset
-	if effectiveFetchOffset < 0 {
-		if effectiveFetchOffset == -2 {
-			effectiveFetchOffset = 0
-		} else if effectiveFetchOffset == -1 {
-			effectiveFetchOffset = highWaterMark
-		}
-	}
-
-	// Fetch records if available
-	var recordBatch []byte
-	if highWaterMark > effectiveFetchOffset {
-		// Use multi-batch fetcher (pass context to respect timeout)
-		multiFetcher := NewMultiBatchFetcher(h)
-		fetchResult, err := multiFetcher.FetchMultipleBatches(
-			ctx,
-			topicName,
-			partition.PartitionID,
-			effectiveFetchOffset,
-			highWaterMark,
-			partition.MaxBytes,
-		)
-
-		if err == nil && fetchResult.TotalSize > 0 {
-			recordBatch = fetchResult.RecordBatches
-		} else {
-			// Fallback to single batch (pass context to respect timeout)
-			smqRecords, err := h.seaweedMQHandler.GetStoredRecords(ctx, topicName, partition.PartitionID, effectiveFetchOffset, 10)
-			if err == nil && len(smqRecords) > 0 {
-				recordBatch = h.constructRecordBatchFromSMQ(topicName, effectiveFetchOffset, smqRecords)
-			} else {
-				recordBatch = []byte{}
-			}
-		}
-	} else {
-		recordBatch = []byte{}
-	}
-
-	// Try schematized records if needed and recordBatch is empty
-	if isSchematizedTopic && len(recordBatch) == 0 {
-		schematizedRecords, err := h.fetchSchematizedRecords(topicName, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes)
-		if err == nil && len(schematizedRecords) > 0 {
-			schematizedBatch := h.createSchematizedRecordBatch(schematizedRecords, effectiveFetchOffset)
-			if len(schematizedBatch) > 0 {
-				recordBatch = schematizedBatch
-			}
-		}
-	}
-
-	result.recordBatch = recordBatch
-	result.fetchDuration = time.Since(startTime)
-	return result
-}
diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go
index 2d157c75a..f00b6a92a 100644
--- a/weed/mq/kafka/protocol/fetch_multibatch.go
+++ b/weed/mq/kafka/protocol/fetch_multibatch.go
@@ -577,65 +577,6 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com
 	return batch
 }
 
-// estimateBatchSize estimates the size of a record batch before constructing it
-func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []integration.SMQRecord) int32 {
-	if len(smqRecords) == 0 {
-		return 61 // empty batch header size
-	}
-
-	// Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
-	// last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
-	headerSize := int32(61)
-
-	baseTs := smqRecords[0].GetTimestamp()
-	recordsSize := int32(0)
-	for i, rec := range smqRecords {
-		// attributes(1)
-		rb := int32(1)
-
-		// timestamp_delta(varint)
-		tsDelta := rec.GetTimestamp() - baseTs
-		rb += int32(len(encodeVarint(tsDelta)))
-
-		// offset_delta(varint)
-		rb += int32(len(encodeVarint(int64(i))))
-
-		// key length varint + data or -1
-		if k := rec.GetKey(); k != nil {
-			rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k))
-		} else {
-			rb += int32(len(encodeVarint(-1)))
-		}
-
-		// value length varint + data or -1
-		if v := rec.GetValue(); v != nil {
-			rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v))
-		} else {
-			rb += int32(len(encodeVarint(-1)))
-		}
-
-		// headers count (varint = 0)
-		rb += int32(len(encodeVarint(0)))
-
-		// prepend record length varint
-		recordsSize += int32(len(encodeVarint(int64(rb)))) + rb
-	}
-
-	return headerSize + recordsSize
-}
-
-// sizeOfVarint returns the number of bytes encodeVarint would use for value
-func sizeOfVarint(value int64) int32 {
-	// ZigZag encode to match encodeVarint
-	u := uint64(uint64(value<<1) ^ uint64(value>>63))
-	size := int32(1)
-	for u >= 0x80 {
-		u >>= 7
-		size++
-	}
-	return size
-}
-
 // compressData compresses data using the specified codec (basic implementation)
 func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
 	// For Phase 5, implement basic compression support
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index fcfe196c2..4cf1e1e8c 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -1248,7 +1248,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
 	// Parse requested topics (empty means all)
 	requestedTopics := h.parseMetadataTopics(requestBody)
 
-	glog.V(0).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
+	glog.V(1).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
 
 	// Determine topics to return using SeaweedMQ handler
 	var topicsToReturn []string
@@ -1323,7 +1323,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
 	// Parse requested topics (empty means all)
 	requestedTopics := h.parseMetadataTopics(requestBody)
 
-	glog.V(0).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
+	glog.V(1).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
 
 	// Determine topics to return using SeaweedMQ handler
 	var topicsToReturn []string
@@ -1436,7 +1436,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
 	// Parse requested topics (empty means all)
 	requestedTopics := h.parseMetadataTopics(requestBody)
 
-	glog.V(0).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
+	glog.V(1).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
 
 	// Determine topics to return using SeaweedMQ handler
 	var topicsToReturn []string
@@ -1544,7 +1544,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
 	// Parse requested topics (empty means all)
 	requestedTopics := h.parseMetadataTopics(requestBody)
 
-	glog.V(0).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
+	glog.V(1).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
 
 	// Determine topics to return using SeaweedMQ handler
 	var topicsToReturn []string
@@ -1671,7 +1671,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
 	// Parse requested topics (empty means all)
 	requestedTopics := h.parseMetadataTopics(requestBody)
 
-	glog.V(0).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
+	glog.V(1).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
 
 	// Determine topics to return using SeaweedMQ handler
 	var topicsToReturn []string
@@ -1688,24 +1688,24 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
 		for _, topic := range requestedTopics {
 			if isSystemTopic(topic) {
 				// Always try to auto-create system topics during metadata requests
-				glog.V(0).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
+				glog.V(1).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
 				if !h.seaweedMQHandler.TopicExists(topic) {
-					glog.V(0).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
+					glog.V(1).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
 					if err := h.createTopicWithSchemaSupport(topic, 1); err != nil {
 						glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err)
 						// Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
 					} else {
-						glog.V(0).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
+						glog.V(1).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
 					}
 				} else {
-					glog.V(0).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
+					glog.V(1).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
 				}
 				topicsToReturn = append(topicsToReturn, topic)
 			} else if h.seaweedMQHandler.TopicExists(topic) {
 				topicsToReturn = append(topicsToReturn, topic)
 			}
 		}
-		glog.V(0).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
+		glog.V(1).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
 	}
 
 	var buf bytes.Buffer
@@ -3986,7 +3986,7 @@ func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partiti
 
 	// Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself
 	// This was causing issues with Schema Registry bootstrap
-	glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
+	glog.V(1).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
 
 	return h.seaweedMQHandler.CreateTopic(topicName, partitions)
 }
diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go
index f1937e93e..95a94538d 100644
--- a/weed/mq/kafka/protocol/offset_management.go
+++ b/weed/mq/kafka/protocol/offset_management.go
@@ -172,16 +172,16 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 			}
 
 			if groupIsEmpty {
-				glog.V(0).Infof("[OFFSET_COMMIT] ✓ Committed (empty group): group=%s topic=%s partition=%d offset=%d",
+				glog.V(1).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
 					req.GroupID, t.Name, p.Index, p.Offset)
 			} else {
-				glog.V(0).Infof("[OFFSET_COMMIT] ✓ Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
+				glog.V(1).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
 					req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
 			}
 		} else {
 			// Do not store commit if generation mismatch
 			errCode = 22 // IllegalGeneration
-			glog.V(0).Infof("[OFFSET_COMMIT] ❌ Rejected - generation mismatch: group=%s expected=%d got=%d members=%d",
+			glog.V(0).Infof("[OFFSET_COMMIT] Rejected - generation mismatch: group=%s expected=%d got=%d members=%d",
 				req.GroupID, group.Generation, req.GenerationID, len(group.Members))
 		}
 
@@ -212,14 +212,14 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 	// Get consumer group
 	group := h.groupCoordinator.GetGroup(request.GroupID)
 	if group == nil {
-		glog.V(0).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
+		glog.V(1).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}
 
 	group.Mu.RLock()
 	defer group.Mu.RUnlock()
 
-	glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
+	glog.V(1).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
 
 	// Build response
 	response := OffsetFetchResponse{
@@ -255,7 +255,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 			if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
 				fetchedOffset = off
 				metadata = meta
-				glog.V(0).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
+				glog.V(1).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
 					request.GroupID, topic.Name, partition, off)
 			} else {
 				// Fallback: try fetching from SMQ persistent storage
@@ -269,10 +269,10 @@
 				if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
 					fetchedOffset = off
 					metadata = meta
-					glog.V(0).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
+					glog.V(1).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
 						request.GroupID, topic.Name, partition, off)
 				} else {
-					glog.V(0).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
+					glog.V(1).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
 						request.GroupID, topic.Name, partition)
 				}
 				// No offset found in either location (-1 indicates no committed offset)
@@ -285,7 +285,7 @@
 				Metadata:  metadata,
 				ErrorCode: errorCode,
 			}
-			glog.V(0).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
+			glog.V(1).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
 				request.GroupID, topic.Name, partition, fetchedOffset)
 			topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
 		}