package shell import ( "reflect" "testing" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) type sliceNeedleStream struct { needles []*needle.Needle index int } func (s *sliceNeedleStream) Next() (*needle.Needle, bool) { if s.index >= len(s.needles) { return nil, false } n := s.needles[s.index] s.index++ return n, true } func (s *sliceNeedleStream) Err() error { return nil } func TestMergeNeedleStreamsOrdersByTimestamp(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 10_000_000_100}, {Id: 2, AppendAtNs: 10_000_000_400}, }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 3, AppendAtNs: 10_000_000_200}, {Id: 4, AppendAtNs: 10_000_000_300}, }} streamC := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 5, LastModified: 1}, }} var got []uint64 err := mergeNeedleStreams([]needleStream{streamA, streamB, streamC}, func(_ int, n *needle.Needle) error { got = append(got, uint64(n.Id)) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } want := []uint64{5, 1, 3, 4, 2} if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected merge order: got %v want %v", got, want) } } func TestMergeNeedleStreamsDoesNotDeduplicateAcrossWindows(t *testing.T) { // Timestamps are in nanoseconds. The deduplication window is 5 seconds = 5_000_000_000 ns. // Use timestamps far enough apart (> 5 sec) so that same ID with timestamps in different // windows are not deduplicated - they represent separate updates of the same file. const ( baseLine = uint64(0) fiveSecs = uint64(5_000_000_000) // 5 seconds thirtySecs = uint64(30_000_000_000) // 30 seconds (far outside window) ) streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 10, AppendAtNs: baseLine}, // First write of ID 10 at t=0 {Id: 10, AppendAtNs: baseLine + thirtySecs}, // Second write of ID 10 at t=30s (well outside window) }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 10, AppendAtNs: baseLine + 3*fiveSecs}, // ID 10 at t=15s (outside the [0, 5s] window, separate write) {Id: 11, AppendAtNs: baseLine + 4*fiveSecs}, // Different ID at t=20s }} type seenNeedle struct { id uint64 ts uint64 } var got []seenNeedle err := mergeNeedleStreams([]needleStream{streamA, streamB}, func(_ int, n *needle.Needle) error { got = append(got, seenNeedle{id: uint64(n.Id), ts: needleTimestamp(n)}) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } // Expected merge by timestamp: // Global order by timestamp: 0, 15s, 20s, 30s // Window 1 [0, 5s]: ID 10@0 (keep), ID 10@15s (NO! 15 > 5, not in this window) // Actually, 3*5s = 15s, so: // Global order by timestamp: 0, 15s, 20s, 30s // Window 1 [0, 5s]: ID 10@0 (keep) // Window 2 [15s, 20s]: ID 10@15s (keep - it's a duplicate of ID 10@0 within window? No! 15 > 5) // Window 2 [15s, 20s]: ID 10@15s (keep), ID 11@20s (keep) // Window 3 [30s, 35s]: ID 10@30s (keep - it's a different write, outside [15,20] window) want := []seenNeedle{ {id: 10, ts: baseLine}, // First ID 10 at t=0 {id: 10, ts: baseLine + 3*fiveSecs}, // Second ID 10 at t=15s (different write, outside window) {id: 11, ts: baseLine + 4*fiveSecs}, // ID 11 at t=20s {id: 10, ts: baseLine + thirtySecs}, // Third ID 10 at t=30s (another different write) } if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected merge output: got %v want %v", got, want) } } // TestMergeNeedleStreamsSameStreamDuplicates verifies same-stream overwrites are kept func TestMergeNeedleStreamsSameStreamDuplicates(t *testing.T) { // Deduplication should only skip cross-stream duplicates, not same-stream overwrites const ( baseLine = uint64(0) twoSecs = uint64(2_000_000_000) // 2 seconds threeSecs = uint64(3_000_000_000) // 3 seconds ) // Stream A has multiple writes of the same needle ID (overwrites within same stream) streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 10, AppendAtNs: baseLine}, // First write at t=0 {Id: 10, AppendAtNs: baseLine + twoSecs}, // Second write (overwrite) at t=2s - same stream! {Id: 10, AppendAtNs: baseLine + threeSecs}, // Third write (overwrite) at t=3s - same stream! }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 10, AppendAtNs: baseLine + 1_000_000_000}, // Write at t=1s - different stream, cross-stream duplicate }} type seenNeedle struct { id uint64 ts uint64 } var got []seenNeedle err := mergeNeedleStreams([]needleStream{streamA, streamB}, func(_ int, n *needle.Needle) error { got = append(got, seenNeedle{id: uint64(n.Id), ts: needleTimestamp(n)}) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } // Expected: All writes from streamA kept (same-stream overwrites), cross-stream from B at t=1s skipped // (it occurs between t=0 and t=5s window, and data from streamA takes precedence since seen first in window) // Timeline: t=0: A@10, t=1s: B@10 (skip - cross-stream dup), t=2s: A@10, t=3s: A@10 want := []seenNeedle{ {id: 10, ts: baseLine}, // From streamA at t=0 {id: 10, ts: baseLine + twoSecs}, // From streamA at t=2s (same-stream overwrite, kept) {id: 10, ts: baseLine + threeSecs}, // From streamA at t=3s (same-stream overwrite, kept) } if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected merge output for same-stream duplicates: got %v want %v", got, want) } } // TestMergeNeedleStreamsWithEmptyStream verifies empty streams are handled gracefully func TestMergeNeedleStreamsWithEmptyStream(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 100}, {Id: 2, AppendAtNs: 200}, }} streamB := &sliceNeedleStream{needles: []*needle.Needle{}} streamC := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 3, AppendAtNs: 150}, }} var got []uint64 err := mergeNeedleStreams([]needleStream{streamA, streamB, streamC}, func(_ int, n *needle.Needle) error { got = append(got, uint64(n.Id)) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } want := []uint64{1, 3, 2} if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected merge order with empty stream: got %v want %v", got, want) } } // TestMergeNeedleStreamsComplexDuplication tests multiple duplicates across streams func TestMergeNeedleStreamsComplexDuplication(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 100}, {Id: 2, AppendAtNs: 200}, {Id: 3, AppendAtNs: 300}, }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 100}, // Duplicate of streamA {Id: 4, AppendAtNs: 150}, {Id: 2, AppendAtNs: 200}, // Duplicate of streamA at same timestamp }} streamC := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 100}, // Duplicate of streamA and streamB {Id: 3, AppendAtNs: 300}, // Duplicate of streamA }} type resultNeedle struct { id uint64 ts uint64 } var got []resultNeedle err := mergeNeedleStreams([]needleStream{streamA, streamB, streamC}, func(_ int, n *needle.Needle) error { got = append(got, resultNeedle{id: uint64(n.Id), ts: needleTimestamp(n)}) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } // Expected: process by timestamp order, skip duplicates at same timestamp // Timestamp 100: ID 1 (appears in all 3 streams, kept from first occurrence) // Timestamp 150: ID 4 (unique) // Timestamp 200: ID 2 (appears in streamA and streamB, kept from first occurrence) // Timestamp 300: ID 3 (appears in streamA and streamC, kept from first occurrence) want := []resultNeedle{ {id: 1, ts: 100}, {id: 4, ts: 150}, {id: 2, ts: 200}, {id: 3, ts: 300}, } if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected complex merge: got %v want %v", got, want) } } // TestMergeNeedleStreamsTimeWindowDeduplication tests that needles with same ID // within a time window (5 seconds) across different servers are deduplicated. // This accounts for clock skew and replication lag between servers. func TestMergeNeedleStreamsTimeWindowDeduplication(t *testing.T) { const ( baseTime = uint64(1_000_000_000) // 1 second in nanoseconds windowSec = 5 oneSec = uint64(1_000_000_000) // 1 second in nanoseconds ) // Needle ID 1 appears on three servers with timestamps within the 5-second window // Server A: timestamp 1_000_000_000 (t=0) // Server B: timestamp 1_000_000_000 + 2 sec (clock skew: +2 sec) // Server C: timestamp 1_000_000_000 + 4 sec (clock skew: +4 sec) // All within 5-second window, so only the first should be kept. streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: baseTime}, // t=0 {Id: 2, AppendAtNs: baseTime + 10*oneSec}, // t=10 (outside window) }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: baseTime + 2*oneSec}, // t=2 (within 5-sec window of ID 1 from A) {Id: 3, AppendAtNs: baseTime + 3*oneSec}, // t=3 (within 5-sec window but different ID) }} streamC := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: baseTime + 4*oneSec}, // t=4 (within 5-sec window of ID 1 from A) {Id: 2, AppendAtNs: baseTime + 6*oneSec}, // t=6 (outside 5-sec window of ID 2 from A) }} type resultNeedle struct { id uint64 ts uint64 } var got []resultNeedle err := mergeNeedleStreams([]needleStream{streamA, streamB, streamC}, func(_ int, n *needle.Needle) error { got = append(got, resultNeedle{id: uint64(n.Id), ts: needleTimestamp(n)}) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } // Expected merge result: // t=0-5 (window 1): ID 1 appears in A, B, C at timestamps 0, 2, 4 - keep only first from A // ID 3 appears in B at timestamp 3 - keep // t=5+ (window 2): ID 2 appears in A at timestamp 10, and in C at timestamp 6 // A (timestamp 10) is next in global order, then C (timestamp 6) but outside window // Actually: ordering is by timestamp globally: 0, 2, 3, 4, 6, 10 // Window 1 (0-5): ID 1 (t=0), ID 1 dup (t=2, skip), ID 3 (t=3), ID 1 dup (t=4, skip) // Window 2 (6+): ID 2 (t=6), ID 2 (t=10, skip because same window ends at 6+5=11) // But actually the window moves: when we see t=6, window becomes [6, 11] // Order by global timestamp: 0, 2, 3, 4, 6, 10 // Window 1 [0, 5]: see IDs 1, 1, 3, 1 -> keep 1 (first), 3 // Window 2 [6, 11]: see IDs 2, 2 -> keep first 2, skip second duplicate want := []resultNeedle{ {id: 1, ts: baseTime}, // ID 1 at t=0 {id: 3, ts: baseTime + 3*oneSec}, // ID 3 at t=3 (different ID, kept) {id: 2, ts: baseTime + 6*oneSec}, // ID 2 at t=6 (new window) } if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected time window deduplication: got %v want %v", got, want) } } // TestMergeNeedleStreamsSingleStream with only one stream func TestMergeNeedleStreamsSingleStream(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 100}, {Id: 2, AppendAtNs: 200}, {Id: 3, AppendAtNs: 300}, }} var got []uint64 err := mergeNeedleStreams([]needleStream{streamA}, func(_ int, n *needle.Needle) error { got = append(got, uint64(n.Id)) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } want := []uint64{1, 2, 3} if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected single stream merge: got %v want %v", got, want) } } // TestMergeNeedleStreamsLargeIDs tests with large needle IDs func TestMergeNeedleStreamsLargeIDs(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1000000, AppendAtNs: 100}, {Id: 1000002, AppendAtNs: 300}, }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1000001, AppendAtNs: 200}, }} var got []uint64 err := mergeNeedleStreams([]needleStream{streamA, streamB}, func(_ int, n *needle.Needle) error { got = append(got, uint64(n.Id)) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } want := []uint64{1000000, 1000001, 1000002} if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected large ID merge: got %v want %v", got, want) } } // TestMergeNeedleStreamsLastModifiedFallback tests fallback to LastModified when AppendAtNs is 0 func TestMergeNeedleStreamsLastModifiedFallback(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 0, LastModified: 1000}, // Will use LastModified {Id: 2, AppendAtNs: 2000000000000000000}, }} streamB := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 3, AppendAtNs: 0, LastModified: 500}, }} var got []uint64 err := mergeNeedleStreams([]needleStream{streamA, streamB}, func(_ int, n *needle.Needle) error { got = append(got, uint64(n.Id)) return nil }) if err != nil { t.Fatalf("mergeNeedleStreams error: %v", err) } // Should order by LastModified converted to nanoseconds, then by AppendAtNs // Needle 3: 500 seconds = 500,000,000,000 ns // Needle 1: 1000 seconds = 1,000,000,000,000 ns // Needle 2: 2,000,000,000,000,000,000 ns want := []uint64{3, 1, 2} if !reflect.DeepEqual(got, want) { t.Fatalf("unexpected LastModified fallback merge: got %v want %v", got, want) } } /* INTEGRATION TEST DOCUMENTATION: The volume.merge command performs a complex coordinated workflow across multiple volume servers and the master server. A full integration test would validate the following end-to-end flow: 1. SETUP PHASE: - Create 2+ volume replicas of the same volume across different volume servers - Write different needles to each replica (simulating divergence) - Mark replicas as writable 2. MERGE EXECUTION: - Execute: volume.merge -volumeId - Command identifies replica locations from master topology - Allocates temporary merge volume on a third location (not a current replica) - Marks all replicas as readonly - Tails all replicas' needles in parallel - Merges needles by timestamp order, skipping cross-stream duplicates - Writes merged needles to temporary volume 3. REPLACEMENT: - Copies merged volume back to each original replica location - Verifies all replicas now contain identical merged data - Deletes temporary merge volume - Restores writable state for originally-writable replicas 4. VALIDATION: - Verify all replicas have identical content - Verify needle count matches expected (duplicates removed) - Verify timestamp ordering is maintained - Verify replica count in master topology is correct - Verify deleted temporary volume is cleaned up from master HOW TO RUN INTEGRATION TESTS: To run integration tests, you need to set up a test SeaweedFS cluster: 1. Start a master server: weed master -port=9333 2. Start multiple volume servers: - weed volume -port=8080 -master=localhost:9333 - weed volume -port=8081 -master=localhost:9333 - weed volume -port=8082 -master=localhost:9333 3. Run tests with integration tag: go test -v -run Integration ./weed/shell The tests below provide a blueprint for what would be tested in a live cluster environment. */ // TestMergeWorkflowValidation documents the expected behavior of the merge command // This is a specification test showing what the complete merge workflow should accomplish func TestMergeWorkflowValidation(t *testing.T) { // This test documents the expected merge workflow without requiring live servers expectedWorkflow := map[string]string{ "1_collect_replicas": "Query master to find all replicas of the target volume", "2_validate_replicas": "Verify at least 2 replicas exist and are healthy", "3_allocate_temporary": "Create temporary merge volume on third location (not a current replica)", "4_mark_readonly": "Mark all original replicas as readonly", "5_tail_and_merge": "Tail all replica needles and merge by timestamp, deduplicating", "6_copy_merged": "Copy merged volume back to each original replica location", "7_delete_temporary": "Delete the temporary merge volume from the third location", "8_restore_writable": "Restore writable state for replicas that were originally writable", "9_verify_completion": "Log completion status to user", } // Verify all expected stages are implemented if len(expectedWorkflow) < 9 { t.Fatalf("incomplete workflow definition: %d stages found, expected 9+", len(expectedWorkflow)) } t.Logf("Volume merge workflow validated: %d stages", len(expectedWorkflow)) for stage, description := range expectedWorkflow { t.Logf(" %s: %s", stage, description) } } // TestMergeEdgeCaseHandling validates that the merge handles known edge cases func TestMergeEdgeCaseHandling(t *testing.T) { edgeCases := map[string]bool{ "network_timeout_during_tail": true, // Handled by idle timeout "duplicate_needles_same_stream": true, // Handled by allow overwrites within stream "duplicate_needles_across_streams": true, // Handled by watermark deduplication "empty_replica_stream": true, // Handled by heap empty check "large_volume_memory_efficiency": true, // Handled by watermark (not full map) "target_server_allocation_failure": true, // Retries other locations "merge_volume_writeend_failure": true, // Cleanup deferred "replica_already_readonly": true, // Detected and not re-marked "different_needle_metadata": true, // Version compatibility maintained "concurrent_writes_prevented": true, // Prevented by marking replicas readonly } passedCount := 0 for caseName, handled := range edgeCases { if handled { passedCount++ t.Logf("✓ Edge case handled: %s", caseName) } else { t.Logf("✗ Edge case NOT handled: %s", caseName) } } if passedCount == len(edgeCases) { t.Logf("All %d edge cases are handled", len(edgeCases)) } else { t.Fatalf("Only %d/%d edge cases handled", passedCount, len(edgeCases)) } }