diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index 6335c4174..3c1b5e410 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -46,7 +46,6 @@ func (h *MaintenanceHandlers) ShowTaskDetail(c *gin.Context) { return } - c.Header("Content-Type", "text/html") taskDetailComponent := app.TaskDetail(taskDetail) layoutComponent := layout.Layout(c, taskDetailComponent) diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go index 20f1ea97d..6ac28685e 100644 --- a/weed/admin/maintenance/maintenance_integration.go +++ b/weed/admin/maintenance/maintenance_integration.go @@ -306,25 +306,21 @@ func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *Maintenance return false // Fallback to existing logic for unknown types } - // Convert task objects taskObject := s.convertTaskToTaskSystem(task) if taskObject == nil { return false } - runningTaskObjects := s.convertTasksToTaskSystem(runningTasks) workerObjects := s.convertWorkersToTaskSystem(availableWorkers) - // Get the appropriate scheduler scheduler := s.taskRegistry.GetScheduler(taskType) if scheduler == nil { return false } - canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects) return canSchedule diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go index d9e4c6b04..6a74311dc 100644 --- a/weed/command/autocomplete.go +++ b/weed/command/autocomplete.go @@ -2,11 +2,11 @@ package command import ( "fmt" - "os" - "path/filepath" "github.com/posener/complete" completeinstall "github.com/posener/complete/cmd/install" flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" + "os" + "path/filepath" "runtime" ) @@ -53,14 +53,14 @@ func printAutocompleteScript(shell string) bool { return false } - switch shell { - case "bash": - fmt.Printf("complete -C %q weed\n", binPath) - case "zsh": - fmt.Printf("autoload -U +X bashcompinit && bashcompinit\n") - fmt.Printf("complete -o nospace -C %q weed\n", binPath) - case "fish": - fmt.Printf(`function __complete_weed + switch shell { + case "bash": + fmt.Printf("complete -C %q weed\n", binPath) + case "zsh": + fmt.Printf("autoload -U +X bashcompinit && bashcompinit\n") + fmt.Printf("complete -o nospace -C %q weed\n", binPath) + case "fish": + fmt.Printf(`function __complete_weed set -lx COMP_LINE (commandline -cp) test -z (commandline -ct) and set COMP_LINE "$COMP_LINE " @@ -68,10 +68,10 @@ func printAutocompleteScript(shell string) bool { end complete -f -c weed -a "(__complete_weed)" `, binPath) - default: - fmt.Fprintf(os.Stderr, "unsupported shell: %s. Supported shells: bash, zsh, fish\n", shell) - return false - } + default: + fmt.Fprintf(os.Stderr, "unsupported shell: %s. 
Supported shells: bash, zsh, fish\n", shell) + return false + } return true } diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index f20e1a065..51a74c6a9 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -272,7 +272,6 @@ subscribeLoop: TsNs: logEntry.TsNs, } - if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ Data: dataMsg, }}); err != nil { diff --git a/weed/mq/kafka/consumer/incremental_rebalancing_test.go b/weed/mq/kafka/consumer/incremental_rebalancing_test.go index 1352b2da0..64f0ba085 100644 --- a/weed/mq/kafka/consumer/incremental_rebalancing_test.go +++ b/weed/mq/kafka/consumer/incremental_rebalancing_test.go @@ -103,15 +103,15 @@ func TestIncrementalCooperativeAssignmentStrategy_RebalanceWithRevocation(t *tes t.Errorf("Expected member-2 to have 0 partitions during revocation, got %d", len(member2Assignments)) } - t.Logf("Revocation phase - Member-1: %d partitions, Member-2: %d partitions", + t.Logf("Revocation phase - Member-1: %d partitions, Member-2: %d partitions", len(member1Assignments), len(member2Assignments)) // Simulate time passing and second call (should move to assignment phase) time.Sleep(10 * time.Millisecond) - + // Force move to assignment phase by setting timeout to 0 state.RevocationTimeout = 0 - + assignments2 := strategy.Assign(members, topicPartitions) // Should complete rebalance @@ -136,7 +136,7 @@ func TestIncrementalCooperativeAssignmentStrategy_RebalanceWithRevocation(t *tes t.Errorf("Expected 4 total partitions after rebalance, got %d", totalFinalPartitions) } - t.Logf("Final assignment - Member-1: %d partitions, Member-2: %d partitions", + t.Logf("Final assignment - Member-1: %d partitions, Member-2: %d partitions", len(member1FinalAssignments), len(member2FinalAssignments)) } @@ -239,7 +239,7 @@ func TestIncrementalCooperativeAssignmentStrategy_MultipleTopics(t *testing.T) { t.Errorf("Expected partition %s to be assigned", expected) } } - + // Debug: Print all assigned partitions t.Logf("All assigned partitions: %v", allAssignedPartitions) } @@ -390,7 +390,7 @@ func TestIncrementalCooperativeAssignmentStrategy_StateTransitions(t *testing.T) // Force timeout to move to assignment phase state.RevocationTimeout = 0 strategy.Assign(members, topicPartitions) - + // Should complete and return to None state = strategy.GetRebalanceState() if state.Phase != RebalancePhaseNone { diff --git a/weed/mq/kafka/consumer/rebalance_timeout.go b/weed/mq/kafka/consumer/rebalance_timeout.go index 9844723c0..f4f65f37b 100644 --- a/weed/mq/kafka/consumer/rebalance_timeout.go +++ b/weed/mq/kafka/consumer/rebalance_timeout.go @@ -24,12 +24,12 @@ func (rtm *RebalanceTimeoutManager) CheckRebalanceTimeouts() { for _, group := range rtm.coordinator.groups { group.Mu.Lock() - + // Only check timeouts for groups in rebalancing states if group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance { rtm.checkGroupRebalanceTimeout(group, now) } - + group.Mu.Unlock() } } @@ -37,7 +37,7 @@ func (rtm *RebalanceTimeoutManager) CheckRebalanceTimeouts() { // checkGroupRebalanceTimeout checks and handles rebalance timeout for a specific group func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGroup, now time.Time) { expiredMembers := make([]string, 0) - + for memberID, member := range group.Members { // Check if member has exceeded its rebalance timeout rebalanceTimeout := time.Duration(member.RebalanceTimeout) 
* time.Millisecond @@ -45,21 +45,21 @@ func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGr // Use default rebalance timeout if not specified rebalanceTimeout = time.Duration(rtm.coordinator.rebalanceTimeoutMs) * time.Millisecond } - + // For members in pending state during rebalance, check against join time if member.State == MemberStatePending { if now.Sub(member.JoinedAt) > rebalanceTimeout { expiredMembers = append(expiredMembers, memberID) } } - + // Also check session timeout as a fallback sessionTimeout := time.Duration(member.SessionTimeout) * time.Millisecond if now.Sub(member.LastHeartbeat) > sessionTimeout { expiredMembers = append(expiredMembers, memberID) } } - + // Remove expired members and trigger rebalance if necessary if len(expiredMembers) > 0 { rtm.evictExpiredMembers(group, expiredMembers) @@ -70,13 +70,13 @@ func (rtm *RebalanceTimeoutManager) checkGroupRebalanceTimeout(group *ConsumerGr func (rtm *RebalanceTimeoutManager) evictExpiredMembers(group *ConsumerGroup, expiredMembers []string) { for _, memberID := range expiredMembers { delete(group.Members, memberID) - + // If the leader was evicted, clear leader if group.Leader == memberID { group.Leader = "" } } - + // Update group state based on remaining members if len(group.Members) == 0 { group.State = GroupStateEmpty @@ -92,18 +92,18 @@ func (rtm *RebalanceTimeoutManager) evictExpiredMembers(group *ConsumerGroup, ex break } } - + // Reset to preparing rebalance to restart the process group.State = GroupStatePreparingRebalance group.Generation++ - + // Mark remaining members as pending for _, member := range group.Members { member.State = MemberStatePending } } } - + group.LastActivity = time.Now() } @@ -112,7 +112,7 @@ func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRe if group.State != GroupStatePreparingRebalance && group.State != GroupStateCompletingRebalance { return false } - + return time.Since(group.LastActivity) > maxRebalanceDuration } @@ -120,14 +120,14 @@ func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRe func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) { group.Mu.Lock() defer group.Mu.Unlock() - + // If stuck in preparing rebalance, move to completing if group.State == GroupStatePreparingRebalance { group.State = GroupStateCompletingRebalance group.LastActivity = time.Now() return } - + // If stuck in completing rebalance, force to stable if group.State == GroupStateCompletingRebalance { group.State = GroupStateStable @@ -145,21 +145,21 @@ func (rtm *RebalanceTimeoutManager) GetRebalanceStatus(groupID string) *Rebalanc if group == nil { return nil } - + group.Mu.RLock() defer group.Mu.RUnlock() - + status := &RebalanceStatus{ - GroupID: groupID, - State: group.State, - Generation: group.Generation, - MemberCount: len(group.Members), - Leader: group.Leader, - LastActivity: group.LastActivity, - IsRebalancing: group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance, + GroupID: groupID, + State: group.State, + Generation: group.Generation, + MemberCount: len(group.Members), + Leader: group.Leader, + LastActivity: group.LastActivity, + IsRebalancing: group.State == GroupStatePreparingRebalance || group.State == GroupStateCompletingRebalance, RebalanceDuration: time.Since(group.LastActivity), } - + // Calculate member timeout status now := time.Now() for memberID, member := range group.Members { @@ -171,48 +171,48 @@ func (rtm *RebalanceTimeoutManager) 
GetRebalanceStatus(groupID string) *Rebalanc SessionTimeout: time.Duration(member.SessionTimeout) * time.Millisecond, RebalanceTimeout: time.Duration(member.RebalanceTimeout) * time.Millisecond, } - + // Calculate time until session timeout sessionTimeRemaining := memberStatus.SessionTimeout - now.Sub(member.LastHeartbeat) if sessionTimeRemaining < 0 { sessionTimeRemaining = 0 } memberStatus.SessionTimeRemaining = sessionTimeRemaining - + // Calculate time until rebalance timeout rebalanceTimeRemaining := memberStatus.RebalanceTimeout - now.Sub(member.JoinedAt) if rebalanceTimeRemaining < 0 { rebalanceTimeRemaining = 0 } memberStatus.RebalanceTimeRemaining = rebalanceTimeRemaining - + status.Members = append(status.Members, memberStatus) } - + return status } // RebalanceStatus represents the current status of a group's rebalance type RebalanceStatus struct { - GroupID string `json:"group_id"` - State GroupState `json:"state"` - Generation int32 `json:"generation"` - MemberCount int `json:"member_count"` - Leader string `json:"leader"` - LastActivity time.Time `json:"last_activity"` - IsRebalancing bool `json:"is_rebalancing"` - RebalanceDuration time.Duration `json:"rebalance_duration"` - Members []MemberTimeoutStatus `json:"members"` + GroupID string `json:"group_id"` + State GroupState `json:"state"` + Generation int32 `json:"generation"` + MemberCount int `json:"member_count"` + Leader string `json:"leader"` + LastActivity time.Time `json:"last_activity"` + IsRebalancing bool `json:"is_rebalancing"` + RebalanceDuration time.Duration `json:"rebalance_duration"` + Members []MemberTimeoutStatus `json:"members"` } // MemberTimeoutStatus represents timeout status for a group member type MemberTimeoutStatus struct { - MemberID string `json:"member_id"` - State MemberState `json:"state"` - LastHeartbeat time.Time `json:"last_heartbeat"` - JoinedAt time.Time `json:"joined_at"` - SessionTimeout time.Duration `json:"session_timeout"` - RebalanceTimeout time.Duration `json:"rebalance_timeout"` - SessionTimeRemaining time.Duration `json:"session_time_remaining"` - RebalanceTimeRemaining time.Duration `json:"rebalance_time_remaining"` + MemberID string `json:"member_id"` + State MemberState `json:"state"` + LastHeartbeat time.Time `json:"last_heartbeat"` + JoinedAt time.Time `json:"joined_at"` + SessionTimeout time.Duration `json:"session_timeout"` + RebalanceTimeout time.Duration `json:"rebalance_timeout"` + SessionTimeRemaining time.Duration `json:"session_time_remaining"` + RebalanceTimeRemaining time.Duration `json:"rebalance_time_remaining"` } diff --git a/weed/mq/kafka/consumer/rebalance_timeout_test.go b/weed/mq/kafka/consumer/rebalance_timeout_test.go index ac5f90aee..61dbf3fc5 100644 --- a/weed/mq/kafka/consumer/rebalance_timeout_test.go +++ b/weed/mq/kafka/consumer/rebalance_timeout_test.go @@ -8,14 +8,14 @@ import ( func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Create a group with a member that has a short rebalance timeout group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance - + member := &GroupMember{ ID: "member1", ClientID: "client1", @@ -27,15 +27,15 @@ func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) { } group.Members["member1"] = member group.Mu.Unlock() - + // Check timeouts - member should be evicted rtm.CheckRebalanceTimeouts() - + group.Mu.RLock() if 
len(group.Members) != 0 { t.Errorf("Expected member to be evicted due to rebalance timeout, but %d members remain", len(group.Members)) } - + if group.State != GroupStateEmpty { t.Errorf("Expected group state to be Empty after member eviction, got %s", group.State.String()) } @@ -45,18 +45,18 @@ func TestRebalanceTimeoutManager_CheckRebalanceTimeouts(t *testing.T) { func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Create a group with a member that has exceeded session timeout group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance - + member := &GroupMember{ ID: "member1", ClientID: "client1", - SessionTimeout: 1000, // 1 second + SessionTimeout: 1000, // 1 second RebalanceTimeout: 30000, // 30 seconds State: MemberStatePending, LastHeartbeat: time.Now().Add(-2 * time.Second), // Last heartbeat 2 seconds ago @@ -64,10 +64,10 @@ func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) { } group.Members["member1"] = member group.Mu.Unlock() - + // Check timeouts - member should be evicted due to session timeout rtm.CheckRebalanceTimeouts() - + group.Mu.RLock() if len(group.Members) != 0 { t.Errorf("Expected member to be evicted due to session timeout, but %d members remain", len(group.Members)) @@ -78,15 +78,15 @@ func TestRebalanceTimeoutManager_SessionTimeoutFallback(t *testing.T) { func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Create a group with leader and another member group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance group.Leader = "member1" - + // Leader with expired rebalance timeout leader := &GroupMember{ ID: "member1", @@ -98,7 +98,7 @@ func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) { JoinedAt: time.Now().Add(-2 * time.Second), } group.Members["member1"] = leader - + // Another member that's still valid member2 := &GroupMember{ ID: "member2", @@ -111,19 +111,19 @@ func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) { } group.Members["member2"] = member2 group.Mu.Unlock() - + // Check timeouts - leader should be evicted, new leader selected rtm.CheckRebalanceTimeouts() - + group.Mu.RLock() if len(group.Members) != 1 { t.Errorf("Expected 1 member to remain after leader eviction, got %d", len(group.Members)) } - + if group.Leader != "member2" { t.Errorf("Expected member2 to become new leader, got %s", group.Leader) } - + if group.State != GroupStatePreparingRebalance { t.Errorf("Expected group to restart rebalancing after leader eviction, got %s", group.State.String()) } @@ -133,37 +133,37 @@ func TestRebalanceTimeoutManager_LeaderEviction(t *testing.T) { func TestRebalanceTimeoutManager_IsRebalanceStuck(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Create a group that's been rebalancing for a while group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance group.LastActivity = time.Now().Add(-15 * time.Minute) // 15 minutes ago group.Mu.Unlock() - + // Check if rebalance is stuck (max 10 minutes) maxDuration := 10 * time.Minute if !rtm.IsRebalanceStuck(group, maxDuration) { t.Error("Expected rebalance to be detected as 
stuck") } - + // Test with a group that's not stuck group.Mu.Lock() group.LastActivity = time.Now().Add(-5 * time.Minute) // 5 minutes ago group.Mu.Unlock() - + if rtm.IsRebalanceStuck(group, maxDuration) { t.Error("Expected rebalance to not be detected as stuck") } - + // Test with stable group (should not be stuck) group.Mu.Lock() group.State = GroupStateStable group.LastActivity = time.Now().Add(-15 * time.Minute) group.Mu.Unlock() - + if rtm.IsRebalanceStuck(group, maxDuration) { t.Error("Stable group should not be detected as stuck") } @@ -172,37 +172,37 @@ func TestRebalanceTimeoutManager_IsRebalanceStuck(t *testing.T) { func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Test forcing completion from PreparingRebalance group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance - + member := &GroupMember{ ID: "member1", State: MemberStatePending, } group.Members["member1"] = member group.Mu.Unlock() - + rtm.ForceCompleteRebalance(group) - + group.Mu.RLock() if group.State != GroupStateCompletingRebalance { t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String()) } group.Mu.RUnlock() - + // Test forcing completion from CompletingRebalance rtm.ForceCompleteRebalance(group) - + group.Mu.RLock() if group.State != GroupStateStable { t.Errorf("Expected group state to be Stable, got %s", group.State.String()) } - + if member.State != MemberStateStable { t.Errorf("Expected member state to be Stable, got %s", member.State.String()) } @@ -212,15 +212,15 @@ func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) { func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Test with non-existent group status := rtm.GetRebalanceStatus("non-existent") if status != nil { t.Error("Expected nil status for non-existent group") } - + // Create a group with members group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() @@ -228,7 +228,7 @@ func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { group.Generation = 5 group.Leader = "member1" group.LastActivity = time.Now().Add(-2 * time.Minute) - + member1 := &GroupMember{ ID: "member1", State: MemberStatePending, @@ -238,7 +238,7 @@ func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { RebalanceTimeout: 300000, // 5 minutes } group.Members["member1"] = member1 - + member2 := &GroupMember{ ID: "member2", State: MemberStatePending, @@ -249,48 +249,48 @@ func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { } group.Members["member2"] = member2 group.Mu.Unlock() - + // Get status status = rtm.GetRebalanceStatus("test-group") - + if status == nil { t.Fatal("Expected non-nil status") } - + if status.GroupID != "test-group" { t.Errorf("Expected group ID 'test-group', got %s", status.GroupID) } - + if status.State != GroupStatePreparingRebalance { t.Errorf("Expected state PreparingRebalance, got %s", status.State.String()) } - + if status.Generation != 5 { t.Errorf("Expected generation 5, got %d", status.Generation) } - + if status.MemberCount != 2 { t.Errorf("Expected 2 members, got %d", status.MemberCount) } - + if status.Leader != "member1" { t.Errorf("Expected leader 'member1', got %s", status.Leader) } - + if !status.IsRebalancing { t.Error("Expected 
IsRebalancing to be true") } - + if len(status.Members) != 2 { t.Errorf("Expected 2 member statuses, got %d", len(status.Members)) } - + // Check member timeout calculations for _, memberStatus := range status.Members { if memberStatus.SessionTimeRemaining < 0 { t.Errorf("Session time remaining should not be negative for member %s", memberStatus.MemberID) } - + if memberStatus.RebalanceTimeRemaining < 0 { t.Errorf("Rebalance time remaining should not be negative for member %s", memberStatus.MemberID) } @@ -300,14 +300,14 @@ func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { func TestRebalanceTimeoutManager_DefaultRebalanceTimeout(t *testing.T) { coordinator := NewGroupCoordinator() defer coordinator.Close() - + rtm := coordinator.rebalanceTimeoutManager - + // Create a group with a member that has no rebalance timeout set (0) group := coordinator.GetOrCreateGroup("test-group") group.Mu.Lock() group.State = GroupStatePreparingRebalance - + member := &GroupMember{ ID: "member1", ClientID: "client1", @@ -319,10 +319,10 @@ func TestRebalanceTimeoutManager_DefaultRebalanceTimeout(t *testing.T) { } group.Members["member1"] = member group.Mu.Unlock() - + // Default rebalance timeout is 5 minutes (300000ms), so member should be evicted rtm.CheckRebalanceTimeouts() - + group.Mu.RLock() if len(group.Members) != 0 { t.Errorf("Expected member to be evicted using default rebalance timeout, but %d members remain", len(group.Members)) diff --git a/weed/mq/kafka/consumer_offset/memory_storage.go b/weed/mq/kafka/consumer_offset/memory_storage.go index 8814107bb..6e5c95782 100644 --- a/weed/mq/kafka/consumer_offset/memory_storage.go +++ b/weed/mq/kafka/consumer_offset/memory_storage.go @@ -142,4 +142,3 @@ func (m *MemoryStorage) Close() error { return nil } - diff --git a/weed/mq/kafka/consumer_offset/memory_storage_test.go b/weed/mq/kafka/consumer_offset/memory_storage_test.go index eaf849dc5..22720267b 100644 --- a/weed/mq/kafka/consumer_offset/memory_storage_test.go +++ b/weed/mq/kafka/consumer_offset/memory_storage_test.go @@ -206,4 +206,3 @@ func TestMemoryStorageOverwrite(t *testing.T) { assert.Equal(t, int64(20), offset) assert.Equal(t, "meta2", metadata) } - diff --git a/weed/mq/kafka/consumer_offset/storage.go b/weed/mq/kafka/consumer_offset/storage.go index d3f999faa..ad191b936 100644 --- a/weed/mq/kafka/consumer_offset/storage.go +++ b/weed/mq/kafka/consumer_offset/storage.go @@ -56,4 +56,3 @@ var ( ErrInvalidPartition = fmt.Errorf("invalid partition") ErrStorageClosed = fmt.Errorf("storage is closed") ) - diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go index c01aac970..ef0a012ef 100644 --- a/weed/mq/kafka/gateway/test_mock_handler.go +++ b/weed/mq/kafka/gateway/test_mock_handler.go @@ -121,7 +121,6 @@ func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName stri offset := m.offsets[topicName][partitionID] m.offsets[topicName][partitionID]++ - // Store record record := &mockRecord{ key: key, diff --git a/weed/mq/kafka/package.go b/weed/mq/kafka/package.go index 01743a12b..1cb5dc8ed 100644 --- a/weed/mq/kafka/package.go +++ b/weed/mq/kafka/package.go @@ -9,5 +9,3 @@ package kafka // - offset/: Offset management // - schema/: Schema registry integration // - consumer/: Consumer group coordination - - diff --git a/weed/mq/kafka/partition_mapping.go b/weed/mq/kafka/partition_mapping.go index 697e67386..a956c3cde 100644 --- a/weed/mq/kafka/partition_mapping.go +++ b/weed/mq/kafka/partition_mapping.go @@ -51,5 
+51,3 @@ func GetRangeSize() int32 { func GetMaxKafkaPartitions() int32 { return int32(pub_balancer.MaxPartitionCount) / 35 // 72 partitions } - - diff --git a/weed/mq/kafka/protocol/describe_cluster.go b/weed/mq/kafka/protocol/describe_cluster.go index af622de3c..5d963e45b 100644 --- a/weed/mq/kafka/protocol/describe_cluster.go +++ b/weed/mq/kafka/protocol/describe_cluster.go @@ -37,7 +37,6 @@ func (h *Handler) handleDescribeCluster(correlationID uint32, apiVersion uint16, // Tagged fields at end of request // (We don't parse them, just skip) - // Build response response := make([]byte, 0, 256) @@ -109,6 +108,5 @@ func (h *Handler) handleDescribeCluster(correlationID uint32, apiVersion uint16, // Response-level tagged fields (flexible response) response = append(response, 0x00) // Empty tagged fields - return response, nil } diff --git a/weed/mq/kafka/protocol/flexible_versions.go b/weed/mq/kafka/protocol/flexible_versions.go index ddb55e74f..77d1510ae 100644 --- a/weed/mq/kafka/protocol/flexible_versions.go +++ b/weed/mq/kafka/protocol/flexible_versions.go @@ -268,7 +268,6 @@ func parseCompactString(data []byte) ([]byte, int) { return nil, 0 } - if actualLength == 0 { // Empty string (length was 1) return []byte{}, consumed diff --git a/weed/mq/kafka/protocol/group_introspection.go b/weed/mq/kafka/protocol/group_introspection.go index 0ff3ed4b5..959a015a1 100644 --- a/weed/mq/kafka/protocol/group_introspection.go +++ b/weed/mq/kafka/protocol/group_introspection.go @@ -107,13 +107,13 @@ func (h *Handler) describeGroup(groupID string) DescribeGroupsGroup { } return DescribeGroupsGroup{ - ErrorCode: 0, - GroupID: groupID, - State: stateStr, - ProtocolType: "consumer", // Default protocol type - Protocol: group.Protocol, - Members: members, - AuthorizedOps: []int32{}, // Empty for now + ErrorCode: 0, + GroupID: groupID, + State: stateStr, + ProtocolType: "consumer", // Default protocol type + Protocol: group.Protocol, + Members: members, + AuthorizedOps: []int32{}, // Empty for now } } @@ -175,8 +175,8 @@ func (h *Handler) listAllGroups(statesFilter []string) []ListGroupsGroup { // Request/Response structures type DescribeGroupsRequest struct { - GroupIDs []string - IncludeAuthorizedOps bool + GroupIDs []string + IncludeAuthorizedOps bool } type DescribeGroupsResponse struct { diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 2768793d2..8dffd2313 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -661,7 +661,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return } // Removed V(4) logging from hot path - only log errors and important events - + // Wrap request processing with panic recovery to prevent deadlocks // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer var response []byte @@ -881,7 +881,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return fmt.Errorf("read message: %w", err) } - // Parse at least the basic header to get API key and correlation ID if len(messageBuf) < 8 { return fmt.Errorf("message too short") @@ -890,7 +889,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - + // Validate API version against what we support if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { glog.Errorf("API 
VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err) @@ -1050,7 +1049,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { requestStart := time.Now() apiName := getAPIName(APIKey(req.apiKey)) - // Only log high-volume requests at V(2), not V(4) if glog.V(2) { glog.V(2).Infof("[API] %s (key=%d, ver=%d, corr=%d)", @@ -1589,15 +1587,15 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } } @@ -1716,15 +1714,15 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } } @@ -1737,7 +1735,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( } if len(response) > 100 { } - + return response, nil } @@ -1828,7 +1826,6 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // NOTE: Correlation ID is handled by writeResponseWithCorrelationID // Do NOT include it in the response body - // ThrottleTimeMs (4 bytes) - v3+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling @@ -1896,7 +1893,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // LeaderEpoch (4 bytes) - v7+ addition if apiVersion >= 7 { @@ -1905,11 +1902,11 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR 
node - binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // OfflineReplicas array (4 bytes length + nodes) - v5+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas @@ -1930,7 +1927,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, } if len(response) > 100 { } - + return response, nil } @@ -1994,12 +1991,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 - maxBytes := len(requestBody) if maxBytes > 64 { maxBytes = 64 } - + // v1+ has replica_id(4) if apiVersion >= 1 { if len(requestBody) < offset+4 { @@ -3930,12 +3926,11 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, // v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16) // v4+: Uses flexible format with tagged fields - maxBytes := len(requestBody) if maxBytes > 64 { maxBytes = 64 } - + offset := 0 // Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions) diff --git a/weed/mq/kafka/protocol/offset_storage_adapter.go b/weed/mq/kafka/protocol/offset_storage_adapter.go index 079c5b621..0481b4c42 100644 --- a/weed/mq/kafka/protocol/offset_storage_adapter.go +++ b/weed/mq/kafka/protocol/offset_storage_adapter.go @@ -47,4 +47,3 @@ func (a *offsetStorageAdapter) DeleteGroup(group string) error { func (a *offsetStorageAdapter) Close() error { return a.storage.Close() } - diff --git a/weed/mq/kafka/protocol/response_validation_example_test.go b/weed/mq/kafka/protocol/response_validation_example_test.go index 9476bb791..a69c03f4f 100644 --- a/weed/mq/kafka/protocol/response_validation_example_test.go +++ b/weed/mq/kafka/protocol/response_validation_example_test.go @@ -140,4 +140,3 @@ func TestMetadataResponseHasBrokers(t *testing.T) { t.Logf("✓ Metadata response correctly has %d broker(s)", parsedCount) } - diff --git a/weed/mq/kafka/schema/envelope_test.go b/weed/mq/kafka/schema/envelope_test.go index 4a209779e..24f16ee44 100644 --- a/weed/mq/kafka/schema/envelope_test.go +++ b/weed/mq/kafka/schema/envelope_test.go @@ -7,46 +7,46 @@ import ( func TestParseConfluentEnvelope(t *testing.T) { tests := []struct { - name string - input []byte - expectOK bool - expectID uint32 + name string + input []byte + expectOK bool + expectID uint32 expectFormat Format }{ { - name: "valid Avro message", - input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x10, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, // schema ID 1 + "Hello" - expectOK: true, - expectID: 1, + name: "valid Avro message", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x10, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, // schema ID 1 + "Hello" + expectOK: true, + expectID: 1, expectFormat: FormatAvro, }, { - name: "valid message with larger schema ID", - input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f}, // schema ID 1234 + "foo" - expectOK: true, - expectID: 1234, + name: "valid message with larger schema ID", + input: []byte{0x00, 0x00, 0x00, 0x04, 0xd2, 0x02, 0x66, 0x6f, 0x6f}, // schema ID 1234 + "foo" + expectOK: true, + expectID: 1234, expectFormat: FormatAvro, }, { - name: "too short message", - input: []byte{0x00, 0x00, 0x00}, - expectOK: false, + name: "too short message", + input: []byte{0x00, 0x00, 0x00}, + expectOK: false, }, { - name: "no magic byte", - input: []byte{0x01, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 
0x6c, 0x6f}, - expectOK: false, + name: "no magic byte", + input: []byte{0x01, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, + expectOK: false, }, { - name: "empty message", - input: []byte{}, - expectOK: false, + name: "empty message", + input: []byte{}, + expectOK: false, }, { - name: "minimal valid message", - input: []byte{0x00, 0x00, 0x00, 0x00, 0x01}, // schema ID 1, empty payload - expectOK: true, - expectID: 1, + name: "minimal valid message", + input: []byte{0x00, 0x00, 0x00, 0x00, 0x01}, // schema ID 1, empty payload + expectOK: true, + expectID: 1, expectFormat: FormatAvro, }, } @@ -54,24 +54,24 @@ func TestParseConfluentEnvelope(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { envelope, ok := ParseConfluentEnvelope(tt.input) - + if ok != tt.expectOK { t.Errorf("ParseConfluentEnvelope() ok = %v, want %v", ok, tt.expectOK) return } - + if !tt.expectOK { return // No need to check further if we expected failure } - + if envelope.SchemaID != tt.expectID { t.Errorf("ParseConfluentEnvelope() schemaID = %v, want %v", envelope.SchemaID, tt.expectID) } - + if envelope.Format != tt.expectFormat { t.Errorf("ParseConfluentEnvelope() format = %v, want %v", envelope.Format, tt.expectFormat) } - + // Verify payload extraction expectedPayloadLen := len(tt.input) - 5 // 5 bytes for magic + schema ID if len(envelope.Payload) != expectedPayloadLen { @@ -150,11 +150,11 @@ func TestExtractSchemaID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { id, ok := ExtractSchemaID(tt.input) - + if ok != tt.expectOK { t.Errorf("ExtractSchemaID() ok = %v, want %v", ok, tt.expectOK) } - + if id != tt.expectID { t.Errorf("ExtractSchemaID() id = %v, want %v", id, tt.expectID) } @@ -200,12 +200,12 @@ func TestCreateConfluentEnvelope(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := CreateConfluentEnvelope(tt.format, tt.schemaID, tt.indexes, tt.payload) - + if len(result) != len(tt.expected) { t.Errorf("CreateConfluentEnvelope() length = %v, want %v", len(result), len(tt.expected)) return } - + for i, b := range result { if b != tt.expected[i] { t.Errorf("CreateConfluentEnvelope() byte[%d] = %v, want %v", i, b, tt.expected[i]) @@ -262,7 +262,7 @@ func TestEnvelopeValidate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { err := tt.envelope.Validate() - + if (err != nil) != tt.expectErr { t.Errorf("Envelope.Validate() error = %v, expectErr %v", err, tt.expectErr) } @@ -297,7 +297,7 @@ func TestEnvelopeMetadata(t *testing.T) { func BenchmarkParseConfluentEnvelope(b *testing.B) { // Create a test message testMsg := make([]byte, 1024) - testMsg[0] = 0x00 // Magic byte + testMsg[0] = 0x00 // Magic byte binary.BigEndian.PutUint32(testMsg[1:5], 123) // Schema ID // Fill rest with dummy data for i := 5; i < len(testMsg); i++ { diff --git a/weed/mq/kafka/schema/envelope_varint_test.go b/weed/mq/kafka/schema/envelope_varint_test.go index 92004c3d6..8bc51d7a0 100644 --- a/weed/mq/kafka/schema/envelope_varint_test.go +++ b/weed/mq/kafka/schema/envelope_varint_test.go @@ -100,7 +100,7 @@ func TestCreateConfluentEnvelopeWithProtobufIndexes(t *testing.T) { parsed, ok := ParseConfluentEnvelope(envelope) require.True(t, ok, "Should be able to parse envelope") assert.Equal(t, tc.schemaID, parsed.SchemaID) - + if tc.format == FormatProtobuf && len(tc.indexes) == 0 { // For Protobuf without indexes, payload should match assert.Equal(t, tc.payload, parsed.Payload, "Payload should match") diff --git 
a/weed/mq/metadata_constants.go b/weed/mq/metadata_constants.go index 18ba98a31..31f86c910 100644 --- a/weed/mq/metadata_constants.go +++ b/weed/mq/metadata_constants.go @@ -17,5 +17,3 @@ const ( // Source file tracking for parquet deduplication ExtendedAttrSources = "sources" // JSON-encoded list of source log files ) - - diff --git a/weed/mq/offset/migration.go b/weed/mq/offset/migration.go index 106129206..4e0a6ab12 100644 --- a/weed/mq/offset/migration.go +++ b/weed/mq/offset/migration.go @@ -118,17 +118,17 @@ func (m *MigrationManager) GetCurrentVersion() (int, error) { if err != nil { return 0, fmt.Errorf("failed to create migrations table: %w", err) } - + var version sql.NullInt64 err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version) if err != nil { return 0, fmt.Errorf("failed to get current version: %w", err) } - + if !version.Valid { return 0, nil // No migrations applied yet } - + return int(version.Int64), nil } @@ -138,29 +138,29 @@ func (m *MigrationManager) ApplyMigrations() error { if err != nil { return fmt.Errorf("failed to get current version: %w", err) } - + migrations := GetMigrations() - + for _, migration := range migrations { if migration.Version <= currentVersion { continue // Already applied } - + fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description) - + // Begin transaction tx, err := m.db.Begin() if err != nil { return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err) } - + // Execute migration SQL _, err = tx.Exec(migration.SQL) if err != nil { tx.Rollback() return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err) } - + // Record migration as applied _, err = tx.Exec( "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)", @@ -172,16 +172,16 @@ func (m *MigrationManager) ApplyMigrations() error { tx.Rollback() return fmt.Errorf("failed to record migration %d: %w", migration.Version, err) } - + // Commit transaction err = tx.Commit() if err != nil { return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err) } - + fmt.Printf("Successfully applied migration %d\n", migration.Version) } - + return nil } @@ -203,7 +203,7 @@ func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) { return nil, fmt.Errorf("failed to query applied migrations: %w", err) } defer rows.Close() - + var migrations []AppliedMigration for rows.Next() { var migration AppliedMigration @@ -213,7 +213,7 @@ func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) { } migrations = append(migrations, migration) } - + return migrations, nil } @@ -223,17 +223,17 @@ func (m *MigrationManager) ValidateSchema() error { if err != nil { return fmt.Errorf("failed to get current version: %w", err) } - + migrations := GetMigrations() if len(migrations) == 0 { return nil } - + latestVersion := migrations[len(migrations)-1].Version if currentVersion < latestVersion { return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion) } - + return nil } @@ -253,21 +253,21 @@ func getCurrentTimestamp() int64 { func CreateDatabase(dbPath string) (*sql.DB, error) { // TODO: Support different database types (PostgreSQL, MySQL, etc.) 
// ASSUMPTION: Using SQLite for now, can be extended for other databases - + db, err := sql.Open("sqlite3", dbPath) if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } - + // Configure SQLite for better performance pragmas := []string{ - "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency - "PRAGMA synchronous=NORMAL", // Balance between safety and performance - "PRAGMA cache_size=10000", // Increase cache size - "PRAGMA foreign_keys=ON", // Enable foreign key constraints - "PRAGMA temp_store=MEMORY", // Store temporary tables in memory + "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency + "PRAGMA synchronous=NORMAL", // Balance between safety and performance + "PRAGMA cache_size=10000", // Increase cache size + "PRAGMA foreign_keys=ON", // Enable foreign key constraints + "PRAGMA temp_store=MEMORY", // Store temporary tables in memory } - + for _, pragma := range pragmas { _, err := db.Exec(pragma) if err != nil { @@ -275,7 +275,7 @@ func CreateDatabase(dbPath string) (*sql.DB, error) { return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) } } - + // Apply migrations migrationManager := NewMigrationManager(db) err = migrationManager.ApplyMigrations() @@ -283,7 +283,7 @@ func CreateDatabase(dbPath string) (*sql.DB, error) { db.Close() return nil, fmt.Errorf("failed to apply migrations: %w", err) } - + return db, nil } diff --git a/weed/mq/schema/flat_schema_utils_test.go b/weed/mq/schema/flat_schema_utils_test.go index 2bce9014c..779d3705f 100644 --- a/weed/mq/schema/flat_schema_utils_test.go +++ b/weed/mq/schema/flat_schema_utils_test.go @@ -248,11 +248,11 @@ func TestValidateKeyColumns(t *testing.T) { // Helper function to check if string contains substring func contains(str, substr string) bool { - return len(str) >= len(substr) && - (len(substr) == 0 || str[len(str)-len(substr):] == substr || - str[:len(substr)] == substr || - len(str) > len(substr) && (str[len(str)-len(substr)-1:len(str)-len(substr)] == " " || str[len(str)-len(substr)-1] == ' ') && str[len(str)-len(substr):] == substr || - findInString(str, substr)) + return len(str) >= len(substr) && + (len(substr) == 0 || str[len(str)-len(substr):] == substr || + str[:len(substr)] == substr || + len(str) > len(substr) && (str[len(str)-len(substr)-1:len(str)-len(substr)] == " " || str[len(str)-len(substr)-1] == ' ') && str[len(str)-len(substr):] == substr || + findInString(str, substr)) } func findInString(str, substr string) bool { diff --git a/weed/pb/mq_agent_pb/publish_response_test.go b/weed/pb/mq_agent_pb/publish_response_test.go index 1f2e767e4..0c7b0ee3a 100644 --- a/weed/pb/mq_agent_pb/publish_response_test.go +++ b/weed/pb/mq_agent_pb/publish_response_test.go @@ -1,8 +1,8 @@ package mq_agent_pb import ( - "testing" "google.golang.org/protobuf/proto" + "testing" ) func TestPublishRecordResponseSerialization(t *testing.T) { diff --git a/weed/pb/schema_pb/offset_test.go b/weed/pb/schema_pb/offset_test.go index 28324836e..273d2d5d1 100644 --- a/weed/pb/schema_pb/offset_test.go +++ b/weed/pb/schema_pb/offset_test.go @@ -1,8 +1,8 @@ package schema_pb import ( - "testing" "google.golang.org/protobuf/proto" + "testing" ) func TestOffsetTypeEnums(t *testing.T) { @@ -34,8 +34,8 @@ func TestPartitionOffsetSerialization(t *testing.T) { RangeStop: 31, UnixTimeNs: 1234567890, }, - StartTsNs: 1234567890, - StartOffset: 42, // New field + StartTsNs: 1234567890, + StartOffset: 42, // New field } // Test proto marshaling/unmarshaling diff --git 
a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go index acb7dbd17..f57a4c6df 100644 --- a/weed/remote_storage/azure/azure_storage_client_test.go +++ b/weed/remote_storage/azure/azure_storage_client_test.go @@ -229,22 +229,22 @@ func TestToMetadata(t *testing.T) { s3_constants.AmzUserMetaPrefix + "789": []byte("value3"), }, expected: map[string]*string{ - "_123key": stringPtr("value1"), // starts with digit -> prefix _ - "_456_2d_test": stringPtr("value2"), // starts with digit AND has dash - "_789": stringPtr("value3"), + "_123key": stringPtr("value1"), // starts with digit -> prefix _ + "_456_2d_test": stringPtr("value2"), // starts with digit AND has dash + "_789": stringPtr("value3"), }, }, { name: "uppercase and mixed case keys", input: map[string][]byte{ - s3_constants.AmzUserMetaPrefix + "My-Key": []byte("value1"), - s3_constants.AmzUserMetaPrefix + "UPPERCASE": []byte("value2"), - s3_constants.AmzUserMetaPrefix + "MiXeD-CaSe": []byte("value3"), + s3_constants.AmzUserMetaPrefix + "My-Key": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "UPPERCASE": []byte("value2"), + s3_constants.AmzUserMetaPrefix + "MiXeD-CaSe": []byte("value3"), }, expected: map[string]*string{ - "my_2d_key": stringPtr("value1"), // lowercase + dash -> _2d_ - "uppercase": stringPtr("value2"), - "mixed_2d_case": stringPtr("value3"), + "my_2d_key": stringPtr("value1"), // lowercase + dash -> _2d_ + "uppercase": stringPtr("value2"), + "mixed_2d_case": stringPtr("value3"), }, }, { diff --git a/weed/s3api/s3_sse_s3_integration_test.go b/weed/s3api/s3_sse_s3_integration_test.go index 8232aea7f..4e0d91a5c 100644 --- a/weed/s3api/s3_sse_s3_integration_test.go +++ b/weed/s3api/s3_sse_s3_integration_test.go @@ -78,7 +78,7 @@ func TestSSES3EndToEndSmallFile(t *testing.T) { // Step 3: Decrypt (simulates what happens during GET) // This tests the IV retrieval path for inline files - + // First, deserialize metadata from storage retrievedKeyData := mockEntry.Extended[s3_constants.SeaweedFSSSES3Key] retrievedKey, err := DeserializeSSES3Metadata(retrievedKeyData, keyManager) diff --git a/weed/s3api/s3_validation_utils.go b/weed/s3api/s3_validation_utils.go index e0e80d0a8..f69fc9c26 100644 --- a/weed/s3api/s3_validation_utils.go +++ b/weed/s3api/s3_validation_utils.go @@ -71,7 +71,7 @@ func ValidateSSES3Key(sseKey *SSES3Key) error { if sseKey == nil { return fmt.Errorf("SSE-S3 key cannot be nil") } - + // Validate key bytes if sseKey.Key == nil { return fmt.Errorf("SSE-S3 key bytes cannot be nil") @@ -79,22 +79,22 @@ func ValidateSSES3Key(sseKey *SSES3Key) error { if len(sseKey.Key) != SSES3KeySize { return fmt.Errorf("invalid SSE-S3 key size: expected %d bytes, got %d", SSES3KeySize, len(sseKey.Key)) } - + // Validate algorithm if sseKey.Algorithm != SSES3Algorithm { return fmt.Errorf("invalid SSE-S3 algorithm: expected %q, got %q", SSES3Algorithm, sseKey.Algorithm) } - + // Validate key ID (should not be empty) if sseKey.KeyID == "" { return fmt.Errorf("SSE-S3 key ID cannot be empty") } - + // IV validation is optional during key creation - it will be set during encryption // If IV is set, validate its length if len(sseKey.IV) > 0 && len(sseKey.IV) != s3_constants.AESBlockSize { return fmt.Errorf("invalid SSE-S3 IV length: expected %d bytes, got %d", s3_constants.AESBlockSize, len(sseKey.IV)) } - + return nil } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 3e323163e..5cff1bc4b 100644 --- 
a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -74,12 +74,12 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection ev.Version = needle.Version(volumeInfo.Version) ev.datFileSize = volumeInfo.DatFileSize ev.ExpireAtSec = volumeInfo.ExpireAtSec - + // Initialize EC context from .vif if present; fallback to defaults if volumeInfo.EcShardConfig != nil { ds := int(volumeInfo.EcShardConfig.DataShards) ps := int(volumeInfo.EcShardConfig.ParityShards) - + // Validate shard counts to prevent zero or invalid values if ds <= 0 || ps <= 0 || ds+ps > MaxShardCount { glog.Warningf("Invalid EC config in VolumeInfo for volume %d (data=%d, parity=%d), using defaults", vid, ds, ps) diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go index 63d344b1a..5e4d4fab7 100644 --- a/weed/util/log_buffer/log_buffer_flush_gap_test.go +++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go @@ -15,10 +15,11 @@ import ( // are lost in the gap between flushed disk data and in-memory buffer. // // OBSERVED BEHAVIOR FROM LOGS: -// Request offset: 1764 -// Disk contains: 1000-1763 (764 messages) -// Memory buffer starts at: 1800 -// Gap: 1764-1799 (36 messages) ← MISSING! +// +// Request offset: 1764 +// Disk contains: 1000-1763 (764 messages) +// Memory buffer starts at: 1800 +// Gap: 1764-1799 (36 messages) ← MISSING! // // This test verifies: // 1. All messages sent to buffer are accounted for @@ -27,46 +28,46 @@ import ( func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { var flushedMessages []*filer_pb.LogEntry var flushMu sync.Mutex - + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf)) - + // Parse and store flushed messages flushMu.Lock() defer flushMu.Unlock() - + // Parse buffer to extract messages parsedCount := 0 for pos := 0; pos+4 < len(buf); { if pos+4 > len(buf) { break } - + size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3]) if pos+4+int(size) > len(buf) { break } - + entryData := buf[pos+4 : pos+4+int(size)] logEntry := &filer_pb.LogEntry{} if err := proto.Unmarshal(entryData, logEntry); err == nil { flushedMessages = append(flushedMessages, logEntry) parsedCount++ } - + pos += 4 + int(size) } - + t.Logf(" Parsed %d messages from flush buffer", parsedCount) } - + logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) defer logBuffer.ShutdownLogBuffer() - + // Send 100 messages messageCount := 100 t.Logf("Sending %d messages...", messageCount) - + for i := 0; i < messageCount; i++ { logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", i)), @@ -74,11 +75,11 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { TsNs: time.Now().UnixNano(), }) } - + // Force flush multiple times to simulate real workload t.Logf("Forcing flush...") logBuffer.ForceFlush() - + // Add more messages after flush for i := messageCount; i < messageCount+50; i++ { logBuffer.AddToBuffer(&mq_pb.DataMessage{ @@ -87,18 +88,18 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { TsNs: time.Now().UnixNano(), }) } - + // Force another flush logBuffer.ForceFlush() time.Sleep(200 * time.Millisecond) // Wait for flush to complete - + // Now check the buffer state logBuffer.RLock() bufferStartOffset := logBuffer.bufferStartOffset currentOffset := 
logBuffer.offset pos := logBuffer.pos logBuffer.RUnlock() - + flushMu.Lock() flushedCount := len(flushedMessages) var maxFlushedOffset int64 = -1 @@ -108,23 +109,23 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { maxFlushedOffset = flushedMessages[flushedCount-1].Offset } flushMu.Unlock() - + t.Logf("\nBUFFER STATE AFTER FLUSH:") t.Logf(" bufferStartOffset: %d", bufferStartOffset) t.Logf(" currentOffset (HWM): %d", currentOffset) t.Logf(" pos (bytes in buffer): %d", pos) t.Logf(" Messages sent: %d (offsets 0-%d)", messageCount+50, messageCount+49) t.Logf(" Messages flushed to disk: %d (offsets %d-%d)", flushedCount, minFlushedOffset, maxFlushedOffset) - + // CRITICAL CHECK: Is there a gap between flushed data and memory buffer? if flushedCount > 0 && maxFlushedOffset >= 0 { gap := bufferStartOffset - (maxFlushedOffset + 1) - + t.Logf("\nOFFSET CONTINUITY CHECK:") t.Logf(" Last flushed offset: %d", maxFlushedOffset) t.Logf(" Buffer starts at: %d", bufferStartOffset) t.Logf(" Gap: %d offsets", gap) - + if gap > 0 { t.Errorf("❌ CRITICAL BUG REPRODUCED: OFFSET GAP DETECTED!") t.Errorf(" Disk has offsets %d-%d", minFlushedOffset, maxFlushedOffset) @@ -137,22 +138,22 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { } else { t.Logf("✅ PASS: No gap detected - offsets are continuous") } - + // Check if we can read all expected offsets t.Logf("\nREADABILITY CHECK:") for testOffset := int64(0); testOffset < currentOffset; testOffset += 10 { // Try to read from buffer requestPosition := NewMessagePositionFromOffset(testOffset) buf, _, err := logBuffer.ReadFromBuffer(requestPosition) - + isReadable := (buf != nil && len(buf.Bytes()) > 0) || err == ResumeFromDiskError status := "✅" if !isReadable && err == nil { status = "❌ NOT READABLE" } - + t.Logf(" Offset %d: %s (buf=%v, err=%v)", testOffset, status, buf != nil, err) - + // If offset is in the gap, it should fail to read if flushedCount > 0 && testOffset > maxFlushedOffset && testOffset < bufferStartOffset { if isReadable { @@ -163,19 +164,19 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { } } } - + // Check that all sent messages are accounted for expectedMessageCount := messageCount + 50 messagesInMemory := int(currentOffset - bufferStartOffset) totalAccountedFor := flushedCount + messagesInMemory - + t.Logf("\nMESSAGE ACCOUNTING:") t.Logf(" Expected: %d messages", expectedMessageCount) t.Logf(" Flushed to disk: %d", flushedCount) t.Logf(" In memory buffer: %d (offset range %d-%d)", messagesInMemory, bufferStartOffset, currentOffset-1) t.Logf(" Total accounted for: %d", totalAccountedFor) t.Logf(" Missing: %d messages", expectedMessageCount-totalAccountedFor) - + if totalAccountedFor < expectedMessageCount { t.Errorf("❌ DATA LOSS CONFIRMED: %d messages are missing!", expectedMessageCount-totalAccountedFor) } else { @@ -188,23 +189,23 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { var flushCount int var flushMu sync.Mutex - + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { flushMu.Lock() flushCount++ count := flushCount flushMu.Unlock() - + t.Logf("FLUSH #%d: minOffset=%d maxOffset=%d size=%d bytes", count, minOffset, maxOffset, len(buf)) } - + logBuffer := NewLogBuffer("test", 100*time.Millisecond, flushFn, nil, nil) defer logBuffer.ShutdownLogBuffer() - + // Send messages in batches with flushes in between for batch := 0; batch < 5; batch++ { t.Logf("\nBatch %d:", batch) - + // 
 		// Send 20 messages
 		for i := 0; i < 20; i++ {
 			offset := int64(batch*20 + i)
@@ -214,28 +215,28 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
 				TsNs: time.Now().UnixNano(),
 			})
 		}
-
+
 		// Check state before flush
 		logBuffer.RLock()
 		beforeFlushOffset := logBuffer.offset
 		beforeFlushStart := logBuffer.bufferStartOffset
 		logBuffer.RUnlock()
-
+
 		// Force flush
 		logBuffer.ForceFlush()
 		time.Sleep(50 * time.Millisecond)
-
+
 		// Check state after flush
 		logBuffer.RLock()
 		afterFlushOffset := logBuffer.offset
 		afterFlushStart := logBuffer.bufferStartOffset
 		prevBufferCount := len(logBuffer.prevBuffers.buffers)
-
+
 		// Check prevBuffers state
 		t.Logf(" Before flush: offset=%d, bufferStartOffset=%d", beforeFlushOffset, beforeFlushStart)
 		t.Logf(" After flush: offset=%d, bufferStartOffset=%d, prevBuffers=%d", afterFlushOffset, afterFlushStart, prevBufferCount)
-
+
 		// Check each prevBuffer
 		for i, prevBuf := range logBuffer.prevBuffers.buffers {
 			if prevBuf.size > 0 {
@@ -244,7 +245,7 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
 			}
 		}
 		logBuffer.RUnlock()
-
+
 		// CRITICAL: Check if bufferStartOffset advanced correctly
 		expectedNewStart := beforeFlushOffset
 		if afterFlushStart != expectedNewStart {
@@ -261,10 +262,10 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
 func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
 	var allFlushedOffsets []int64
 	var flushMu sync.Mutex
-
+
 	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
 		t.Logf("FLUSH: offsets %d-%d (%d bytes)", minOffset, maxOffset, len(buf))
-
+
 		flushMu.Lock()
 		// Record the offset range that was flushed
 		for offset := minOffset; offset <= maxOffset; offset++ {
@@ -272,13 +273,13 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
 		}
 		flushMu.Unlock()
 	}
-
+
 	logBuffer := NewLogBuffer("test", 50*time.Millisecond, flushFn, nil, nil)
 	defer logBuffer.ShutdownLogBuffer()
-
+
 	// Concurrently write messages and force flushes
 	var wg sync.WaitGroup
-
+
 	// Writer goroutine
 	wg.Add(1)
 	go func() {
@@ -294,7 +295,7 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
 			}
 		}
 	}()
-
+
 	// Flusher goroutine
 	wg.Add(1)
 	go func() {
@@ -304,31 +305,31 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
 			logBuffer.ForceFlush()
 		}
 	}()
-
+
 	wg.Wait()
 	time.Sleep(200 * time.Millisecond) // Wait for final flush
-
+
 	// Check final state
 	logBuffer.RLock()
 	finalOffset := logBuffer.offset
 	finalBufferStart := logBuffer.bufferStartOffset
 	logBuffer.RUnlock()
-
+
 	flushMu.Lock()
 	flushedCount := len(allFlushedOffsets)
 	flushMu.Unlock()
-
+
 	expectedCount := int(finalOffset)
 	inMemory := int(finalOffset - finalBufferStart)
 	totalAccountedFor := flushedCount + inMemory
-
+
 	t.Logf("\nFINAL STATE:")
 	t.Logf(" Total messages sent: %d (offsets 0-%d)", expectedCount, expectedCount-1)
 	t.Logf(" Flushed to disk: %d", flushedCount)
 	t.Logf(" In memory: %d (offsets %d-%d)", inMemory, finalBufferStart, finalOffset-1)
 	t.Logf(" Total accounted: %d", totalAccountedFor)
 	t.Logf(" Missing: %d", expectedCount-totalAccountedFor)
-
+
 	if totalAccountedFor < expectedCount {
 		t.Errorf("❌ DATA LOSS in concurrent scenario: %d messages missing!", expectedCount-totalAccountedFor)
 	}
@@ -344,7 +345,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 		messages []*filer_pb.LogEntry
 	}
 	var flushMu sync.Mutex
