diff --git a/weed/mq/kafka/consumer/group_coordinator.go b/weed/mq/kafka/consumer/group_coordinator.go index ed18cded1..e7e8eab3f 100644 --- a/weed/mq/kafka/consumer/group_coordinator.go +++ b/weed/mq/kafka/consumer/group_coordinator.go @@ -1,6 +1,7 @@ package consumer import ( + "crypto/sha256" "fmt" "sync" "time" @@ -187,11 +188,12 @@ func (gc *GroupCoordinator) ListGroups() []string { return groups } -// GenerateMemberID creates a unique member ID +// GenerateMemberID creates a deterministic member ID based on client info func (gc *GroupCoordinator) GenerateMemberID(clientID, clientHost string) string { - // Use timestamp + client info to create unique member ID - timestamp := time.Now().UnixNano() - return fmt.Sprintf("%s-%s-%d", clientID, clientHost, timestamp) + // Use hash of client info to create deterministic member ID + // This ensures the same client gets the same member ID across calls + hash := fmt.Sprintf("%x", sha256.Sum256([]byte(clientID+"-"+clientHost))) + return fmt.Sprintf("%s-%s-%s", clientID, clientHost, hash[:8]) } // ValidateSessionTimeout checks if session timeout is within acceptable range diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 84b2060ad..770f85825 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -101,11 +101,33 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Handle member ID logic var memberID string var isNewMember bool + + // Use deterministic client identifier based on group + session timeout + protocol + clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType) if request.MemberID == "" { - // New member - generate ID - memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "unknown-host") - isNewMember = true + // New member - check if we already have a member for this client + + // Look for existing member with same client characteristics + var existingMemberID string + for existingID, member := range group.Members { + if member.ClientID == clientKey { + existingMemberID = existingID + break + } + } + + if existingMemberID != "" { + // Reuse existing member ID for this client + memberID = existingMemberID + isNewMember = false + fmt.Printf("DEBUG: JoinGroup reusing existing member ID '%s' for client key '%s'\n", memberID, clientKey) + } else { + // Generate new deterministic member ID + memberID = h.groupCoordinator.GenerateMemberID(clientKey, "consumer") + isNewMember = true + fmt.Printf("DEBUG: JoinGroup generated new member ID '%s' for client key '%s'\n", memberID, clientKey) + } } else { memberID = request.MemberID // Check if member exists @@ -113,6 +135,8 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Member ID provided but doesn't exist - reject return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil } + isNewMember = false + fmt.Printf("DEBUG: JoinGroup using provided member ID '%s'\n", memberID) } // Check group state @@ -135,7 +159,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque // Create or update member member := &consumer.GroupMember{ ID: memberID, - ClientID: request.GroupInstanceID, + ClientID: clientKey, // Use deterministic client key for member identification ClientHost: "unknown", // TODO: extract from connection - needed for consumer group metadata SessionTimeout: request.SessionTimeout, RebalanceTimeout: request.RebalanceTimeout,