diff --git a/.github/workflows/s3-go-tests.yml b/.github/workflows/s3-go-tests.yml index 373be6df7..311964a9b 100644 --- a/.github/workflows/s3-go-tests.yml +++ b/.github/workflows/s3-go-tests.yml @@ -482,4 +482,59 @@ jobs: path: test/s3/tagging/weed-test*.log retention-days: 3 + s3-remote-cache-tests: + name: S3 Remote Cache Tests + runs-on: ubuntu-22.04 + timeout-minutes: 20 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Install SeaweedFS + run: | + go install -buildvcs=false + + - name: Run S3 Remote Cache Tests + timeout-minutes: 15 + working-directory: test/s3/remote_cache + run: | + set -x + echo "=== System Information ===" + uname -a + free -h + + # Run the remote cache integration tests + # Tests singleflight deduplication for caching remote objects + make test-with-server || { + echo "❌ Test failed, checking logs..." + if [ -f primary-weed.log ]; then + echo "=== Primary server logs ===" + tail -100 primary-weed.log + fi + if [ -f remote-weed.log ]; then + echo "=== Remote server logs ===" + tail -100 remote-weed.log + fi + echo "=== Process information ===" + ps aux | grep -E "(weed|test)" || true + exit 1 + } + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: s3-remote-cache-test-logs + path: | + test/s3/remote_cache/primary-weed.log + test/s3/remote_cache/remote-weed.log + retention-days: 3 + # Removed SSE-C integration tests and compatibility job \ No newline at end of file diff --git a/.gitignore b/.gitignore index 965743c94..7bca32242 100644 --- a/.gitignore +++ b/.gitignore @@ -126,3 +126,7 @@ weed-iam test/kafka/kafka-client-loadtest/weed-linux-arm64 /test/tus/filerldb2 coverage.out +/test/s3/remote_cache/test-primary-data +/test/s3/remote_cache/test-remote-data +test/s3/remote_cache/remote-server.pid +test/s3/remote_cache/primary-server.pid diff --git a/test/s3/remote_cache/Makefile b/test/s3/remote_cache/Makefile new file mode 100644 index 000000000..1b7a64539 --- /dev/null +++ b/test/s3/remote_cache/Makefile @@ -0,0 +1,210 @@ +# Remote Storage Cache Integration Tests +# Tests the remote object caching functionality with singleflight deduplication +# Uses two SeaweedFS instances: primary (with caching) and secondary (as remote storage) + +.PHONY: all help build-weed check-deps start-remote stop-remote start-primary stop-primary \ + setup-remote test test-with-server clean logs logs-primary logs-remote health + +all: test-with-server + +# Configuration +WEED_BINARY := ../../../weed/weed_binary + +# Primary SeaweedFS (the one being tested - has remote caching) +PRIMARY_S3_PORT := 8333 +PRIMARY_FILER_PORT := 8888 +PRIMARY_MASTER_PORT := 9333 +PRIMARY_VOLUME_PORT := 8080 +PRIMARY_METRICS_PORT := 9324 +PRIMARY_DIR := ./test-primary-data + +# Secondary SeaweedFS (acts as "remote" S3 storage) +REMOTE_S3_PORT := 8334 +REMOTE_FILER_PORT := 8889 +REMOTE_MASTER_PORT := 9334 +REMOTE_VOLUME_PORT := 8081 +REMOTE_METRICS_PORT := 9325 +REMOTE_DIR := ./test-remote-data + +# Test configuration +TEST_TIMEOUT := 10m +TEST_PATTERN := TestRemoteCache + +# Buckets +REMOTE_BUCKET := remotesourcebucket + +# Default target +help: + @echo "Remote Storage Cache Integration Tests" + @echo "" + @echo "Uses two SeaweedFS instances:" + @echo " - Primary (port $(PRIMARY_S3_PORT)): Being tested, has remote caching" + @echo " - Remote (port $(REMOTE_S3_PORT)): Acts as remote S3 storage" + @echo "" + @echo "Available targets:" + 
@echo " help - Show this help message" + @echo " build-weed - Build the SeaweedFS binary" + @echo " check-deps - Check dependencies" + @echo " start-remote - Start remote SeaweedFS (secondary)" + @echo " stop-remote - Stop remote SeaweedFS" + @echo " start-primary - Start primary SeaweedFS" + @echo " stop-primary - Stop primary SeaweedFS" + @echo " setup-remote - Configure remote storage mount" + @echo " test - Run tests (assumes servers are running)" + @echo " test-with-server - Start servers, run tests, stop servers" + @echo " clean - Clean up all resources" + @echo " logs - Show server logs" + +# Build the SeaweedFS binary +build-weed: + @echo "Building SeaweedFS binary..." + @cd ../../../weed && go build -o weed_binary . + @chmod +x $(WEED_BINARY) + @echo "SeaweedFS binary built" + +check-deps: build-weed + @echo "Checking dependencies..." + @command -v go >/dev/null 2>&1 || (echo "Go is required" && exit 1) + @test -f $(WEED_BINARY) || (echo "SeaweedFS binary not found" && exit 1) + @echo "All dependencies available" + +# Start remote SeaweedFS (acts as the "remote" S3 storage) +start-remote: check-deps + @echo "Starting remote SeaweedFS (secondary instance)..." + @rm -f remote-server.pid + @mkdir -p $(REMOTE_DIR) + @$(WEED_BINARY) server \ + -s3 \ + -s3.port=$(REMOTE_S3_PORT) \ + -s3.allowDeleteBucketNotEmpty=true \ + -filer \ + -filer.port=$(REMOTE_FILER_PORT) \ + -master.port=$(REMOTE_MASTER_PORT) \ + -volume.port=$(REMOTE_VOLUME_PORT) \ + -master.volumeSizeLimitMB=50 \ + -volume.max=100 \ + -dir=$(REMOTE_DIR) \ + -volume.preStopSeconds=1 \ + -metricsPort=$(REMOTE_METRICS_PORT) \ + > remote-weed.log 2>&1 & echo $$! > remote-server.pid + @echo "Waiting for remote SeaweedFS to start..." + @for i in $$(seq 1 60); do \ + if curl -s http://localhost:$(REMOTE_S3_PORT) >/dev/null 2>&1; then \ + echo "Remote SeaweedFS started on port $(REMOTE_S3_PORT)"; \ + exit 0; \ + fi; \ + sleep 1; \ + done; \ + echo "ERROR: Remote SeaweedFS failed to start"; \ + cat remote-weed.log; \ + exit 1 + +stop-remote: + @echo "Stopping remote SeaweedFS..." + @if [ -f remote-server.pid ]; then \ + kill -TERM $$(cat remote-server.pid) 2>/dev/null || true; \ + sleep 2; \ + kill -KILL $$(cat remote-server.pid) 2>/dev/null || true; \ + rm -f remote-server.pid; \ + fi + @echo "Remote SeaweedFS stopped" + +# Start primary SeaweedFS (the one being tested) +start-primary: check-deps + @echo "Starting primary SeaweedFS..." + @rm -f primary-server.pid + @mkdir -p $(PRIMARY_DIR) + @$(WEED_BINARY) server \ + -s3 \ + -s3.port=$(PRIMARY_S3_PORT) \ + -s3.allowDeleteBucketNotEmpty=true \ + -filer \ + -filer.port=$(PRIMARY_FILER_PORT) \ + -master.port=$(PRIMARY_MASTER_PORT) \ + -volume.port=$(PRIMARY_VOLUME_PORT) \ + -master.volumeSizeLimitMB=50 \ + -volume.max=100 \ + -dir=$(PRIMARY_DIR) \ + -volume.preStopSeconds=1 \ + -metricsPort=$(PRIMARY_METRICS_PORT) \ + > primary-weed.log 2>&1 & echo $$! > primary-server.pid + @echo "Waiting for primary SeaweedFS to start..." + @for i in $$(seq 1 60); do \ + if curl -s http://localhost:$(PRIMARY_S3_PORT) >/dev/null 2>&1; then \ + echo "Primary SeaweedFS started on port $(PRIMARY_S3_PORT)"; \ + exit 0; \ + fi; \ + sleep 1; \ + done; \ + echo "ERROR: Primary SeaweedFS failed to start"; \ + cat primary-weed.log; \ + exit 1 + +stop-primary: + @echo "Stopping primary SeaweedFS..." 
+ @if [ -f primary-server.pid ]; then \ + kill -TERM $$(cat primary-server.pid) 2>/dev/null || true; \ + sleep 2; \ + kill -KILL $$(cat primary-server.pid) 2>/dev/null || true; \ + rm -f primary-server.pid; \ + fi + @echo "Primary SeaweedFS stopped" + +# Create bucket on remote and configure remote storage mount on primary +setup-remote: + @echo "Creating bucket on remote SeaweedFS..." + @curl -s -X PUT "http://localhost:$(REMOTE_S3_PORT)/$(REMOTE_BUCKET)" || echo "Bucket may already exist" + @sleep 1 + @echo "Configuring remote storage on primary..." + @printf 'remote.configure -name=seaweedremote -type=s3 -s3.access_key=any -s3.secret_key=any -s3.endpoint=http://localhost:$(REMOTE_S3_PORT) -s3.region=us-east-1\nexit\n' | $(WEED_BINARY) shell -master=localhost:$(PRIMARY_MASTER_PORT) 2>&1 || echo "remote.configure done" + @sleep 2 + @echo "Mounting remote bucket on primary..." + @printf 'remote.mount -dir=/buckets/remotemounted -remote=seaweedremote/$(REMOTE_BUCKET) -nonempty\nexit\n' | $(WEED_BINARY) shell -master=localhost:$(PRIMARY_MASTER_PORT) 2>&1 || echo "remote.mount done" + @sleep 1 + @echo "Remote storage configured" + +# Run tests +test: check-deps + @echo "Running remote cache tests..." + @go test -v -timeout=$(TEST_TIMEOUT) -run "$(TEST_PATTERN)" . + @echo "Tests completed" + +# Full test workflow +test-with-server: start-remote start-primary + @sleep 3 + @$(MAKE) setup-remote || (echo "Remote setup failed" && $(MAKE) stop-primary stop-remote && exit 1) + @sleep 2 + @echo "Running remote cache tests..." + @$(MAKE) test || (echo "Tests failed" && tail -50 primary-weed.log && $(MAKE) stop-primary stop-remote && exit 1) + @$(MAKE) stop-primary stop-remote + @echo "All tests passed" + +# Show logs +logs: + @echo "=== Primary SeaweedFS Logs ===" + @if [ -f primary-weed.log ]; then tail -50 primary-weed.log; else echo "No log file"; fi + @echo "" + @echo "=== Remote SeaweedFS Logs ===" + @if [ -f remote-weed.log ]; then tail -50 remote-weed.log; else echo "No log file"; fi + +logs-primary: + @if [ -f primary-weed.log ]; then tail -f primary-weed.log; else echo "No log file"; fi + +logs-remote: + @if [ -f remote-weed.log ]; then tail -f remote-weed.log; else echo "No log file"; fi + +# Clean up +clean: + @$(MAKE) stop-primary + @$(MAKE) stop-remote + @rm -f primary-weed.log remote-weed.log primary-server.pid remote-server.pid + @rm -rf $(PRIMARY_DIR) $(REMOTE_DIR) + @rm -f remote_cache.test + @go clean -testcache + @echo "Cleanup completed" + +# Health check +health: + @echo "Checking server status..." + @curl -s http://localhost:$(PRIMARY_S3_PORT) >/dev/null 2>&1 && echo "Primary S3 ($(PRIMARY_S3_PORT)): UP" || echo "Primary S3 ($(PRIMARY_S3_PORT)): DOWN" + @curl -s http://localhost:$(REMOTE_S3_PORT) >/dev/null 2>&1 && echo "Remote S3 ($(REMOTE_S3_PORT)): UP" || echo "Remote S3 ($(REMOTE_S3_PORT)): DOWN" diff --git a/test/s3/remote_cache/README.md b/test/s3/remote_cache/README.md new file mode 100644 index 000000000..7dcc0f702 --- /dev/null +++ b/test/s3/remote_cache/README.md @@ -0,0 +1,157 @@ +# Remote Object Cache Integration Tests + +This directory contains integration tests for the remote object caching feature with singleflight deduplication. + +## Test Flow + +Each test follows this pattern: +1. **Write to local** - Upload data to primary SeaweedFS (local storage) +2. **Uncache** - Push data to remote storage and remove local chunks +3. 
**Read** - Read data (triggers caching from remote back to local) + +This tests the full remote caching workflow including singleflight deduplication. + +## Architecture + +```text +┌─────────────────────────────────────────────────────────────────┐ +│ Test Client │ +│ │ +│ 1. PUT data to primary SeaweedFS │ +│ 2. remote.cache.uncache (push to remote, purge local) │ +│ 3. GET data (triggers caching from remote) │ +│ 4. Verify singleflight deduplication │ +└──────────────────────────────────┬──────────────────────────────┘ + │ + ┌─────────────────┴─────────────────┐ + ▼ ▼ +┌────────────────────────────────────┐ ┌────────────────────────────────┐ +│ Primary SeaweedFS │ │ Remote SeaweedFS │ +│ (port 8333) │ │ (port 8334) │ +│ │ │ │ +│ - Being tested │ │ - Acts as "remote" S3 │ +│ - Has remote storage mounted │──▶│ - Receives uncached data │ +│ - Caches remote objects │ │ - Serves data for caching │ +│ - Singleflight deduplication │ │ │ +└────────────────────────────────────┘ └────────────────────────────────┘ +``` + +## What's Being Tested + +1. **Basic Remote Caching**: Write → Uncache → Read workflow +2. **Singleflight Deduplication**: Concurrent reads only trigger ONE caching operation +3. **Large Object Caching**: 5MB files cache correctly +4. **Range Requests**: Partial reads work with cached objects +5. **Not Found Handling**: Proper error for non-existent objects + +## Quick Start + +### Run Full Test Suite (Recommended) + +```bash +# Build SeaweedFS, start both servers, run tests, stop servers +make test-with-server +``` + +### Manual Steps + +```bash +# 1. Build SeaweedFS binary +make build-weed + +# 2. Start remote SeaweedFS (acts as "remote" storage) +make start-remote + +# 3. Start primary SeaweedFS (the one being tested) +make start-primary + +# 4. Configure remote storage mount +make setup-remote + +# 5. Run tests +make test + +# 6. Clean up +make clean +``` + +## Configuration + +### Primary SeaweedFS (Being Tested) + +| Service | Port | +|---------|------| +| S3 API | 8333 | +| Filer | 8888 | +| Master | 9333 | +| Volume | 8080 | + +### Remote SeaweedFS (Remote Storage) + +| Service | Port | +|---------|------| +| S3 API | 8334 | +| Filer | 8889 | +| Master | 9334 | +| Volume | 8081 | + +## Makefile Targets + +```bash +make help # Show all available targets +make build-weed # Build SeaweedFS binary +make start-remote # Start remote SeaweedFS +make start-primary # Start primary SeaweedFS +make setup-remote # Configure remote storage mount +make test # Run tests +make test-with-server # Full automated test workflow +make logs # Show server logs +make health # Check server status +make clean # Stop servers and clean up +``` + +## Test Details + +### TestRemoteCacheBasic +Basic workflow test: +1. Write object to primary (local) +2. Uncache (push to remote, remove local chunks) +3. Read (triggers caching from remote) +4. Read again (from local cache - should be faster) + +### TestRemoteCacheConcurrent +Singleflight deduplication test: +1. Write 1MB object +2. Uncache to remote +3. Launch 10 concurrent reads +4. All should succeed with correct data +5. Only ONE caching operation should run (singleflight) + +### TestRemoteCacheLargeObject +Large file test (5MB) to verify chunked transfer works correctly. + +### TestRemoteCacheRangeRequest +Tests HTTP range requests work correctly after caching. + +### TestRemoteCacheNotFound +Tests proper error handling for non-existent objects. 
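+
+## Singleflight Deduplication (Sketch)
+
+The deduplication exercised by `TestRemoteCacheConcurrent` comes from the filer wrapping
+`CacheRemoteObjectToLocalCluster` in a `singleflight.Group` (see
+`weed/server/filer_grpc_server_remote.go`). The sketch below illustrates the general pattern
+with hypothetical helper names (`fetchFromRemote`, `cacheWithDedup`); it is not the filer code itself.
+
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+
+	"golang.org/x/sync/singleflight"
+)
+
+var group singleflight.Group
+
+// fetchFromRemote stands in for the real caching work (hypothetical helper).
+func fetchFromRemote(key string) (string, error) {
+	return "cached:" + key, nil
+}
+
+func cacheWithDedup(key string) (string, error) {
+	v, err, shared := group.Do(key, func() (interface{}, error) {
+		// Only one goroutine per key executes this; concurrent callers for the
+		// same key wait here and receive the same result.
+		return fetchFromRemote(key)
+	})
+	if err != nil {
+		return "", err
+	}
+	_ = shared // true for callers that reused another caller's in-flight result
+	return v.(string), nil
+}
+
+func main() {
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			v, _ := cacheWithDedup("/buckets/remotemounted/test.txt")
+			fmt.Println(v)
+		}()
+	}
+	wg.Wait()
+}
+```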
+ +## Troubleshooting + +### View logs +```bash +make logs # Show recent logs from both servers +make logs-primary # Follow primary logs in real-time +make logs-remote # Follow remote logs in real-time +``` + +### Check server health +```bash +make health +``` + +### Clean up and retry +```bash +make clean +make test-with-server +``` diff --git a/test/s3/remote_cache/remote_cache_test.go b/test/s3/remote_cache/remote_cache_test.go new file mode 100644 index 000000000..08eca1802 --- /dev/null +++ b/test/s3/remote_cache/remote_cache_test.go @@ -0,0 +1,375 @@ +package remote_cache + +import ( + "bytes" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test configuration +// Uses two SeaweedFS instances: +// - Primary: The one being tested (has remote caching) +// - Remote: Acts as the "remote" S3 storage +const ( + // Primary SeaweedFS + primaryEndpoint = "http://localhost:8333" + primaryMasterPort = "9333" + + // Remote SeaweedFS (acts as remote storage) + remoteEndpoint = "http://localhost:8334" + + // Credentials (anonymous access for testing) + accessKey = "any" + secretKey = "any" + + // Bucket name - mounted on primary as remote storage + testBucket = "remotemounted" + + // Path to weed binary + weedBinary = "../../../weed/weed_binary" +) + +var ( + primaryClient *s3.S3 + primaryClientOnce sync.Once +) + +func getPrimaryClient() *s3.S3 { + primaryClientOnce.Do(func() { + primaryClient = createS3Client(primaryEndpoint) + }) + return primaryClient +} + +func createS3Client(endpoint string) *s3.S3 { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String(endpoint), + Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), + DisableSSL: aws.Bool(!strings.HasPrefix(endpoint, "https")), + S3ForcePathStyle: aws.Bool(true), + }) + if err != nil { + panic(fmt.Sprintf("failed to create session: %v", err)) + } + return s3.New(sess) +} + +// skipIfNotRunning skips the test if the servers aren't running +func skipIfNotRunning(t *testing.T) { + resp, err := http.Get(primaryEndpoint) + if err != nil { + t.Skipf("Primary SeaweedFS not running at %s: %v", primaryEndpoint, err) + } + resp.Body.Close() + + resp, err = http.Get(remoteEndpoint) + if err != nil { + t.Skipf("Remote SeaweedFS not running at %s: %v", remoteEndpoint, err) + } + resp.Body.Close() +} + +// runWeedShell executes a weed shell command +func runWeedShell(t *testing.T, command string) (string, error) { + cmd := exec.Command(weedBinary, "shell", "-master=localhost:"+primaryMasterPort) + cmd.Stdin = strings.NewReader(command + "\nexit\n") + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("weed shell command '%s' failed: %v, output: %s", command, err, string(output)) + return string(output), err + } + return string(output), nil +} + +// uploadToPrimary uploads an object to the primary SeaweedFS (local write) +func uploadToPrimary(t *testing.T, key string, data []byte) { + _, err := getPrimaryClient().PutObject(&s3.PutObjectInput{ + Bucket: aws.String(testBucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + }) + require.NoError(t, err, "failed to upload to primary SeaweedFS") +} + +// getFromPrimary gets an object from primary SeaweedFS +func 
getFromPrimary(t *testing.T, key string) []byte { + resp, err := getPrimaryClient().GetObject(&s3.GetObjectInput{ + Bucket: aws.String(testBucket), + Key: aws.String(key), + }) + require.NoError(t, err, "failed to get from primary SeaweedFS") + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read response body") + return data +} + +// syncToRemote syncs local data to remote storage +func syncToRemote(t *testing.T) { + t.Log("Syncing to remote storage...") + output, err := runWeedShell(t, "remote.cache.uncache -dir=/buckets/"+testBucket+" -include=*") + if err != nil { + t.Logf("syncToRemote warning: %v", err) + } + t.Log(output) + time.Sleep(1 * time.Second) +} + +// uncacheLocal purges the local cache, forcing data to be fetched from remote +func uncacheLocal(t *testing.T, pattern string) { + t.Logf("Purging local cache for pattern: %s", pattern) + output, err := runWeedShell(t, fmt.Sprintf("remote.uncache -dir=/buckets/%s -include=%s", testBucket, pattern)) + if err != nil { + t.Logf("uncacheLocal warning: %v", err) + } + t.Log(output) + time.Sleep(500 * time.Millisecond) +} + +// TestRemoteCacheBasic tests the basic caching workflow: +// 1. Write to local +// 2. Uncache (push to remote, remove local chunks) +// 3. Read (triggers caching from remote) +func TestRemoteCacheBasic(t *testing.T) { + skipIfNotRunning(t) + + testKey := fmt.Sprintf("test-basic-%d.txt", time.Now().UnixNano()) + testData := []byte("Hello, this is test data for remote caching!") + + // Step 1: Write to local + t.Log("Step 1: Writing object to primary SeaweedFS (local)...") + uploadToPrimary(t, testKey, testData) + + // Verify it's readable + result := getFromPrimary(t, testKey) + assert.Equal(t, testData, result, "initial read mismatch") + + // Step 2: Uncache - push to remote and remove local chunks + t.Log("Step 2: Uncaching (pushing to remote, removing local chunks)...") + uncacheLocal(t, testKey) + + // Step 3: Read - this should trigger caching from remote + t.Log("Step 3: Reading object (should trigger caching from remote)...") + start := time.Now() + result = getFromPrimary(t, testKey) + firstReadDuration := time.Since(start) + + assert.Equal(t, testData, result, "data mismatch after cache") + t.Logf("First read (from remote) took %v", firstReadDuration) + + // Step 4: Read again - should be from local cache + t.Log("Step 4: Reading again (should be from local cache)...") + start = time.Now() + result = getFromPrimary(t, testKey) + secondReadDuration := time.Since(start) + + assert.Equal(t, testData, result, "data mismatch on cached read") + t.Logf("Second read (from cache) took %v", secondReadDuration) + + t.Log("Basic caching test passed") +} + +// TestRemoteCacheConcurrent tests that concurrent reads of the same +// remote object only trigger ONE caching operation (singleflight deduplication) +func TestRemoteCacheConcurrent(t *testing.T) { + skipIfNotRunning(t) + + testKey := fmt.Sprintf("test-concurrent-%d.txt", time.Now().UnixNano()) + // Use larger data to make caching take measurable time + testData := make([]byte, 1024*1024) // 1MB + for i := range testData { + testData[i] = byte(i % 256) + } + + // Step 1: Write to local + t.Log("Step 1: Writing 1MB object to primary SeaweedFS...") + uploadToPrimary(t, testKey, testData) + + // Verify it's readable + result := getFromPrimary(t, testKey) + assert.Equal(t, len(testData), len(result), "initial size mismatch") + + // Step 2: Uncache + t.Log("Step 2: Uncaching (pushing to remote)...") + uncacheLocal(t, 
testKey) + + // Step 3: Launch many concurrent reads - singleflight should deduplicate + numRequests := 10 + var wg sync.WaitGroup + var successCount atomic.Int32 + var errorCount atomic.Int32 + results := make(chan []byte, numRequests) + + t.Logf("Step 3: Launching %d concurrent requests...", numRequests) + startTime := time.Now() + + for i := 0; i < numRequests; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + resp, err := getPrimaryClient().GetObject(&s3.GetObjectInput{ + Bucket: aws.String(testBucket), + Key: aws.String(testKey), + }) + if err != nil { + t.Logf("Request %d failed: %v", idx, err) + errorCount.Add(1) + return + } + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Logf("Request %d read failed: %v", idx, err) + errorCount.Add(1) + return + } + + results <- data + successCount.Add(1) + }(i) + } + + wg.Wait() + close(results) + totalDuration := time.Since(startTime) + + t.Logf("All %d requests completed in %v", numRequests, totalDuration) + t.Logf("Successful: %d, Failed: %d", successCount.Load(), errorCount.Load()) + + // Verify all successful requests returned correct data + for data := range results { + assert.Equal(t, len(testData), len(data), "data length mismatch") + } + + // All requests should succeed + assert.Equal(t, int32(numRequests), successCount.Load(), "some requests failed") + assert.Equal(t, int32(0), errorCount.Load(), "no requests should fail") + + t.Log("Concurrent caching test passed") +} + +// TestRemoteCacheLargeObject tests caching of larger objects +func TestRemoteCacheLargeObject(t *testing.T) { + skipIfNotRunning(t) + + testKey := fmt.Sprintf("test-large-%d.bin", time.Now().UnixNano()) + // 5MB object + testData := make([]byte, 5*1024*1024) + for i := range testData { + testData[i] = byte(i % 256) + } + + // Step 1: Write to local + t.Log("Step 1: Writing 5MB object to primary SeaweedFS...") + uploadToPrimary(t, testKey, testData) + + // Verify it's readable + result := getFromPrimary(t, testKey) + assert.Equal(t, len(testData), len(result), "initial size mismatch") + + // Step 2: Uncache + t.Log("Step 2: Uncaching...") + uncacheLocal(t, testKey) + + // Step 3: Read from remote + t.Log("Step 3: Reading 5MB object (should cache from remote)...") + start := time.Now() + result = getFromPrimary(t, testKey) + duration := time.Since(start) + + assert.Equal(t, len(testData), len(result), "size mismatch") + assert.Equal(t, testData, result, "data mismatch") + t.Logf("Large object cached in %v", duration) + + t.Log("Large object caching test passed") +} + +// TestRemoteCacheRangeRequest tests that range requests work after caching +func TestRemoteCacheRangeRequest(t *testing.T) { + skipIfNotRunning(t) + + testKey := fmt.Sprintf("test-range-%d.txt", time.Now().UnixNano()) + testData := []byte("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ") + + // Write, uncache, then test range request + t.Log("Writing and uncaching object...") + uploadToPrimary(t, testKey, testData) + uncacheLocal(t, testKey) + + // Range request should work and trigger caching + t.Log("Testing range request (bytes 10-19)...") + resp, err := getPrimaryClient().GetObject(&s3.GetObjectInput{ + Bucket: aws.String(testBucket), + Key: aws.String(testKey), + Range: aws.String("bytes=10-19"), + }) + require.NoError(t, err) + defer resp.Body.Close() + + rangeData, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expected := testData[10:20] // "ABCDEFGHIJ" + assert.Equal(t, expected, rangeData, "range data mismatch") + t.Logf("Range request returned: 
%s", string(rangeData)) + + t.Log("Range request test passed") +} + +// TestRemoteCacheNotFound tests that non-existent objects return proper errors +func TestRemoteCacheNotFound(t *testing.T) { + skipIfNotRunning(t) + + testKey := fmt.Sprintf("non-existent-object-%d", time.Now().UnixNano()) + + _, err := getPrimaryClient().GetObject(&s3.GetObjectInput{ + Bucket: aws.String(testBucket), + Key: aws.String(testKey), + }) + + assert.Error(t, err, "should get error for non-existent object") + t.Logf("Got expected error: %v", err) + + t.Log("Not found test passed") +} + +// TestMain sets up and tears down the test environment +func TestMain(m *testing.M) { + if !isServerRunning(primaryEndpoint) { + fmt.Println("WARNING: Primary SeaweedFS not running at", primaryEndpoint) + fmt.Println(" Run 'make test-with-server' to start servers automatically") + } + if !isServerRunning(remoteEndpoint) { + fmt.Println("WARNING: Remote SeaweedFS not running at", remoteEndpoint) + fmt.Println(" Run 'make test-with-server' to start servers automatically") + } + + os.Exit(m.Run()) +} + +func isServerRunning(url string) bool { + resp, err := http.Get(url) + if err != nil { + return false + } + resp.Body.Close() + return true +} diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index 992d1e95a..bac0e5e82 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -25,12 +25,20 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):]) } -func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { - return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - _, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ +// CacheRemoteObjectToLocalCluster caches a remote object to the local cluster. +// It returns the updated entry with local chunk locations. +// Parameters remoteConf and remoteLocation are kept for backward compatibility but are not used. 
+func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) (*filer_pb.Entry, error) { + var cachedEntry *filer_pb.Entry + err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, cacheErr := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: string(parent), Name: entry.Name, }) - return err + if cacheErr == nil && resp != nil { + cachedEntry = resp.Entry + } + return cacheErr }) + return cachedEntry, err } diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 2d6444c6c..e2d54e307 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -196,6 +196,9 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) // Update cache glog.V(3).Infof("updateBucketConfigCacheFromEntry: updating cache for bucket %s, ObjectLockConfig=%+v", bucket, config.ObjectLockConfig) s3a.bucketConfigCache.Set(bucket, config) + // Remove from negative cache since bucket now exists + // This is important for buckets created via weed shell or other external means + s3a.bucketConfigCache.RemoveNegativeCache(bucket) } // invalidateBucketConfigCache removes a bucket from the configuration cache diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go index 431f7beb1..c036137f4 100644 --- a/weed/s3api/bucket_metadata.go +++ b/weed/s3api/bucket_metadata.go @@ -3,14 +3,15 @@ package s3api import ( "context" "encoding/json" + "math" + "sync" + "github.com/aws/aws-sdk-go/service/s3" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util" - "math" - "sync" ) var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) { @@ -85,8 +86,10 @@ func (r *BucketRegistry) init() error { func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) { bucketMetadata := buildBucketMetadata(r.s3a.iam, entry) r.metadataCacheLock.Lock() - defer r.metadataCacheLock.Unlock() r.metadataCache[entry.Name] = bucketMetadata + r.metadataCacheLock.Unlock() + // Remove from notFound cache since bucket now exists + r.unMarkNotFound(entry.Name) } func buildBucketMetadata(accountManager AccountManager, entry *filer_pb.Entry) *BucketMetaData { diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 42d198673..b6219bf4a 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -659,6 +659,13 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) return } + // Handle remote storage objects: cache to local cluster if object is remote-only + // This uses singleflight to deduplicate concurrent caching requests for the same object + // On cache error, gracefully falls back to streaming from remote + if objectEntryForSSE.IsInRemoteOnly() { + objectEntryForSSE = s3a.cacheRemoteObjectWithDedup(r.Context(), bucket, object, objectEntryForSSE) + } + // Re-check bucket policy with object entry for tag-based conditions (e.g., s3:ExistingObjectTag) if errCode := s3a.recheckPolicyWithObjectEntry(r, bucket, object, string(s3_constants.ACTION_READ), 
objectEntryForSSE.Extended, "GetObjectHandler"); errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -3319,3 +3326,63 @@ func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) // No part boundaries metadata or part not found return partsCount, nil } + +// cacheRemoteObjectWithDedup caches a remote-only object to the local cluster. +// The filer server handles singleflight deduplication, so all clients (S3, HTTP, Hadoop) benefit. +// On cache error, returns the original entry (streaming from remote will still work). +// Uses a bounded timeout to avoid blocking requests indefinitely. +func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket, object string, entry *filer_pb.Entry) *filer_pb.Entry { + // Use a bounded timeout for caching to avoid blocking requests indefinitely + // 30 seconds should be enough for most objects; large objects may timeout but will still stream + const cacheTimeout = 30 * time.Second + cacheCtx, cancel := context.WithTimeout(ctx, cacheTimeout) + defer cancel() + + // Build the full path for the object + // Normalize object path: remove duplicate slashes and leading slash to avoid double slashes in path + dir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := strings.TrimPrefix(removeDuplicateSlashes(object), "/") + if idx := strings.LastIndex(normalizedObject, "/"); idx > 0 { + dir = dir + "/" + normalizedObject[:idx] + normalizedObject = normalizedObject[idx+1:] + } + + glog.V(2).Infof("cacheRemoteObjectWithDedup: caching %s/%s (remote size: %d)", bucket, object, entry.RemoteEntry.RemoteSize) + + // Call the filer's CacheRemoteObjectToLocalCluster via gRPC + // The filer handles singleflight deduplication internally + var cachedEntry *filer_pb.Entry + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, cacheErr := client.CacheRemoteObjectToLocalCluster(cacheCtx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{ + Directory: dir, + Name: normalizedObject, + }) + if cacheErr != nil { + return cacheErr + } + if resp != nil && resp.Entry != nil { + cachedEntry = resp.Entry + } + return nil + }) + + if err != nil { + // Caching failed - log and return original entry + // Streaming from remote storage will still work via filer proxy + if errors.Is(err, context.DeadlineExceeded) { + glog.V(1).Infof("cacheRemoteObjectWithDedup: timeout caching %s/%s after %v (will stream from remote)", bucket, object, cacheTimeout) + } else { + glog.Warningf("cacheRemoteObjectWithDedup: failed to cache %s/%s: %v (will stream from remote)", bucket, object, err) + } + return entry + } + + // If caching succeeded and we got chunks, use the cached entry's chunks + if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 { + glog.V(1).Infof("cacheRemoteObjectWithDedup: successfully cached %s/%s (%d chunks)", bucket, object, len(cachedEntry.GetChunks())) + // Preserve original entry metadata but use new chunks + entry.Chunks = cachedEntry.Chunks + } + + return entry +} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 840aeab0e..ffb50e8c1 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -70,7 +70,7 @@ type S3ApiServer struct { inFlightDataSize int64 inFlightUploads int64 inFlightDataLimitCond *sync.Cond - embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled) + embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled) } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer 
*S3ApiServer, err error) { diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 081d49ba0..7aafe0799 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -3,11 +3,13 @@ package weed_server import ( "context" "fmt" + "sort" "strings" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -19,6 +21,59 @@ import ( ) func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) { + // Use singleflight to deduplicate concurrent caching requests for the same object + // This benefits all clients: S3 API, filer HTTP, Hadoop, etc. + cacheKey := req.Directory + "/" + req.Name + + result, err, shared := fs.remoteCacheGroup.Do(cacheKey, func() (interface{}, error) { + return fs.doCacheRemoteObjectToLocalCluster(ctx, req) + }) + + if shared { + glog.V(2).Infof("CacheRemoteObjectToLocalCluster: shared result for %s", cacheKey) + } + + if err != nil { + return nil, err + } + if result == nil { + return nil, fmt.Errorf("unexpected nil result from singleflight") + } + + resp, ok := result.(*filer_pb.CacheRemoteObjectToLocalClusterResponse) + if !ok { + return nil, fmt.Errorf("unexpected result type from singleflight") + } + return resp, nil +} + +// doCacheRemoteObjectToLocalCluster performs the actual caching operation. +// This is called from singleflight, so only one instance runs per object. +func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) { + // find the entry first to check if already cached + entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) + if err == filer_pb.ErrNotFound { + return nil, err + } + if err != nil { + return nil, fmt.Errorf("find entry %s/%s: %v", req.Directory, req.Name, err) + } + + resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{} + + // Early return if not a remote-only object or already cached + if entry.Remote == nil || entry.Remote.RemoteSize == 0 { + resp.Entry = entry.ToProtoEntry() + return resp, nil + } + if len(entry.GetChunks()) > 0 { + // Already has local chunks - already cached + glog.V(2).Infof("CacheRemoteObjectToLocalCluster: %s/%s already cached (%d chunks)", req.Directory, req.Name, len(entry.GetChunks())) + resp.Entry = entry.ToProtoEntry() + return resp, nil + } + + glog.V(1).Infof("CacheRemoteObjectToLocalCluster: caching %s/%s (remote size: %d)", req.Directory, req.Name, entry.Remote.RemoteSize) // load all mappings mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)) @@ -52,17 +107,6 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) } - // find the entry - entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) - if err == filer_pb.ErrNotFound { - return nil, err - } - - resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{} - if entry.Remote == nil || entry.Remote.RemoteSize == 
0 { - return resp, nil - } - // detect storage option so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "") if err != nil { @@ -81,6 +125,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):]) var chunks []*filer_pb.FileChunk + var chunksMu sync.Mutex var fetchAndWriteErr error var wg sync.WaitGroup @@ -99,16 +144,28 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // assign one volume server assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { - fetchAndWriteErr = err + chunksMu.Lock() + if fetchAndWriteErr == nil { + fetchAndWriteErr = err + } + chunksMu.Unlock() return } if assignResult.Error != "" { - fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error) + chunksMu.Lock() + if fetchAndWriteErr == nil { + fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error) + } + chunksMu.Unlock() return } fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid) - if assignResult.Error != "" { - fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) + if parseErr != nil { + chunksMu.Lock() + if fetchAndWriteErr == nil { + fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) + } + chunksMu.Unlock() return } @@ -125,7 +182,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) var etag string err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ + resp, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), Cookie: uint32(fileId.Cookie), @@ -140,21 +197,23 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req Path: string(dest), }, }) - if fetchAndWriteErr != nil { - return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) - } else { - etag = resp.ETag + if fetchErr != nil { + return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchErr) } + etag = resp.ETag return nil }) - if err != nil && fetchAndWriteErr == nil { - fetchAndWriteErr = err + if err != nil { + chunksMu.Lock() + if fetchAndWriteErr == nil { + fetchAndWriteErr = err + } + chunksMu.Unlock() return } - chunks = append(chunks, &filer_pb.FileChunk{ - + chunk := &filer_pb.FileChunk{ FileId: assignResult.Fid, Offset: localOffset, Size: uint64(size), @@ -165,13 +224,24 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req FileKey: uint64(fileId.Key), Cookie: uint32(fileId.Cookie), }, - }) + } + chunksMu.Lock() + chunks = append(chunks, chunk) + chunksMu.Unlock() }) } wg.Wait() - if fetchAndWriteErr != nil { - return nil, fetchAndWriteErr + + chunksMu.Lock() + err = fetchAndWriteErr + // Sort chunks by offset to maintain file order + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].Offset < chunks[j].Offset + }) + 
chunksMu.Unlock() + if err != nil { + return nil, err } garbage := entry.GetChunks() diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 5cf7d42e0..57fd7ab25 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -11,6 +11,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/stats" + "golang.org/x/sync/singleflight" "google.golang.org/grpc" @@ -108,6 +109,9 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 + + // deduplicates concurrent remote object caching operations + remoteCacheGroup singleflight.Group } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 8b165b392..60ca33147 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -345,7 +345,7 @@ func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy) - if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil { + if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil { fmt.Fprintf(writer, "failed: %v\n", err) if executionErr == nil { executionErr = err