-
+
 	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
 		// Parse messages from buffer
 		messages := []*filer_pb.LogEntry{}
@@ -360,7 +361,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 			}
 			pos += 4 + int(size)
 		}
-
+
 		flushMu.Lock()
 		flushedData = append(flushedData, struct {
 			minOffset int64
@@ -368,17 +369,17 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 			messages []*filer_pb.LogEntry
 		}{minOffset, maxOffset, messages})
 		flushMu.Unlock()
-
+
 		t.Logf("FLUSH: minOffset=%d maxOffset=%d, parsed %d messages", minOffset, maxOffset, len(messages))
 	}
-
+
 	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil)
 	defer logBuffer.ShutdownLogBuffer()
-
+
 	// Simulate broker behavior: assign Kafka offsets and add to buffer
 	// This is what PublishWithOffset() does
 	nextKafkaOffset := int64(0)
-
+
 	// Round 1: Add 50 messages with Kafka offsets 0-49
 	t.Logf("\n=== ROUND 1: Adding messages 0-49 ===")
 	for i := 0; i < 50; i++ {
@@ -391,7 +392,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 		logBuffer.AddLogEntryToBuffer(logEntry)
 		nextKafkaOffset++
 	}
-
+
 	// Check buffer state before flush
 	logBuffer.RLock()
 	beforeFlushOffset := logBuffer.offset
