diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index a43315971..f20e1a065 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -272,17 +272,6 @@ subscribeLoop:
 				TsNs:  logEntry.TsNs,
 			}
 
-			// DEBUG: Log broker sending data to client
-			valuePreview := ""
-			if len(logEntry.Data) > 0 {
-				if len(logEntry.Data) <= 50 {
-					valuePreview = string(logEntry.Data)
-				} else {
-					valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
-				}
-			}
-			glog.Infof("[BROKER_SEND] Sending to client=%s valueLen=%d valuePreview=%q offset=%d",
-				clientName, len(logEntry.Data), valuePreview, logEntry.Offset)
 
 			if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
 				Data: dataMsg,
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
index bb86bc0bc..c01aac970 100644
--- a/weed/mq/kafka/gateway/test_mock_handler.go
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -6,7 +6,6 @@ import (
 	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer_client"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
 	filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -122,17 +121,6 @@ func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName stri
 	offset := m.offsets[topicName][partitionID]
 	m.offsets[topicName][partitionID]++
 
-	// DEBUG: Log what's being stored
-	valuePreview := ""
-	if len(value) > 0 {
-		if len(value) <= 50 {
-			valuePreview = string(value)
-		} else {
-			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(value[:50]), len(value))
-		}
-	}
-	glog.Infof("[MOCK_STORE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
-		topicName, partitionID, offset, len(value), valuePreview)
 
 	// Store record
 	record := &mockRecord{
@@ -168,18 +156,6 @@ func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic strin
 	result := make([]integration.SMQRecord, 0, maxRecords)
 	for _, record := range partitionRecords {
 		if record.GetOffset() >= fromOffset {
-			// DEBUG: Log what's being retrieved
-			valuePreview := ""
-			if len(record.GetValue()) > 0 {
-				if len(record.GetValue()) <= 50 {
-					valuePreview = string(record.GetValue())
-				} else {
-					valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(record.GetValue()[:50]), len(record.GetValue()))
-				}
-			}
-			glog.Infof("[MOCK_RETRIEVE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
-				topic, partition, record.GetOffset(), len(record.GetValue()), valuePreview)
-
 			result = append(result, record)
 			if len(result) >= maxRecords {
 				break
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index de4999d0a..6b38a71e1 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -1132,23 +1132,11 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 		return nil
 	}
 
-	// DEBUG: Log what we're decoding
-	valuePreview := ""
-	if len(recordValueBytes) > 0 {
-		if len(recordValueBytes) <= 50 {
-			valuePreview = string(recordValueBytes)
-		} else {
-			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(recordValueBytes[:50]), len(recordValueBytes))
-		}
-	}
-	glog.Infof("[DECODE_START] topic=%s schemaEnabled=%v bytesLen=%d preview=%q",
-		topicName, h.IsSchemaEnabled(), len(recordValueBytes), valuePreview)
 
 	// For system topics like _schemas, _consumer_offsets, etc.,
 	// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
 	// and should NOT be processed as RecordValue protobuf messages.
 	if strings.HasPrefix(topicName, "_") {
-		glog.Infof("[DECODE_SYSTEM_TOPIC] Returning raw bytes for system topic %s", topicName)
 		return recordValueBytes
 	}
 
@@ -1156,7 +1144,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	// All messages are stored as raw bytes when schema management is disabled
 	// Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
 	if !h.IsSchemaEnabled() {
-		glog.Infof("[DECODE_NO_SCHEMA] Returning raw bytes (schema disabled)")
 		return recordValueBytes
 	}
 
@@ -1173,12 +1160,9 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	// We need to check if this looks like a real RecordValue or just random bytes
 	if !h.isValidRecordValue(recordValue, recordValueBytes) {
 		// Not a valid RecordValue - return raw bytes as-is
-		glog.Infof("[DECODE_INVALID_RV] Not a valid RecordValue, returning raw bytes")
 		return recordValueBytes
 	}
 
-	glog.Infof("[DECODE_VALID_RV] Valid RecordValue detected, re-encoding...")
-
 	// If schema management is enabled, re-encode the RecordValue to Confluent format
 	if h.IsSchemaEnabled() {
 		if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1188,9 +1172,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	}
 
 	// Fallback: convert RecordValue to JSON
-	result := h.recordValueToJSON(recordValue)
-	glog.Infof("[DECODE_END] Returning JSON converted bytes, len=%d", len(result))
-	return result
+	return h.recordValueToJSON(recordValue)
 }
 
 // isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index a10926509..3b7b99ada 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -355,17 +355,6 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
 				continue
 			}
 
-			// DEBUG: Log unmarshaled entry with data preview
-			dataPreview := ""
-			if len(logEntry.Data) > 0 {
-				if len(logEntry.Data) <= 50 {
-					dataPreview = string(logEntry.Data)
-				} else {
-					dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
-				}
-			}
-			glog.Infof("[LOG_BUFFER_UNMARSHAL] Offset=%d TsNs=%d dataLen=%d dataPreview=%q",
-				logEntry.Offset, logEntry.TsNs, len(logEntry.Data), dataPreview)
 
 			glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
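Note: the deleted blocks in all four files repeat the same truncate-to-50-bytes preview logic ahead of an unconditional glog.Infof. If this kind of payload tracing is ever wanted again, one option is a small shared helper gated behind a glog verbosity level, so it can stay in the tree without flooding logs at the default level. The sketch below is illustrative only; previewBytes and debugPreview are hypothetical names and placement, not existing SeaweedFS APIs, though glog.V(4).Infof matches the verbosity-gated call that remains in log_read.go.

// Sketch only: hypothetical shared debug-preview helper, not part of this change.
package sketch

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// previewBytes returns b as a string, truncated to max bytes with the total
// length appended, mirroring the formatting the removed debug blocks used.
func previewBytes(b []byte, max int) string {
	if len(b) <= max {
		return string(b)
	}
	return fmt.Sprintf("%s...(total %d bytes)", string(b[:max]), len(b))
}

// debugPreview emits the preview only when verbose logging (-v=4 or higher)
// is enabled, so it is silent in normal operation.
func debugPreview(tag string, offset int64, data []byte) {
	glog.V(4).Infof("[%s] offset=%d dataLen=%d preview=%q", tag, offset, len(data), previewBytes(data, 50))
}

With a helper like that, each removed eleven-line block would collapse to a single call such as debugPreview("BROKER_SEND", logEntry.Offset, logEntry.Data), and would not need to be stripped out before merging.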