
less logs, remove unused code

pull/7329/head
chrislu committed 1 week ago
commit fd33e03008
5 changed files:

 14   weed/mq/kafka/integration/broker_client_subscribe.go
547   weed/mq/kafka/protocol/fetch.go
 59   weed/mq/kafka/protocol/fetch_multibatch.go
 22   weed/mq/kafka/protocol/handler.go
 18   weed/mq/kafka/protocol/offset_management.go

14   weed/mq/kafka/integration/broker_client_subscribe.go

@@ -260,7 +260,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
if endIdx > len(session.consumedRecords) {
endIdx = len(session.consumedRecords)
}
glog.V(2).Infof("[FETCH] Returning %d cached records for %s at offset %d (cache: %d-%d)",
glog.V(2).Infof("[FETCH] Returning %d cached records for %s at offset %d (cache: %d-%d)",
endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
session.mu.Unlock()
return session.consumedRecords[startIdx:endIdx], nil
@@ -317,12 +317,12 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(2).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
glog.V(2).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
glog.V(2).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
// Session still needs recreation - close it
if existingSession.Stream != nil {
@@ -388,7 +388,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(2).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
glog.V(2).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
// Read from fresh subscriber
glog.V(2).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
@@ -586,7 +586,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
if result.err != nil {
glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
// Update session offset before returning
glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (error case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
return records, nil
@@ -612,7 +612,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
// Timeout - return what we have
glog.V(2).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
// CRITICAL: Update session offset so next fetch knows where we left off
glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (timeout case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
return records, nil
@@ -621,7 +621,7 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
// Update session offset after successful read
glog.V(2).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
glog.V(2).Infof("[FETCH] Updating %s offset: %d -> %d (success case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
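
The hunks above serve fetches out of the session's consumedRecords cache and clamp endIdx to the cache length. A minimal self-contained sketch of that index arithmetic, assuming cacheStartOffset is the offset of the first cached record and using a placeholder Record type (the real struct in broker_client_subscribe.go is richer):

    package main

    import "fmt"

    // Record stands in for the session's consumed-record type.
    type Record struct{ Offset int64 }

    // sliceCached mirrors the clamping in the hunk above: map the requested
    // offset into the cache slice and cap endIdx at the cache size.
    func sliceCached(consumed []Record, cacheStartOffset, requestedOffset int64, maxRecords int) []Record {
        startIdx := int(requestedOffset - cacheStartOffset)
        if startIdx < 0 || startIdx >= len(consumed) {
            return nil // requested offset is outside the cached window
        }
        endIdx := startIdx + maxRecords
        if endIdx > len(consumed) {
            endIdx = len(consumed)
        }
        return consumed[startIdx:endIdx]
    }

    func main() {
        cache := []Record{{100}, {101}, {102}}
        fmt.Println(len(sliceCached(cache, 100, 101, 10))) // 2: clamped at cache end
    }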

547   weed/mq/kafka/protocol/fetch.go

@@ -871,373 +871,12 @@ func encodeVarint(value int64) []byte {
return buf
}
// reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue
func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) {
// Only reconstruct if schema management is enabled
if !h.IsSchemaEnabled() {
return nil, fmt.Errorf("schema management not enabled")
}
// Extract schema information from metadata
schemaIDStr, exists := metadata["schema_id"]
if !exists {
return nil, fmt.Errorf("no schema ID in metadata")
}
var schemaID uint32
if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
return nil, fmt.Errorf("invalid schema ID: %w", err)
}
formatStr, exists := metadata["schema_format"]
if !exists {
return nil, fmt.Errorf("no schema format in metadata")
}
var format schema.Format
switch formatStr {
case "AVRO":
format = schema.FormatAvro
case "PROTOBUF":
format = schema.FormatProtobuf
case "JSON_SCHEMA":
format = schema.FormatJSONSchema
default:
return nil, fmt.Errorf("unsupported schema format: %s", formatStr)
}
// Use schema manager to encode back to original format
return h.schemaManager.EncodeMessage(recordValue, schemaID, format)
}
// SchematizedRecord holds both key and value for schematized messages
type SchematizedRecord struct {
Key []byte
Value []byte
}
// fetchSchematizedRecords fetches and reconstructs schematized records from SeaweedMQ
func (h *Handler) fetchSchematizedRecords(topicName string, partitionID int32, offset int64, maxBytes int32) ([]*SchematizedRecord, error) {
glog.Infof("fetchSchematizedRecords: topic=%s partition=%d offset=%d maxBytes=%d", topicName, partitionID, offset, maxBytes)
// Only proceed when schema feature is toggled on
if !h.useSchema {
glog.Infof("fetchSchematizedRecords EARLY RETURN: useSchema=false")
return []*SchematizedRecord{}, nil
}
// Check if SeaweedMQ handler is available when schema feature is in use
if h.seaweedMQHandler == nil {
glog.Infof("fetchSchematizedRecords ERROR: seaweedMQHandler is nil")
return nil, fmt.Errorf("SeaweedMQ handler not available")
}
// If schema management isn't fully configured, return empty instead of error
if !h.IsSchemaEnabled() {
glog.Infof("fetchSchematizedRecords EARLY RETURN: IsSchemaEnabled()=false")
return []*SchematizedRecord{}, nil
}
// Fetch stored records from SeaweedMQ
maxRecords := 100 // Reasonable batch size limit
glog.Infof("fetchSchematizedRecords: calling GetStoredRecords maxRecords=%d", maxRecords)
smqRecords, err := h.seaweedMQHandler.GetStoredRecords(context.Background(), topicName, partitionID, offset, maxRecords)
if err != nil {
glog.Infof("fetchSchematizedRecords ERROR: GetStoredRecords failed: %v", err)
return nil, fmt.Errorf("failed to fetch SMQ records: %w", err)
}
glog.Infof("fetchSchematizedRecords: GetStoredRecords returned %d records", len(smqRecords))
if len(smqRecords) == 0 {
return []*SchematizedRecord{}, nil
}
var reconstructedRecords []*SchematizedRecord
totalBytes := int32(0)
for _, smqRecord := range smqRecords {
// Check if we've exceeded maxBytes limit
if maxBytes > 0 && totalBytes >= maxBytes {
break
}
// Try to reconstruct the schematized message value
reconstructedValue, err := h.reconstructSchematizedMessageFromSMQ(smqRecord)
if err != nil {
// Log error but continue with other messages
Error("Failed to reconstruct schematized message at offset %d: %v", smqRecord.GetOffset(), err)
continue
}
if reconstructedValue != nil {
// Create SchematizedRecord with both key and reconstructed value
record := &SchematizedRecord{
Key: smqRecord.GetKey(), // Preserve the original key
Value: reconstructedValue, // Use the reconstructed value
}
reconstructedRecords = append(reconstructedRecords, record)
totalBytes += int32(len(record.Key) + len(record.Value))
}
}
return reconstructedRecords, nil
}
// reconstructSchematizedMessageFromSMQ reconstructs a schematized message from an SMQRecord
func (h *Handler) reconstructSchematizedMessageFromSMQ(smqRecord integration.SMQRecord) ([]byte, error) {
// Get the stored value (should be a serialized RecordValue)
valueBytes := smqRecord.GetValue()
if len(valueBytes) == 0 {
return nil, fmt.Errorf("empty value in SMQ record")
}
// Try to unmarshal as RecordValue
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(valueBytes, recordValue); err != nil {
// If it's not a RecordValue, it might be a regular Kafka message
// Return it as-is (non-schematized)
return valueBytes, nil
}
// Extract schema metadata from the RecordValue fields
metadata := h.extractSchemaMetadataFromRecord(recordValue)
if len(metadata) == 0 {
// No schema metadata found, treat as regular message
return valueBytes, nil
}
// Remove Kafka metadata fields to get the original message content
originalRecord := h.removeKafkaMetadataFields(recordValue)
// Reconstruct the original Confluent envelope
return h.reconstructSchematizedMessage(originalRecord, metadata)
}
// extractSchemaMetadataFromRecord extracts schema metadata from RecordValue fields
func (h *Handler) extractSchemaMetadataFromRecord(recordValue *schema_pb.RecordValue) map[string]string {
metadata := make(map[string]string)
// Look for schema metadata fields in the record
if schemaIDField := recordValue.Fields["_schema_id"]; schemaIDField != nil {
if schemaIDValue := schemaIDField.GetStringValue(); schemaIDValue != "" {
metadata["schema_id"] = schemaIDValue
}
}
if schemaFormatField := recordValue.Fields["_schema_format"]; schemaFormatField != nil {
if schemaFormatValue := schemaFormatField.GetStringValue(); schemaFormatValue != "" {
metadata["schema_format"] = schemaFormatValue
}
}
if schemaSubjectField := recordValue.Fields["_schema_subject"]; schemaSubjectField != nil {
if schemaSubjectValue := schemaSubjectField.GetStringValue(); schemaSubjectValue != "" {
metadata["schema_subject"] = schemaSubjectValue
}
}
if schemaVersionField := recordValue.Fields["_schema_version"]; schemaVersionField != nil {
if schemaVersionValue := schemaVersionField.GetStringValue(); schemaVersionValue != "" {
metadata["schema_version"] = schemaVersionValue
}
}
return metadata
}
// removeKafkaMetadataFields removes Kafka and schema metadata fields from RecordValue
func (h *Handler) removeKafkaMetadataFields(recordValue *schema_pb.RecordValue) *schema_pb.RecordValue {
originalRecord := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
// Copy all fields except metadata fields
for key, value := range recordValue.Fields {
if !h.isMetadataField(key) {
originalRecord.Fields[key] = value
}
}
return originalRecord
}
// isMetadataField checks if a field is a metadata field that should be excluded from the original message
func (h *Handler) isMetadataField(fieldName string) bool {
return fieldName == "_kafka_offset" ||
fieldName == "_kafka_partition" ||
fieldName == "_kafka_timestamp" ||
fieldName == "_schema_id" ||
fieldName == "_schema_format" ||
fieldName == "_schema_subject" ||
fieldName == "_schema_version"
}
// createSchematizedRecordBatch creates a Kafka record batch from reconstructed schematized messages
func (h *Handler) createSchematizedRecordBatch(records []*SchematizedRecord, baseOffset int64) []byte {
if len(records) == 0 {
// Return empty record batch
return h.createEmptyRecordBatch(baseOffset)
}
// Create individual record entries for the batch
var recordsData []byte
currentTimestamp := time.Now().UnixMilli()
for i, record := range records {
// Create a record entry (Kafka record format v2) with both key and value
recordEntry := h.createRecordEntry(record.Key, record.Value, int32(i), currentTimestamp)
recordsData = append(recordsData, recordEntry...)
}
// Apply compression if the data is large enough to benefit
enableCompression := len(recordsData) > 100
var compressionType compression.CompressionCodec = compression.None
var finalRecordsData []byte
if enableCompression {
compressed, err := compression.Compress(compression.Gzip, recordsData)
if err == nil && len(compressed) < len(recordsData) {
finalRecordsData = compressed
compressionType = compression.Gzip
} else {
finalRecordsData = recordsData
}
} else {
finalRecordsData = recordsData
}
// Create the record batch with proper compression and CRC
batch, err := h.createRecordBatchWithCompressionAndCRC(baseOffset, finalRecordsData, compressionType, int32(len(records)), currentTimestamp)
if err != nil {
// Fallback to simple batch creation
return h.createRecordBatchWithPayload(baseOffset, int32(len(records)), finalRecordsData)
}
return batch
}
// createRecordEntry creates a single record entry in Kafka record format v2
func (h *Handler) createRecordEntry(messageKey []byte, messageData []byte, offsetDelta int32, timestamp int64) []byte {
// Record format v2:
// - length (varint)
// - attributes (int8)
// - timestamp delta (varint)
// - offset delta (varint)
// - key length (varint) + key
// - value length (varint) + value
// - headers count (varint) + headers
var record []byte
// Attributes (1 byte) - no special attributes
record = append(record, 0)
// Timestamp delta (varint) - 0 for now (all messages have same timestamp)
record = append(record, encodeVarint(0)...)
// Offset delta (varint)
record = append(record, encodeVarint(int64(offsetDelta))...)
// Key length (varint) + key
if messageKey == nil || len(messageKey) == 0 {
record = append(record, encodeVarint(-1)...) // -1 indicates null key
} else {
record = append(record, encodeVarint(int64(len(messageKey)))...)
record = append(record, messageKey...)
}
// Value length (varint) + value
record = append(record, encodeVarint(int64(len(messageData)))...)
record = append(record, messageData...)
// Headers count (varint) - no headers
record = append(record, encodeVarint(0)...)
// Prepend the total record length (varint)
recordLength := encodeVarint(int64(len(record)))
return append(recordLength, record...)
}
// createRecordBatchWithCompressionAndCRC creates a Kafka record batch with proper compression and CRC
func (h *Handler) createRecordBatchWithCompressionAndCRC(baseOffset int64, recordsData []byte, compressionType compression.CompressionCodec, recordCount int32, baseTimestampMs int64) ([]byte, error) {
// Create record batch header
// Validate size to prevent overflow
const maxBatchSize = 1 << 30 // 1 GB limit
if len(recordsData) > maxBatchSize-61 {
return nil, fmt.Errorf("records data too large: %d bytes", len(recordsData))
}
batch := make([]byte, 0, len(recordsData)+61) // 61 bytes for header
// Base offset (8 bytes)
baseOffsetBytes := make([]byte, 8)
binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
batch = append(batch, baseOffsetBytes...)
// Batch length placeholder (4 bytes) - will be filled later
batchLengthPos := len(batch)
batch = append(batch, 0, 0, 0, 0)
// Partition leader epoch (4 bytes)
batch = append(batch, 0, 0, 0, 0)
// Magic byte (1 byte) - version 2
batch = append(batch, 2)
// CRC placeholder (4 bytes) - will be calculated later
crcPos := len(batch)
batch = append(batch, 0, 0, 0, 0)
// Attributes (2 bytes) - compression type and other flags
attributes := int16(compressionType) // Set compression type in lower 3 bits
attributesBytes := make([]byte, 2)
binary.BigEndian.PutUint16(attributesBytes, uint16(attributes))
batch = append(batch, attributesBytes...)
// Last offset delta (4 bytes)
lastOffsetDelta := uint32(recordCount - 1)
lastOffsetDeltaBytes := make([]byte, 4)
binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
batch = append(batch, lastOffsetDeltaBytes...)
// First timestamp (8 bytes) - use the same timestamp used to build record entries
firstTimestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(firstTimestampBytes, uint64(baseTimestampMs))
batch = append(batch, firstTimestampBytes...)
// Max timestamp (8 bytes) - same as first for simplicity
batch = append(batch, firstTimestampBytes...)
// Producer ID (8 bytes) - -1 for non-transactional
batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
// Producer epoch (2 bytes) - -1 for non-transactional
batch = append(batch, 0xFF, 0xFF)
// Base sequence (4 bytes) - -1 for non-transactional
batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
// Record count (4 bytes)
recordCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(recordCountBytes, uint32(recordCount))
batch = append(batch, recordCountBytes...)
// Records payload (compressed or uncompressed)
batch = append(batch, recordsData...)
// Calculate and set batch length (excluding base offset and batch length fields)
batchLength := len(batch) - 12 // 8 bytes base offset + 4 bytes batch length
binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], uint32(batchLength))
// Calculate and set CRC32 over attributes..end (exclude CRC field itself)
// Kafka uses Castagnoli (CRC-32C) algorithm. CRC covers ONLY from attributes offset (byte 21) onwards.
// See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
crcData := batch[crcPos+4:] // Skip CRC field itself (bytes 17..20) and include the rest
crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
return batch, nil
}
// createEmptyRecordBatch creates an empty Kafka record batch using the new parser
func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte {
// Use the new record batch creation function with no compression
@@ -1307,47 +946,6 @@ func (h *Handler) createEmptyRecordBatchManual(baseOffset int64) []byte {
return batch
}
// createRecordBatchWithPayload creates a record batch with the given payload
func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte {
// For Phase 7, create a simplified record batch
// In Phase 8, this will implement proper Kafka record batch format v2
batch := h.createEmptyRecordBatch(baseOffset)
// Update record count
recordCountOffset := len(batch) - 4
binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount))
// Append payload (simplified - real implementation would format individual records)
batch = append(batch, payload...)
// Update batch length
batchLength := len(batch) - 12
binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength))
return batch
}
// handleSchematizedFetch handles fetch requests for topics with schematized messages
func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, offset int64, maxBytes int32) ([]byte, error) {
// Check if this topic uses schema management
if !h.IsSchemaEnabled() {
// Fall back to regular fetch handling
return nil, fmt.Errorf("schema management not enabled")
}
// Fetch schematized records from SeaweedMQ
records, err := h.fetchSchematizedRecords(topicName, partitionID, offset, maxBytes)
if err != nil {
return nil, fmt.Errorf("failed to fetch schematized records: %w", err)
}
// Create record batch from reconstructed records
recordBatch := h.createSchematizedRecordBatch(records, offset)
return recordBatch, nil
}
// isSchematizedTopic checks if a topic uses schema management
func (h *Handler) isSchematizedTopic(topicName string) bool {
// System topics (_schemas, __consumer_offsets, etc.) should NEVER use schema encoding
@@ -1593,62 +1191,6 @@ func (h *Handler) getTopicSchemaConfig(topicName string) (*TopicSchemaConfig, er
return config, nil
}
// decodeRecordValueToKafkaKey decodes a key RecordValue back to the original Kafka key bytes
func (h *Handler) decodeRecordValueToKafkaKey(topicName string, keyRecordValueBytes []byte) []byte {
if keyRecordValueBytes == nil {
return nil
}
// Try to get topic schema config
schemaConfig, err := h.getTopicSchemaConfig(topicName)
if err != nil || !schemaConfig.HasKeySchema {
// No key schema config available, return raw bytes
return keyRecordValueBytes
}
// Try to unmarshal as RecordValue
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(keyRecordValueBytes, recordValue); err != nil {
// If it's not a RecordValue, return the raw bytes
return keyRecordValueBytes
}
// If key schema management is enabled, re-encode the RecordValue to Confluent format
if h.IsSchemaEnabled() {
if encodedKey, err := h.encodeKeyRecordValueToConfluentFormat(topicName, recordValue); err == nil {
return encodedKey
}
}
// Fallback: convert RecordValue to JSON
return h.recordValueToJSON(recordValue)
}
// encodeKeyRecordValueToConfluentFormat re-encodes a key RecordValue back to Confluent format
func (h *Handler) encodeKeyRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
if recordValue == nil {
return nil, fmt.Errorf("key RecordValue is nil")
}
// Get schema configuration from topic config
schemaConfig, err := h.getTopicSchemaConfig(topicName)
if err != nil {
return nil, fmt.Errorf("failed to get topic schema config: %w", err)
}
if !schemaConfig.HasKeySchema {
return nil, fmt.Errorf("no key schema configured for topic: %s", topicName)
}
// Use schema manager to encode RecordValue back to original format
encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.KeySchemaID, schemaConfig.KeySchemaFormat)
if err != nil {
return nil, fmt.Errorf("failed to encode key RecordValue: %w", err)
}
return encodedBytes, nil
}
// recordValueToJSON converts a RecordValue to JSON bytes (fallback)
func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
if recordValue == nil || recordValue.Fields == nil {
@@ -1685,92 +1227,3 @@ func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
return []byte(jsonStr)
}
// fetchPartitionData fetches data for a single partition (called concurrently)
func (h *Handler) fetchPartitionData(
ctx context.Context,
topicName string,
partition FetchPartition,
apiVersion uint16,
isSchematizedTopic bool,
) *partitionFetchResult {
startTime := time.Now()
result := &partitionFetchResult{}
// Get the actual high water mark from SeaweedMQ
highWaterMark, err := h.seaweedMQHandler.GetLatestOffset(topicName, partition.PartitionID)
if err != nil {
highWaterMark = 0
}
result.highWaterMark = highWaterMark
// Check if topic exists
if !h.seaweedMQHandler.TopicExists(topicName) {
if isSystemTopic(topicName) {
// Auto-create system topics
if err := h.createTopicWithSchemaSupport(topicName, 1); err != nil {
result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
result.fetchDuration = time.Since(startTime)
return result
}
} else {
result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
result.fetchDuration = time.Since(startTime)
return result
}
}
// Normalize special fetch offsets
effectiveFetchOffset := partition.FetchOffset
if effectiveFetchOffset < 0 {
if effectiveFetchOffset == -2 {
effectiveFetchOffset = 0
} else if effectiveFetchOffset == -1 {
effectiveFetchOffset = highWaterMark
}
}
// Fetch records if available
var recordBatch []byte
if highWaterMark > effectiveFetchOffset {
// Use multi-batch fetcher (pass context to respect timeout)
multiFetcher := NewMultiBatchFetcher(h)
fetchResult, err := multiFetcher.FetchMultipleBatches(
ctx,
topicName,
partition.PartitionID,
effectiveFetchOffset,
highWaterMark,
partition.MaxBytes,
)
if err == nil && fetchResult.TotalSize > 0 {
recordBatch = fetchResult.RecordBatches
} else {
// Fallback to single batch (pass context to respect timeout)
smqRecords, err := h.seaweedMQHandler.GetStoredRecords(ctx, topicName, partition.PartitionID, effectiveFetchOffset, 10)
if err == nil && len(smqRecords) > 0 {
recordBatch = h.constructRecordBatchFromSMQ(topicName, effectiveFetchOffset, smqRecords)
} else {
recordBatch = []byte{}
}
}
} else {
recordBatch = []byte{}
}
// Try schematized records if needed and recordBatch is empty
if isSchematizedTopic && len(recordBatch) == 0 {
schematizedRecords, err := h.fetchSchematizedRecords(topicName, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes)
if err == nil && len(schematizedRecords) > 0 {
schematizedBatch := h.createSchematizedRecordBatch(schematizedRecords, effectiveFetchOffset)
if len(schematizedBatch) > 0 {
recordBatch = schematizedBatch
}
}
}
result.recordBatch = recordBatch
result.fetchDuration = time.Since(startTime)
return result
}
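
The removed createRecordBatchWithCompressionAndCRC documents the v2 batch layout: base offset (8 bytes) + batch length (4) + partition leader epoch (4) + magic (1) put the CRC-32C at bytes 17..20, and the checksum covers only the attributes field (byte 21) through the end of the batch. A small sketch of a verifier built on that layout, using Go's standard hash/crc32; verifyBatchCRC is an illustrative helper, not part of the codebase:

    package main

    import (
        "encoding/binary"
        "fmt"
        "hash/crc32"
    )

    // verifyBatchCRC recomputes the Castagnoli CRC over attributes..end and
    // compares it to the stored CRC field at bytes 17..20.
    func verifyBatchCRC(batch []byte) bool {
        if len(batch) < 21 {
            return false
        }
        stored := binary.BigEndian.Uint32(batch[17:21])
        computed := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))
        return stored == computed
    }

    func main() {
        fmt.Println(verifyBatchCRC(make([]byte, 10))) // false: too short to be a batch
    }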

59   weed/mq/kafka/protocol/fetch_multibatch.go

@@ -577,65 +577,6 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com
return batch
}
// estimateBatchSize estimates the size of a record batch before constructing it
func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []integration.SMQRecord) int32 {
if len(smqRecords) == 0 {
return 61 // empty batch header size
}
// Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
// last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
headerSize := int32(61)
baseTs := smqRecords[0].GetTimestamp()
recordsSize := int32(0)
for i, rec := range smqRecords {
// attributes(1)
rb := int32(1)
// timestamp_delta(varint)
tsDelta := rec.GetTimestamp() - baseTs
rb += int32(len(encodeVarint(tsDelta)))
// offset_delta(varint)
rb += int32(len(encodeVarint(int64(i))))
// key length varint + data or -1
if k := rec.GetKey(); k != nil {
rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k))
} else {
rb += int32(len(encodeVarint(-1)))
}
// value length varint + data or -1
if v := rec.GetValue(); v != nil {
rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v))
} else {
rb += int32(len(encodeVarint(-1)))
}
// headers count (varint = 0)
rb += int32(len(encodeVarint(0)))
// prepend record length varint
recordsSize += int32(len(encodeVarint(int64(rb)))) + rb
}
return headerSize + recordsSize
}
// sizeOfVarint returns the number of bytes encodeVarint would use for value
func sizeOfVarint(value int64) int32 {
// ZigZag encode to match encodeVarint
u := uint64(uint64(value<<1) ^ uint64(value>>63))
size := int32(1)
for u >= 0x80 {
u >>= 7
size++
}
return size
}
// compressData compresses data using the specified codec (basic implementation)
func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
// For Phase 5, implement basic compression support
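
The removed sizeOfVarint mirrors the encodeVarint helper kept in fetch.go: ZigZag-map the signed value, then emit seven bits per byte with a continuation flag. A sketch of an encoder consistent with that size computation (an assumption about encodeVarint's shape, not a copy of it):

    package main

    import "fmt"

    // encodeVarintSketch ZigZag-maps the signed value (-1 becomes 1, 1
    // becomes 2, ...), then emits 7 bits per byte with the high bit as a
    // continuation flag, matching the loop in sizeOfVarint above.
    func encodeVarintSketch(value int64) []byte {
        u := uint64(value<<1) ^ uint64(value>>63)
        var buf []byte
        for u >= 0x80 {
            buf = append(buf, byte(u)|0x80)
            u >>= 7
        }
        return append(buf, byte(u))
    }

    func main() {
        fmt.Println(encodeVarintSketch(-1))  // [1]
        fmt.Println(encodeVarintSketch(300)) // [216 4]: 300 zigzags to 600
    }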

22   weed/mq/kafka/protocol/handler.go

@@ -1248,7 +1248,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
glog.V(0).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
glog.V(1).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1323,7 +1323,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
glog.V(0).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
glog.V(1).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1436,7 +1436,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
glog.V(0).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
glog.V(1).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1544,7 +1544,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
glog.V(0).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
glog.V(1).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1671,7 +1671,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
glog.V(0).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
glog.V(1).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1688,24 +1688,24 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
for _, topic := range requestedTopics {
if isSystemTopic(topic) {
// Always try to auto-create system topics during metadata requests
glog.V(0).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
glog.V(1).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
if !h.seaweedMQHandler.TopicExists(topic) {
glog.V(0).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
glog.V(1).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
if err := h.createTopicWithSchemaSupport(topic, 1); err != nil {
glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err)
// Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
} else {
glog.V(0).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
glog.V(1).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
}
} else {
glog.V(0).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
glog.V(1).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
}
topicsToReturn = append(topicsToReturn, topic)
} else if h.seaweedMQHandler.TopicExists(topic) {
topicsToReturn = append(topicsToReturn, topic)
}
}
glog.V(0).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
glog.V(1).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
}
var buf bytes.Buffer
@@ -3986,7 +3986,7 @@ func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partiti
// Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself
// This was causing issues with Schema Registry bootstrap
glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
glog.V(1).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
return h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
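
These handler.go hunks demote routine metadata logging from V(0) to V(1). With glog-style verbosity (standard github.com/golang/glog is used below as a stand-in for the weed/glog fork, whose V-level gating is assumed to match), V(n) calls emit only when the process runs with -v=n or higher, so the demoted lines go quiet at the default verbosity:

    package main

    import (
        "flag"

        "github.com/golang/glog" // stand-in for weed/glog; same V-level semantics assumed
    )

    func main() {
        flag.Parse() // glog registers -v and -vmodule on the default flag set
        defer glog.Flush()

        glog.V(0).Infof("printed even at the default -v=0")
        glog.V(1).Infof("printed only when started with -v=1 or higher")
    }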

18   weed/mq/kafka/protocol/offset_management.go

@@ -172,16 +172,16 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
}
if groupIsEmpty {
glog.V(0).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
glog.V(1).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
req.GroupID, t.Name, p.Index, p.Offset)
} else {
glog.V(0).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
glog.V(1).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
}
} else {
// Do not store commit if generation mismatch
errCode = 22 // IllegalGeneration
glog.V(0).Infof("[OFFSET_COMMIT] Rejected - generation mismatch: group=%s expected=%d got=%d members=%d",
glog.V(0).Infof("[OFFSET_COMMIT] Rejected - generation mismatch: group=%s expected=%d got=%d members=%d",
req.GroupID, group.Generation, req.GenerationID, len(group.Members))
}
@@ -212,14 +212,14 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
glog.V(0).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
glog.V(1).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
group.Mu.RLock()
defer group.Mu.RUnlock()
glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
glog.V(1).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
// Build response
response := OffsetFetchResponse{
@@ -255,7 +255,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(0).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
glog.V(1).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
// Fallback: try fetching from SMQ persistent storage
@@ -269,10 +269,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(0).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
glog.V(1).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
glog.V(0).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
glog.V(1).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
request.GroupID, topic.Name, partition)
}
// No offset found in either location (-1 indicates no committed offset)
@@ -285,7 +285,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
Metadata: metadata,
ErrorCode: errorCode,
}
glog.V(0).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
glog.V(1).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, fetchedOffset)
topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
}
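
The offset fetch path above checks the in-memory group state first and only then falls back to SMQ persistent storage, with -1 meaning no committed offset. A self-contained paraphrase of that fallback order; the function names and signatures below are simplified stand-ins for the handler's fetchOffset and fetchOffsetFromSMQ, not the real API:

    package main

    import "fmt"

    // fetchFn mimics one lookup path (in-memory group state or SMQ storage).
    type fetchFn func() (offset int64, metadata string, err error)

    // committedOffset tries memory first, then persistent storage, and
    // returns -1 when neither has a committed offset, as in handleOffsetFetch.
    func committedOffset(fromMemory, fromSMQ fetchFn) (int64, string) {
        if off, meta, err := fromMemory(); err == nil && off >= 0 {
            return off, meta
        }
        if off, meta, err := fromSMQ(); err == nil && off >= 0 {
            return off, meta
        }
        return -1, "" // -1 signals "no committed offset" to the client
    }

    func main() {
        off, _ := committedOffset(
            func() (int64, string, error) { return -1, "", fmt.Errorf("not in memory") },
            func() (int64, string, error) { return 42, "", nil },
        )
        fmt.Println(off) // 42: fell back to persistent storage
    }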