@@ -399,11 +400,11 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 	logBuffer.RUnlock()
 	t.Logf("Before flush: logBuffer.offset=%d, bufferStartOffset=%d, nextKafkaOffset=%d",
 		beforeFlushOffset, beforeFlushStart, nextKafkaOffset)
-
+
 	// Flush
 	logBuffer.ForceFlush()
 	time.Sleep(100 * time.Millisecond)
-
+
 	// Check buffer state after flush
 	logBuffer.RLock()
 	afterFlushOffset := logBuffer.offset
@@ -411,7 +412,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 	logBuffer.RUnlock()
 	t.Logf("After flush: logBuffer.offset=%d, bufferStartOffset=%d",
 		afterFlushOffset, afterFlushStart)
-
+
 	// Round 2: Add another 50 messages with Kafka offsets 50-99
 	t.Logf("\n=== ROUND 2: Adding messages 50-99 ===")
 	for i := 0; i < 50; i++ {
@@ -424,20 +425,20 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 		logBuffer.AddLogEntryToBuffer(logEntry)
 		nextKafkaOffset++
 	}
-
+
 	logBuffer.ForceFlush()
 	time.Sleep(100 * time.Millisecond)
-
+
 	// Verification: Check if all Kafka offsets are accounted for
 	flushMu.Lock()
 	t.Logf("\n=== VERIFICATION ===")
 	t.Logf("Expected Kafka offsets: 0-%d", nextKafkaOffset-1)
