Browse Source

fmt

pull/7231/head
chrislu 2 months ago
parent
commit
e18a871387
  1. 96
      weed/mq/kafka/consumer/group_coordinator.go
  2. 10
      weed/mq/kafka/protocol/handler.go

96
weed/mq/kafka/consumer/group_coordinator.go

@ -61,17 +61,17 @@ func (ms MemberState) String() string {
// GroupMember represents a consumer in a consumer group // GroupMember represents a consumer in a consumer group
type GroupMember struct { type GroupMember struct {
ID string // Member ID (generated by gateway)
ClientID string // Client ID from consumer
ClientHost string // Client host/IP
SessionTimeout int32 // Session timeout in milliseconds
RebalanceTimeout int32 // Rebalance timeout in milliseconds
Subscription []string // Subscribed topics
Assignment []PartitionAssignment // Assigned partitions
Metadata []byte // Protocol-specific metadata
State MemberState // Current member state
LastHeartbeat time.Time // Last heartbeat timestamp
JoinedAt time.Time // When member joined group
ID string // Member ID (generated by gateway)
ClientID string // Client ID from consumer
ClientHost string // Client host/IP
SessionTimeout int32 // Session timeout in milliseconds
RebalanceTimeout int32 // Rebalance timeout in milliseconds
Subscription []string // Subscribed topics
Assignment []PartitionAssignment // Assigned partitions
Metadata []byte // Protocol-specific metadata
State MemberState // Current member state
LastHeartbeat time.Time // Last heartbeat timestamp
JoinedAt time.Time // When member joined group
} }
// PartitionAssignment represents partition assignment for a member // PartitionAssignment represents partition assignment for a member
@ -82,17 +82,17 @@ type PartitionAssignment struct {
// ConsumerGroup represents a Kafka consumer group // ConsumerGroup represents a Kafka consumer group
type ConsumerGroup struct { type ConsumerGroup struct {
ID string // Group ID
State GroupState // Current group state
Generation int32 // Generation ID (incremented on rebalance)
Protocol string // Assignment protocol (e.g., "range", "roundrobin")
Leader string // Leader member ID
Members map[string]*GroupMember // Group members by member ID
SubscribedTopics map[string]bool // Topics subscribed by group
OffsetCommits map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset
CreatedAt time.Time // Group creation time
LastActivity time.Time // Last activity (join, heartbeat, etc.)
ID string // Group ID
State GroupState // Current group state
Generation int32 // Generation ID (incremented on rebalance)
Protocol string // Assignment protocol (e.g., "range", "roundrobin")
Leader string // Leader member ID
Members map[string]*GroupMember // Group members by member ID
SubscribedTopics map[string]bool // Topics subscribed by group
OffsetCommits map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset
CreatedAt time.Time // Group creation time
LastActivity time.Time // Last activity (join, heartbeat, etc.)
Mu sync.RWMutex // Protects group state Mu sync.RWMutex // Protects group state
} }
@ -105,14 +105,14 @@ type OffsetCommit struct {
// GroupCoordinator manages consumer groups // GroupCoordinator manages consumer groups
type GroupCoordinator struct { type GroupCoordinator struct {
groups map[string]*ConsumerGroup // Group ID -> Group
groupsMu sync.RWMutex // Protects groups map
groups map[string]*ConsumerGroup // Group ID -> Group
groupsMu sync.RWMutex // Protects groups map
// Configuration // Configuration
sessionTimeoutMin int32 // Minimum session timeout (ms) sessionTimeoutMin int32 // Minimum session timeout (ms)
sessionTimeoutMax int32 // Maximum session timeout (ms) sessionTimeoutMax int32 // Maximum session timeout (ms)
rebalanceTimeoutMs int32 // Default rebalance timeout (ms) rebalanceTimeoutMs int32 // Default rebalance timeout (ms)
// Cleanup // Cleanup
cleanupTicker *time.Ticker cleanupTicker *time.Ticker
stopChan chan struct{} stopChan chan struct{}
@ -123,16 +123,16 @@ type GroupCoordinator struct {
func NewGroupCoordinator() *GroupCoordinator { func NewGroupCoordinator() *GroupCoordinator {
gc := &GroupCoordinator{ gc := &GroupCoordinator{
groups: make(map[string]*ConsumerGroup), groups: make(map[string]*ConsumerGroup),
sessionTimeoutMin: 6000, // 6 seconds
sessionTimeoutMax: 300000, // 5 minutes
sessionTimeoutMin: 6000, // 6 seconds
sessionTimeoutMax: 300000, // 5 minutes
rebalanceTimeoutMs: 300000, // 5 minutes rebalanceTimeoutMs: 300000, // 5 minutes
stopChan: make(chan struct{}),
stopChan: make(chan struct{}),
} }
// Start cleanup routine // Start cleanup routine
gc.cleanupTicker = time.NewTicker(30 * time.Second) gc.cleanupTicker = time.NewTicker(30 * time.Second)
go gc.cleanupRoutine() go gc.cleanupRoutine()
return gc return gc
} }
@ -140,7 +140,7 @@ func NewGroupCoordinator() *GroupCoordinator {
func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup { func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup {
gc.groupsMu.Lock() gc.groupsMu.Lock()
defer gc.groupsMu.Unlock() defer gc.groupsMu.Unlock()
group, exists := gc.groups[groupID] group, exists := gc.groups[groupID]
if !exists { if !exists {
group = &ConsumerGroup{ group = &ConsumerGroup{
@ -155,7 +155,7 @@ func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup {
} }
gc.groups[groupID] = group gc.groups[groupID] = group
} }
return group return group
} }
@ -163,7 +163,7 @@ func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup {
func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup { func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup {
gc.groupsMu.RLock() gc.groupsMu.RLock()
defer gc.groupsMu.RUnlock() defer gc.groupsMu.RUnlock()
return gc.groups[groupID] return gc.groups[groupID]
} }
@ -171,7 +171,7 @@ func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup {
func (gc *GroupCoordinator) RemoveGroup(groupID string) { func (gc *GroupCoordinator) RemoveGroup(groupID string) {
gc.groupsMu.Lock() gc.groupsMu.Lock()
defer gc.groupsMu.Unlock() defer gc.groupsMu.Unlock()
delete(gc.groups, groupID) delete(gc.groups, groupID)
} }
@ -179,7 +179,7 @@ func (gc *GroupCoordinator) RemoveGroup(groupID string) {
func (gc *GroupCoordinator) ListGroups() []string { func (gc *GroupCoordinator) ListGroups() []string {
gc.groupsMu.RLock() gc.groupsMu.RLock()
defer gc.groupsMu.RUnlock() defer gc.groupsMu.RUnlock()
groups := make([]string, 0, len(gc.groups)) groups := make([]string, 0, len(gc.groups))
for groupID := range gc.groups { for groupID := range gc.groups {
groups = append(groups, groupID) groups = append(groups, groupID)
@ -216,10 +216,10 @@ func (gc *GroupCoordinator) performCleanup() {
now := time.Now() now := time.Now()
gc.groupsMu.Lock() gc.groupsMu.Lock()
defer gc.groupsMu.Unlock() defer gc.groupsMu.Unlock()
for groupID, group := range gc.groups { for groupID, group := range gc.groups {
group.Mu.Lock() group.Mu.Lock()
// Check for expired members // Check for expired members
expiredMembers := make([]string, 0) expiredMembers := make([]string, 0)
for memberID, member := range group.Members { for memberID, member := range group.Members {
@ -228,7 +228,7 @@ func (gc *GroupCoordinator) performCleanup() {
expiredMembers = append(expiredMembers, memberID) expiredMembers = append(expiredMembers, memberID)
} }
} }
// Remove expired members // Remove expired members
for _, memberID := range expiredMembers { for _, memberID := range expiredMembers {
delete(group.Members, memberID) delete(group.Members, memberID)
@ -236,22 +236,22 @@ func (gc *GroupCoordinator) performCleanup() {
group.Leader = "" group.Leader = ""
} }
} }
// Update group state based on member count // Update group state based on member count
if len(group.Members) == 0 { if len(group.Members) == 0 {
if group.State != GroupStateEmpty { if group.State != GroupStateEmpty {
group.State = GroupStateEmpty group.State = GroupStateEmpty
group.Generation++ group.Generation++
} }
// Mark group for deletion if empty for too long (30 minutes) // Mark group for deletion if empty for too long (30 minutes)
if now.Sub(group.LastActivity) > 30*time.Minute { if now.Sub(group.LastActivity) > 30*time.Minute {
group.State = GroupStateDead group.State = GroupStateDead
} }
} }
group.Mu.Unlock() group.Mu.Unlock()
// Remove dead groups // Remove dead groups
if group.State == GroupStateDead { if group.State == GroupStateDead {
delete(gc.groups, groupID) delete(gc.groups, groupID)
@ -273,26 +273,26 @@ func (gc *GroupCoordinator) Close() {
func (gc *GroupCoordinator) GetGroupStats() map[string]interface{} { func (gc *GroupCoordinator) GetGroupStats() map[string]interface{} {
gc.groupsMu.RLock() gc.groupsMu.RLock()
defer gc.groupsMu.RUnlock() defer gc.groupsMu.RUnlock()
stats := map[string]interface{}{ stats := map[string]interface{}{
"total_groups": len(gc.groups), "total_groups": len(gc.groups),
"group_states": make(map[string]int), "group_states": make(map[string]int),
} }
stateCount := make(map[GroupState]int) stateCount := make(map[GroupState]int)
totalMembers := 0 totalMembers := 0
for _, group := range gc.groups { for _, group := range gc.groups {
group.Mu.RLock() group.Mu.RLock()
stateCount[group.State]++ stateCount[group.State]++
totalMembers += len(group.Members) totalMembers += len(group.Members)
group.Mu.RUnlock() group.Mu.RUnlock()
} }
stats["total_members"] = totalMembers stats["total_members"] = totalMembers
for state, count := range stateCount { for state, count := range stateCount {
stats["group_states"].(map[string]int)[state.String()] = count stats["group_states"].(map[string]int)[state.String()] = count
} }
return stats return stats
} }

10
weed/mq/kafka/protocol/handler.go

@ -8,7 +8,7 @@ import (
"net" "net"
"sync" "sync"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
@ -32,14 +32,14 @@ type Handler struct {
// Legacy in-memory mode (for backward compatibility and tests) // Legacy in-memory mode (for backward compatibility and tests)
topicsMu sync.RWMutex topicsMu sync.RWMutex
topics map[string]*TopicInfo // topic name -> topic info topics map[string]*TopicInfo // topic name -> topic info
ledgersMu sync.RWMutex ledgersMu sync.RWMutex
ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger
// SeaweedMQ integration (optional, for production use) // SeaweedMQ integration (optional, for production use)
seaweedMQHandler *integration.SeaweedMQHandler seaweedMQHandler *integration.SeaweedMQHandler
useSeaweedMQ bool useSeaweedMQ bool
// Consumer group coordination // Consumer group coordination
groupCoordinator *consumer.GroupCoordinator groupCoordinator *consumer.GroupCoordinator
} }
@ -76,7 +76,7 @@ func (h *Handler) Close() error {
if h.groupCoordinator != nil { if h.groupCoordinator != nil {
h.groupCoordinator.Close() h.groupCoordinator.Close()
} }
// Close SeaweedMQ handler if present // Close SeaweedMQ handler if present
if h.useSeaweedMQ && h.seaweedMQHandler != nil { if h.useSeaweedMQ && h.seaweedMQHandler != nil {
return h.seaweedMQHandler.Close() return h.seaweedMQHandler.Close()

Loading…
Cancel
Save