diff --git a/.gitignore b/.gitignore index 044120bcd..cd240ab6d 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,4 @@ ADVANCED_IAM_DEVELOPMENT_PLAN.md /test/s3/iam/test-volume-data *.log weed-iam +test/kafka/kafka-client-loadtest/weed-linux-arm64 diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 89a46718c..76727628a 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -934,6 +934,18 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) + // Log consumer group coordination requests + switch apiKey { + case 9: + glog.Warningf("🟡 OffsetFetch: correlationID=%d from %s", correlationID, connectionID) + case 11: + glog.Warningf("🟡 JoinGroup: correlationID=%d from %s", correlationID, connectionID) + case 12: + glog.Warningf("🟡 Heartbeat: correlationID=%d from %s", correlationID, connectionID) + case 14: + glog.Warningf("🟡 SyncGroup: correlationID=%d from %s", correlationID, connectionID) + } + // Extract request body - special handling for ApiVersions requests var requestBody []byte if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 {