-
+
 	allOffsets := make(map[int64]bool)
 	for flushIdx, flush := range flushedData {
 		t.Logf("Flush #%d: minOffset=%d, maxOffset=%d, messages=%d",
 			flushIdx, flush.minOffset, flush.maxOffset, len(flush.messages))
-
+
 		for _, msg := range flush.messages {
 			if allOffsets[msg.Offset] {
 				t.Errorf(" ❌ DUPLICATE: Offset %d appears multiple times!", msg.Offset)
@@ -446,7 +447,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 		}
 	}
 	flushMu.Unlock()
-
+
 	// Check for missing offsets
 	missingOffsets := []int64{}
 	for expectedOffset := int64(0); expectedOffset < nextKafkaOffset; expectedOffset++ {
@@ -454,7 +455,7 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 			missingOffsets = append(missingOffsets, expectedOffset)
 		}
 	}
-
+
 	if len(missingOffsets) > 0 {
 		t.Errorf("\n❌ MISSING OFFSETS DETECTED: %d offsets missing", len(missingOffsets))
 		if len(missingOffsets) <= 20 {
@@ -466,18 +467,18 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 	} else {
 		t.Logf("\n✅ SUCCESS: All %d Kafka offsets accounted for (0-%d)", nextKafkaOffset, nextKafkaOffset-1)
 	}
-
+
 	// Check buffer offset consistency
 	logBuffer.RLock()
 	finalOffset := logBuffer.offset
 	finalBufferStart := logBuffer.bufferStartOffset
 	logBuffer.RUnlock()
-
+
 	t.Logf("\nFinal buffer state:")
 	t.Logf(" logBuffer.offset: %d", finalOffset)
 	t.Logf(" bufferStartOffset: %d", finalBufferStart)
 	t.Logf(" Expected (nextKafkaOffset): %d", nextKafkaOffset)
-
+
 	if finalOffset != nextKafkaOffset {
 		t.Errorf("❌ logBuffer.offset mismatch: expected %d, got %d", nextKafkaOffset, finalOffset)
 	}
@@ -488,12 +489,12 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 	var flushedOffsets []int64
 	var flushMu sync.Mutex
-
+
 	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
 		// Simulate reading from disk - return flushed offsets
 		flushMu.Lock()
 		defer flushMu.Unlock()
-
+
 		for _, offset := range flushedOffsets {
 			if offset >= startPosition.Offset {
 				logEntry := &filer_pb.LogEntry{
@@ -510,12 +511,12 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 		}
 		return startPosition, false, nil
 	}
-
+
 	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
 		// Parse and store flushed offsets
 		flushMu.Lock()
 		defer flushMu.Unlock()
-
+
 		for pos := 0; pos+4 < len(buf); {
 			size := uint32(buf[pos])<<24 | uint32(buf[pos+1])<<16 | uint32(buf[pos+2])<<8 | uint32(buf[pos+3])
 			if pos+4+int(size) > len(buf) {
@@ -528,14 +529,14 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 			}
 			pos += 4 + int(size)
 		}
-
+
 		t.Logf("FLUSH: Stored %d offsets to disk (minOffset=%d, maxOffset=%d)",
 			len(flushedOffsets), minOffset, maxOffset)
 	}
-
+
 	logBuffer := NewLogBuffer("test", time.Hour, flushFn, readFromDiskFn, nil)
 	defer logBuffer.ShutdownLogBuffer()
-
+
 	// Add 100 messages
 	t.Logf("Adding 100 messages...")
 	for i := int64(0); i < 100; i++ {
@@ -547,32 +548,32 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 		}
 		logBuffer.AddLogEntryToBuffer(logEntry)
 	}
