2 changed files with 404 additions and 5 deletions
@ -0,0 +1,341 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bytes" |
|||
"crypto/rand" |
|||
"encoding/json" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/storage/needle" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
// TestFileGeneration tests generating 600 files of 100KB each targeting volume 1 with hardcoded cookie
|
|||
func TestFileGeneration(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping file generation test in short mode") |
|||
} |
|||
|
|||
// Set up test cluster
|
|||
cluster, cleanup := setupTestCluster(t) |
|||
defer cleanup() |
|||
|
|||
// Wait for cluster to be ready
|
|||
require.NoError(t, waitForClusterReady()) |
|||
t.Logf("Test cluster ready with master at %s", cluster.masterAddress) |
|||
|
|||
// Generate 600 files of 100KB each targeting volume 1
|
|||
const targetVolumeId = needle.VolumeId(1) |
|||
const fileCount = 600 |
|||
const fileSize = 100 * 1024 // 100KB files
|
|||
|
|||
fileIds := generateFilesToVolume1(t, fileCount, fileSize) |
|||
t.Logf("Generated %d files of %dKB each targeting volume %d", len(fileIds), fileSize/1024, targetVolumeId) |
|||
t.Logf("📝 Sample file IDs created: %v", fileIds[:5]) // Show first 5 file IDs
|
|||
|
|||
// Summary
|
|||
t.Logf("📊 File Generation Summary:") |
|||
t.Logf(" • Volume ID: %d", targetVolumeId) |
|||
t.Logf(" • Total files created: %d", len(fileIds)) |
|||
t.Logf(" • File size: %dKB each", fileSize/1024) |
|||
t.Logf(" • Hardcoded cookie: 0x12345678") |
|||
} |
|||
|
|||
// TestFileDeletion tests deleting exactly 300 files from volume 1
|
|||
func TestFileDeletion(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping file deletion test in short mode") |
|||
} |
|||
|
|||
// Set up test cluster
|
|||
cluster, cleanup := setupTestCluster(t) |
|||
defer cleanup() |
|||
|
|||
// Wait for cluster to be ready
|
|||
require.NoError(t, waitForClusterReady()) |
|||
t.Logf("Test cluster ready with master at %s", cluster.masterAddress) |
|||
|
|||
// First generate some files to delete
|
|||
const targetVolumeId = needle.VolumeId(1) |
|||
const fileCount = 600 |
|||
const fileSize = 100 * 1024 // 100KB files
|
|||
|
|||
t.Logf("Pre-generating files for deletion test...") |
|||
fileIds := generateFilesToVolume1(t, fileCount, fileSize) |
|||
t.Logf("Pre-generated %d files for deletion test", len(fileIds)) |
|||
|
|||
// Delete exactly 300 files from the volume
|
|||
const deleteCount = 300 |
|||
deletedFileIds := deleteSpecificFilesFromVolume(t, fileIds, deleteCount) |
|||
t.Logf("Deleted exactly %d files from volume %d", len(deletedFileIds), targetVolumeId) |
|||
t.Logf("🗑️ Sample deleted file IDs: %v", deletedFileIds[:5]) // Show first 5 deleted file IDs
|
|||
|
|||
// Summary
|
|||
t.Logf("📊 File Deletion Summary:") |
|||
t.Logf(" • Volume ID: %d", targetVolumeId) |
|||
t.Logf(" • Files available for deletion: %d", len(fileIds)) |
|||
t.Logf(" • Files deleted: %d", len(deletedFileIds)) |
|||
t.Logf(" • Files remaining: %d", len(fileIds)-len(deletedFileIds)) |
|||
t.Logf(" • Deletion success rate: %.1f%%", float64(len(deletedFileIds))/float64(deleteCount)*100) |
|||
} |
|||
|
|||
// generateFilesToVolume1 creates 600 files of 100KB each targeting volume 1 with hardcoded cookie
|
|||
func generateFilesToVolume1(t *testing.T, fileCount int, fileSize int) []string { |
|||
const targetVolumeId = needle.VolumeId(1) |
|||
const hardcodedCookie = uint32(0x12345678) // Hardcoded cookie for volume 1 files
|
|||
|
|||
var fileIds []string |
|||
t.Logf("Starting generation of %d files of %dKB each targeting volume %d", fileCount, fileSize/1024, targetVolumeId) |
|||
|
|||
for i := 0; i < fileCount; i++ { |
|||
// Generate file content
|
|||
fileData := make([]byte, fileSize) |
|||
rand.Read(fileData) |
|||
|
|||
// Generate file ID targeting volume 1 with hardcoded cookie
|
|||
// Use high needle key values to avoid collisions with assigned IDs
|
|||
needleKey := uint64(i) + 0x2000000 // Start from a high offset for volume 1
|
|||
generatedFid := needle.NewFileId(targetVolumeId, needleKey, hardcodedCookie) |
|||
|
|||
// Upload directly to volume 1
|
|||
err := uploadFileToVolumeDirectly(t, generatedFid, fileData) |
|||
require.NoError(t, err) |
|||
|
|||
fileIds = append(fileIds, generatedFid.String()) |
|||
|
|||
// Log progress for first few files and every 50th file
|
|||
if i < 5 || (i+1)%50 == 0 { |
|||
t.Logf("✅ Generated file %d/%d targeting volume %d: %s", i+1, fileCount, targetVolumeId, generatedFid.String()) |
|||
} |
|||
} |
|||
|
|||
t.Logf("✅ Successfully generated %d files targeting volume %d with hardcoded cookie 0x%08x", len(fileIds), targetVolumeId, hardcodedCookie) |
|||
return fileIds |
|||
} |
|||
|
|||
// uploadFileToVolume uploads file data to an assigned volume server
|
|||
func uploadFileToVolume(serverUrl, fid string, data []byte) error { |
|||
uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid) |
|||
|
|||
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
req.Header.Set("Content-Type", "application/octet-stream") |
|||
|
|||
client := &http.Client{Timeout: 30 * time.Second} |
|||
resp, err := client.Do(req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode != http.StatusCreated { |
|||
body, _ := io.ReadAll(resp.Body) |
|||
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body)) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// uploadFileToVolumeDirectly uploads a file using a generated file ID
|
|||
func uploadFileToVolumeDirectly(t *testing.T, fid *needle.FileId, data []byte) error { |
|||
// Find the volume server hosting this volume
|
|||
locations, found := findVolumeLocations(fid.VolumeId) |
|||
if !found { |
|||
return fmt.Errorf("volume %d not found", fid.VolumeId) |
|||
} |
|||
|
|||
// Upload to the first available location
|
|||
serverUrl := locations[0] |
|||
uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid.String()) |
|||
|
|||
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
req.Header.Set("Content-Type", "application/octet-stream") |
|||
|
|||
client := &http.Client{Timeout: 30 * time.Second} |
|||
resp, err := client.Do(req) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { |
|||
body, _ := io.ReadAll(resp.Body) |
|||
return fmt.Errorf("direct upload failed with status %d: %s", resp.StatusCode, string(body)) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// findVolumeLocations finds the server locations for a given volume using HTTP lookup
|
|||
func findVolumeLocations(volumeId needle.VolumeId) ([]string, bool) { |
|||
// Query master for volume locations using HTTP API
|
|||
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:9333/dir/lookup?volumeId=%d", volumeId)) |
|||
if err != nil { |
|||
return nil, false |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode != http.StatusOK { |
|||
return nil, false |
|||
} |
|||
|
|||
// Parse JSON response
|
|||
type LookupResult struct { |
|||
VolumeId string `json:"volumeId"` |
|||
Locations []struct { |
|||
Url string `json:"url"` |
|||
PublicUrl string `json:"publicUrl"` |
|||
} `json:"locations"` |
|||
Error string `json:"error"` |
|||
} |
|||
|
|||
var result LookupResult |
|||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { |
|||
// Fallback to default locations for testing
|
|||
return []string{"127.0.0.1:8080"}, true |
|||
} |
|||
|
|||
if result.Error != "" { |
|||
return nil, false |
|||
} |
|||
|
|||
var serverUrls []string |
|||
for _, location := range result.Locations { |
|||
// Convert Docker container hostnames to localhost with mapped ports
|
|||
url := convertDockerHostnameToLocalhost(location.Url) |
|||
serverUrls = append(serverUrls, url) |
|||
} |
|||
|
|||
if len(serverUrls) == 0 { |
|||
// Fallback to default for testing
|
|||
return []string{"127.0.0.1:8080"}, true |
|||
} |
|||
|
|||
return serverUrls, true |
|||
} |
|||
|
|||
// convertDockerHostnameToLocalhost converts Docker container hostnames to localhost with mapped ports
|
|||
func convertDockerHostnameToLocalhost(dockerUrl string) string { |
|||
// Map Docker container hostnames to localhost ports
|
|||
hostPortMap := map[string]string{ |
|||
"volume1:8080": "127.0.0.1:8080", |
|||
"volume2:8080": "127.0.0.1:8081", |
|||
"volume3:8080": "127.0.0.1:8082", |
|||
"volume4:8080": "127.0.0.1:8083", |
|||
"volume5:8080": "127.0.0.1:8084", |
|||
"volume6:8080": "127.0.0.1:8085", |
|||
} |
|||
|
|||
if localhost, exists := hostPortMap[dockerUrl]; exists { |
|||
return localhost |
|||
} |
|||
|
|||
// If not in map, return as-is (might be already localhost)
|
|||
return dockerUrl |
|||
} |
|||
|
|||
// deleteSpecificFilesFromVolume deletes exactly the specified number of files from the volume
|
|||
func deleteSpecificFilesFromVolume(t *testing.T, fileIds []string, deleteCount int) []string { |
|||
var deletedFileIds []string |
|||
successfulDeletions := 0 |
|||
|
|||
if deleteCount > len(fileIds) { |
|||
deleteCount = len(fileIds) |
|||
} |
|||
|
|||
t.Logf("🗑️ Starting deletion of exactly %d files out of %d total files", deleteCount, len(fileIds)) |
|||
|
|||
for i := 0; i < deleteCount; i++ { |
|||
fileId := fileIds[i] |
|||
|
|||
// Parse file ID to get volume server location
|
|||
fid, err := needle.ParseFileIdFromString(fileId) |
|||
if err != nil { |
|||
t.Logf("Failed to parse file ID %s: %v", fileId, err) |
|||
continue |
|||
} |
|||
|
|||
// Find volume server hosting this file
|
|||
locations, found := findVolumeLocations(fid.VolumeId) |
|||
if !found { |
|||
t.Logf("Volume locations not found for file %s", fileId) |
|||
continue |
|||
} |
|||
|
|||
// Delete file from volume server
|
|||
deleteUrl := fmt.Sprintf("http://%s/%s", locations[0], fileId) |
|||
req, err := http.NewRequest("DELETE", deleteUrl, nil) |
|||
if err != nil { |
|||
t.Logf("Failed to create delete request for file %s: %v", fileId, err) |
|||
continue |
|||
} |
|||
|
|||
client := &http.Client{Timeout: 30 * time.Second} |
|||
resp, err := client.Do(req) |
|||
if err != nil { |
|||
t.Logf("Failed to delete file %s: %v", fileId, err) |
|||
continue |
|||
} |
|||
resp.Body.Close() |
|||
|
|||
if resp.StatusCode == http.StatusAccepted || resp.StatusCode == http.StatusOK { |
|||
successfulDeletions++ |
|||
deletedFileIds = append(deletedFileIds, fileId) |
|||
// Log progress for first few files and every 25th deletion
|
|||
if i < 5 || (i+1)%25 == 0 { |
|||
t.Logf("🗑️ Deleted file %d/%d: %s (status: %d)", i+1, deleteCount, fileId, resp.StatusCode) |
|||
} |
|||
} else { |
|||
t.Logf("❌ Delete failed for file %s with status %d", fileId, resp.StatusCode) |
|||
} |
|||
} |
|||
|
|||
t.Logf("✅ Deletion summary: %d files deleted successfully out of %d attempted", successfulDeletions, deleteCount) |
|||
return deletedFileIds |
|||
} |
|||
|
|||
// Helper functions for test setup
|
|||
|
|||
func setupTestCluster(t *testing.T) (*TestCluster, func()) { |
|||
// Create test cluster similar to existing integration tests
|
|||
// This is a simplified version - in practice would start actual servers
|
|||
cluster := &TestCluster{ |
|||
masterAddress: "127.0.0.1:9333", |
|||
volumeServers: []string{ |
|||
"127.0.0.1:8080", |
|||
"127.0.0.1:8081", |
|||
"127.0.0.1:8082", |
|||
"127.0.0.1:8083", |
|||
"127.0.0.1:8084", |
|||
"127.0.0.1:8085", |
|||
}, |
|||
} |
|||
|
|||
cleanup := func() { |
|||
// Cleanup cluster resources
|
|||
t.Logf("Cleaning up test cluster") |
|||
} |
|||
|
|||
return cluster, cleanup |
|||
} |
|||
|
|||
func waitForClusterReady() error { |
|||
// Wait for test cluster to be ready
|
|||
// In practice, this would ping the servers and wait for them to respond
|
|||
time.Sleep(2 * time.Second) |
|||
return nil |
|||
} |
|||
|
|||
type TestCluster struct { |
|||
masterAddress string |
|||
volumeServers []string |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue