From e18a8713872c8722367fe180bbe2e10d6431fe09 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 10 Sep 2025 13:34:55 -0700 Subject: [PATCH] fmt --- weed/mq/kafka/consumer/group_coordinator.go | 96 ++++++++++----------- weed/mq/kafka/protocol/handler.go | 10 +-- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/weed/mq/kafka/consumer/group_coordinator.go b/weed/mq/kafka/consumer/group_coordinator.go index dc1a94dc2..ed18cded1 100644 --- a/weed/mq/kafka/consumer/group_coordinator.go +++ b/weed/mq/kafka/consumer/group_coordinator.go @@ -61,17 +61,17 @@ func (ms MemberState) String() string { // GroupMember represents a consumer in a consumer group type GroupMember struct { - ID string // Member ID (generated by gateway) - ClientID string // Client ID from consumer - ClientHost string // Client host/IP - SessionTimeout int32 // Session timeout in milliseconds - RebalanceTimeout int32 // Rebalance timeout in milliseconds - Subscription []string // Subscribed topics - Assignment []PartitionAssignment // Assigned partitions - Metadata []byte // Protocol-specific metadata - State MemberState // Current member state - LastHeartbeat time.Time // Last heartbeat timestamp - JoinedAt time.Time // When member joined group + ID string // Member ID (generated by gateway) + ClientID string // Client ID from consumer + ClientHost string // Client host/IP + SessionTimeout int32 // Session timeout in milliseconds + RebalanceTimeout int32 // Rebalance timeout in milliseconds + Subscription []string // Subscribed topics + Assignment []PartitionAssignment // Assigned partitions + Metadata []byte // Protocol-specific metadata + State MemberState // Current member state + LastHeartbeat time.Time // Last heartbeat timestamp + JoinedAt time.Time // When member joined group } // PartitionAssignment represents partition assignment for a member @@ -82,17 +82,17 @@ type PartitionAssignment struct { // ConsumerGroup represents a Kafka consumer group type ConsumerGroup struct { - ID string // Group ID - State GroupState // Current group state - Generation int32 // Generation ID (incremented on rebalance) - Protocol string // Assignment protocol (e.g., "range", "roundrobin") - Leader string // Leader member ID - Members map[string]*GroupMember // Group members by member ID - SubscribedTopics map[string]bool // Topics subscribed by group - OffsetCommits map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset - CreatedAt time.Time // Group creation time - LastActivity time.Time // Last activity (join, heartbeat, etc.) - + ID string // Group ID + State GroupState // Current group state + Generation int32 // Generation ID (incremented on rebalance) + Protocol string // Assignment protocol (e.g., "range", "roundrobin") + Leader string // Leader member ID + Members map[string]*GroupMember // Group members by member ID + SubscribedTopics map[string]bool // Topics subscribed by group + OffsetCommits map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset + CreatedAt time.Time // Group creation time + LastActivity time.Time // Last activity (join, heartbeat, etc.) + Mu sync.RWMutex // Protects group state } @@ -105,14 +105,14 @@ type OffsetCommit struct { // GroupCoordinator manages consumer groups type GroupCoordinator struct { - groups map[string]*ConsumerGroup // Group ID -> Group - groupsMu sync.RWMutex // Protects groups map - + groups map[string]*ConsumerGroup // Group ID -> Group + groupsMu sync.RWMutex // Protects groups map + // Configuration sessionTimeoutMin int32 // Minimum session timeout (ms) sessionTimeoutMax int32 // Maximum session timeout (ms) rebalanceTimeoutMs int32 // Default rebalance timeout (ms) - + // Cleanup cleanupTicker *time.Ticker stopChan chan struct{} @@ -123,16 +123,16 @@ type GroupCoordinator struct { func NewGroupCoordinator() *GroupCoordinator { gc := &GroupCoordinator{ groups: make(map[string]*ConsumerGroup), - sessionTimeoutMin: 6000, // 6 seconds - sessionTimeoutMax: 300000, // 5 minutes + sessionTimeoutMin: 6000, // 6 seconds + sessionTimeoutMax: 300000, // 5 minutes rebalanceTimeoutMs: 300000, // 5 minutes - stopChan: make(chan struct{}), + stopChan: make(chan struct{}), } - + // Start cleanup routine gc.cleanupTicker = time.NewTicker(30 * time.Second) go gc.cleanupRoutine() - + return gc } @@ -140,7 +140,7 @@ func NewGroupCoordinator() *GroupCoordinator { func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup { gc.groupsMu.Lock() defer gc.groupsMu.Unlock() - + group, exists := gc.groups[groupID] if !exists { group = &ConsumerGroup{ @@ -155,7 +155,7 @@ func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup { } gc.groups[groupID] = group } - + return group } @@ -163,7 +163,7 @@ func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup { func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup { gc.groupsMu.RLock() defer gc.groupsMu.RUnlock() - + return gc.groups[groupID] } @@ -171,7 +171,7 @@ func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup { func (gc *GroupCoordinator) RemoveGroup(groupID string) { gc.groupsMu.Lock() defer gc.groupsMu.Unlock() - + delete(gc.groups, groupID) } @@ -179,7 +179,7 @@ func (gc *GroupCoordinator) RemoveGroup(groupID string) { func (gc *GroupCoordinator) ListGroups() []string { gc.groupsMu.RLock() defer gc.groupsMu.RUnlock() - + groups := make([]string, 0, len(gc.groups)) for groupID := range gc.groups { groups = append(groups, groupID) @@ -216,10 +216,10 @@ func (gc *GroupCoordinator) performCleanup() { now := time.Now() gc.groupsMu.Lock() defer gc.groupsMu.Unlock() - + for groupID, group := range gc.groups { group.Mu.Lock() - + // Check for expired members expiredMembers := make([]string, 0) for memberID, member := range group.Members { @@ -228,7 +228,7 @@ func (gc *GroupCoordinator) performCleanup() { expiredMembers = append(expiredMembers, memberID) } } - + // Remove expired members for _, memberID := range expiredMembers { delete(group.Members, memberID) @@ -236,22 +236,22 @@ func (gc *GroupCoordinator) performCleanup() { group.Leader = "" } } - + // Update group state based on member count if len(group.Members) == 0 { if group.State != GroupStateEmpty { group.State = GroupStateEmpty group.Generation++ } - + // Mark group for deletion if empty for too long (30 minutes) if now.Sub(group.LastActivity) > 30*time.Minute { group.State = GroupStateDead } } - + group.Mu.Unlock() - + // Remove dead groups if group.State == GroupStateDead { delete(gc.groups, groupID) @@ -273,26 +273,26 @@ func (gc *GroupCoordinator) Close() { func (gc *GroupCoordinator) GetGroupStats() map[string]interface{} { gc.groupsMu.RLock() defer gc.groupsMu.RUnlock() - + stats := map[string]interface{}{ "total_groups": len(gc.groups), "group_states": make(map[string]int), } - + stateCount := make(map[GroupState]int) totalMembers := 0 - + for _, group := range gc.groups { group.Mu.RLock() stateCount[group.State]++ totalMembers += len(group.Members) group.Mu.RUnlock() } - + stats["total_members"] = totalMembers for state, count := range stateCount { stats["group_states"].(map[string]int)[state.String()] = count } - + return stats } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 48973ada5..991e765a5 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -8,7 +8,7 @@ import ( "net" "sync" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" @@ -32,14 +32,14 @@ type Handler struct { // Legacy in-memory mode (for backward compatibility and tests) topicsMu sync.RWMutex topics map[string]*TopicInfo // topic name -> topic info - + ledgersMu sync.RWMutex ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger - + // SeaweedMQ integration (optional, for production use) seaweedMQHandler *integration.SeaweedMQHandler useSeaweedMQ bool - + // Consumer group coordination groupCoordinator *consumer.GroupCoordinator } @@ -76,7 +76,7 @@ func (h *Handler) Close() error { if h.groupCoordinator != nil { h.groupCoordinator.Close() } - + // Close SeaweedMQ handler if present if h.useSeaweedMQ && h.seaweedMQHandler != nil { return h.seaweedMQHandler.Close()