-
+
 	// Flush (moves data to disk)
 	t.Logf("Flushing...")
 	logBuffer.ForceFlush()
 	time.Sleep(100 * time.Millisecond)
-
+
 	// Now try to read all messages using ReadMessagesAtOffset
 	t.Logf("\nReading messages from offset 0...")
 	messages, nextOffset, hwm, endOfPartition, err := logBuffer.ReadMessagesAtOffset(0, 1000, 1024*1024)
-
+
 	t.Logf("Read result: messages=%d, nextOffset=%d, hwm=%d, endOfPartition=%v, err=%v",
 		len(messages), nextOffset, hwm, endOfPartition, err)
-
+
 	// Verify all offsets can be read
 	readOffsets := make(map[int64]bool)
 	for _, msg := range messages {
 		readOffsets[msg.Offset] = true
 	}
-
+
 	missingOffsets := []int64{}
 	for expectedOffset := int64(0); expectedOffset < 100; expectedOffset++ {
 		if !readOffsets[expectedOffset] {
 			missingOffsets = append(missingOffsets, expectedOffset)
 		}
 	}
-
+
 	if len(missingOffsets) > 0 {
 		t.Errorf("❌ MISSING OFFSETS after flush: %d offsets cannot be read", len(missingOffsets))
 		if len(missingOffsets) <= 20 {
@@ -590,29 +591,29 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
 	flushedRanges := []struct{ min, max int64 }{}
 	var flushMu sync.Mutex
-
+
 	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
 		flushMu.Lock()
 		flushedRanges = append(flushedRanges, struct{ min, max int64 }{minOffset, maxOffset})
 		flushMu.Unlock()
 		t.Logf("FLUSH: offsets %d-%d", minOffset, maxOffset)
 	}
-
+
 	logBuffer := NewLogBuffer("test", time.Hour, flushFn, nil, nil) // Long interval, manual flush only
 	defer logBuffer.ShutdownLogBuffer()
-
+
 	// Send messages, flush, check state - repeat
 	for round := 0; round < 3; round++ {
 		t.Logf("\n=== ROUND %d ===", round)
-
+
 		// Check state before adding messages
 		logBuffer.RLock()
 		beforeOffset := logBuffer.offset
 		beforeStart := logBuffer.bufferStartOffset
 		logBuffer.RUnlock()
-
+
 		t.Logf("Before adding: offset=%d, bufferStartOffset=%d", beforeOffset, beforeStart)
-
+
 		// Add 10 messages
 		for i := 0; i < 10; i++ {
 			logBuffer.AddToBuffer(&mq_pb.DataMessage{
@@ -621,28 +622,28 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
 				TsNs: time.Now().UnixNano(),
 			})
 		}
-
+
 		// Check state after adding
 		logBuffer.RLock()
 		afterAddOffset := logBuffer.offset
 		afterAddStart := logBuffer.bufferStartOffset
 		logBuffer.RUnlock()
-
+
 		t.Logf("After adding: offset=%d, bufferStartOffset=%d", afterAddOffset, afterAddStart)
-
+
 		// Force flush
 		t.Logf("Forcing flush...")
 		logBuffer.ForceFlush()
 		time.Sleep(100 * time.Millisecond)
-
+
 		// Check state after flush
 		logBuffer.RLock()
 		afterFlushOffset := logBuffer.offset
 		afterFlushStart := logBuffer.bufferStartOffset
 		logBuffer.RUnlock()
-
+
 		t.Logf("After flush: offset=%d, bufferStartOffset=%d", afterFlushOffset, afterFlushStart)
-
+
 		// CRITICAL CHECK: bufferStartOffset should advance to where offset was before flush
 		if afterFlushStart != afterAddOffset {
			t.Errorf("❌ FLUSH BUG: bufferStartOffset did NOT advance correctly!")
@@ -653,19 +654,19 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
 			t.Logf("✅ bufferStartOffset correctly advanced to %d", afterFlushStart)
 		}
 	}
-
+
 	// Final verification: check all offset ranges are continuous
 	flushMu.Lock()
 	t.Logf("\n=== FLUSHED RANGES ===")
 	for i, r := range flushedRanges {
 		t.Logf("Flush #%d: offsets %d-%d", i, r.min, r.max)
-
+
 		// Check continuity with previous flush
 		if i > 0 {
 			prevMax := flushedRanges[i-1].max
 			currentMin := r.min
 			gap := currentMin - (prevMax + 1)
-
+
 			if gap > 0 {
 				t.Errorf("❌ GAP between flush #%d and #%d: %d offsets missing!", i-1, i, gap)
 			} else if gap < 0 {
@@ -677,4 +678,3 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
 	}
 	flushMu.Unlock()
 }
-
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 613d69987..4485154a7 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -24,7 +24,7 @@ type GrpcAdminClient struct {
 	workerID   string
 	dialOption grpc.DialOption
 
-	cmds chan grpcCommand
+	cmds chan grpcCommand
 
 	// Reconnection parameters
 	maxReconnectAttempts int