You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
341 lines
10 KiB
341 lines
10 KiB
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestFileGeneration tests generating 600 files of 100KB each targeting volume 1 with hardcoded cookie
|
|
func TestFileGeneration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping file generation test in short mode")
|
|
}
|
|
|
|
// Set up test cluster
|
|
cluster, cleanup := setupTestCluster(t)
|
|
defer cleanup()
|
|
|
|
// Wait for cluster to be ready
|
|
require.NoError(t, waitForClusterReady())
|
|
t.Logf("Test cluster ready with master at %s", cluster.masterAddress)
|
|
|
|
// Generate 600 files of 100KB each targeting volume 1
|
|
const targetVolumeId = needle.VolumeId(1)
|
|
const fileCount = 600
|
|
const fileSize = 100 * 1024 // 100KB files
|
|
|
|
fileIds := generateFilesToVolume1(t, fileCount, fileSize)
|
|
t.Logf("Generated %d files of %dKB each targeting volume %d", len(fileIds), fileSize/1024, targetVolumeId)
|
|
t.Logf("📝 Sample file IDs created: %v", fileIds[:5]) // Show first 5 file IDs
|
|
|
|
// Summary
|
|
t.Logf("📊 File Generation Summary:")
|
|
t.Logf(" • Volume ID: %d", targetVolumeId)
|
|
t.Logf(" • Total files created: %d", len(fileIds))
|
|
t.Logf(" • File size: %dKB each", fileSize/1024)
|
|
t.Logf(" • Hardcoded cookie: 0x12345678")
|
|
}
|
|
|
|
// TestFileDeletion tests deleting exactly 300 files from volume 1
|
|
func TestFileDeletion(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping file deletion test in short mode")
|
|
}
|
|
|
|
// Set up test cluster
|
|
cluster, cleanup := setupTestCluster(t)
|
|
defer cleanup()
|
|
|
|
// Wait for cluster to be ready
|
|
require.NoError(t, waitForClusterReady())
|
|
t.Logf("Test cluster ready with master at %s", cluster.masterAddress)
|
|
|
|
// First generate some files to delete
|
|
const targetVolumeId = needle.VolumeId(1)
|
|
const fileCount = 600
|
|
const fileSize = 100 * 1024 // 100KB files
|
|
|
|
t.Logf("Pre-generating files for deletion test...")
|
|
fileIds := generateFilesToVolume1(t, fileCount, fileSize)
|
|
t.Logf("Pre-generated %d files for deletion test", len(fileIds))
|
|
|
|
// Delete exactly 300 files from the volume
|
|
const deleteCount = 300
|
|
deletedFileIds := deleteSpecificFilesFromVolume(t, fileIds, deleteCount)
|
|
t.Logf("Deleted exactly %d files from volume %d", len(deletedFileIds), targetVolumeId)
|
|
t.Logf("🗑️ Sample deleted file IDs: %v", deletedFileIds[:5]) // Show first 5 deleted file IDs
|
|
|
|
// Summary
|
|
t.Logf("📊 File Deletion Summary:")
|
|
t.Logf(" • Volume ID: %d", targetVolumeId)
|
|
t.Logf(" • Files available for deletion: %d", len(fileIds))
|
|
t.Logf(" • Files deleted: %d", len(deletedFileIds))
|
|
t.Logf(" • Files remaining: %d", len(fileIds)-len(deletedFileIds))
|
|
t.Logf(" • Deletion success rate: %.1f%%", float64(len(deletedFileIds))/float64(deleteCount)*100)
|
|
}
|
|
|
|
// generateFilesToVolume1 creates 600 files of 100KB each targeting volume 1 with hardcoded cookie
|
|
func generateFilesToVolume1(t *testing.T, fileCount int, fileSize int) []string {
|
|
const targetVolumeId = needle.VolumeId(1)
|
|
const hardcodedCookie = uint32(0x12345678) // Hardcoded cookie for volume 1 files
|
|
|
|
var fileIds []string
|
|
t.Logf("Starting generation of %d files of %dKB each targeting volume %d", fileCount, fileSize/1024, targetVolumeId)
|
|
|
|
for i := 0; i < fileCount; i++ {
|
|
// Generate file content
|
|
fileData := make([]byte, fileSize)
|
|
rand.Read(fileData)
|
|
|
|
// Generate file ID targeting volume 1 with hardcoded cookie
|
|
// Use high needle key values to avoid collisions with assigned IDs
|
|
needleKey := uint64(i) + 0x2000000 // Start from a high offset for volume 1
|
|
generatedFid := needle.NewFileId(targetVolumeId, needleKey, hardcodedCookie)
|
|
|
|
// Upload directly to volume 1
|
|
err := uploadFileToVolumeDirectly(t, generatedFid, fileData)
|
|
require.NoError(t, err)
|
|
|
|
fileIds = append(fileIds, generatedFid.String())
|
|
|
|
// Log progress for first few files and every 50th file
|
|
if i < 5 || (i+1)%50 == 0 {
|
|
t.Logf("✅ Generated file %d/%d targeting volume %d: %s", i+1, fileCount, targetVolumeId, generatedFid.String())
|
|
}
|
|
}
|
|
|
|
t.Logf("✅ Successfully generated %d files targeting volume %d with hardcoded cookie 0x%08x", len(fileIds), targetVolumeId, hardcodedCookie)
|
|
return fileIds
|
|
}
|
|
|
|
// uploadFileToVolume PUTs data to http://<serverUrl>/<fid> with content type
// application/octet-stream, using a client timeout of 30 seconds.
//
// serverUrl is the volume server address without a scheme and fid is the file
// ID to store the data under. It returns nil only when the server answers
// 201 Created; any other status produces an error carrying the status code and
// the response body. Request-construction and transport errors are returned
// wrapped with context.
func uploadFileToVolume(serverUrl, fid string, data []byte) error {
	uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid)

	req, err := http.NewRequest(http.MethodPut, uploadUrl, bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("building upload request for file %s: %w", fid, err)
	}
	req.Header.Set("Content-Type", "application/octet-stream")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("uploading file %s: %w", fid, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		// The body only enriches the error message, so a failed read is
		// deliberately not treated as a separate error.
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}

	// Drain the body so the transport can reuse the keep-alive connection;
	// a drain failure does not change the outcome of the upload.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
|
|
|
|
// uploadFileToVolumeDirectly uploads a file using a generated file ID
|
|
func uploadFileToVolumeDirectly(t *testing.T, fid *needle.FileId, data []byte) error {
|
|
// Find the volume server hosting this volume
|
|
locations, found := findVolumeLocations(fid.VolumeId)
|
|
if !found {
|
|
return fmt.Errorf("volume %d not found", fid.VolumeId)
|
|
}
|
|
|
|
// Upload to the first available location
|
|
serverUrl := locations[0]
|
|
uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid.String())
|
|
|
|
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
|
|
client := &http.Client{Timeout: 30 * time.Second}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("direct upload failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// findVolumeLocations finds the server locations for a given volume using HTTP lookup
|
|
func findVolumeLocations(volumeId needle.VolumeId) ([]string, bool) {
|
|
// Query master for volume locations using HTTP API
|
|
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:9333/dir/lookup?volumeId=%d", volumeId))
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, false
|
|
}
|
|
|
|
// Parse JSON response
|
|
type LookupResult struct {
|
|
VolumeId string `json:"volumeId"`
|
|
Locations []struct {
|
|
Url string `json:"url"`
|
|
PublicUrl string `json:"publicUrl"`
|
|
} `json:"locations"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
var result LookupResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
// Fallback to default locations for testing
|
|
return []string{"127.0.0.1:8080"}, true
|
|
}
|
|
|
|
if result.Error != "" {
|
|
return nil, false
|
|
}
|
|
|
|
var serverUrls []string
|
|
for _, location := range result.Locations {
|
|
// Convert Docker container hostnames to localhost with mapped ports
|
|
url := convertDockerHostnameToLocalhost(location.Url)
|
|
serverUrls = append(serverUrls, url)
|
|
}
|
|
|
|
if len(serverUrls) == 0 {
|
|
// Fallback to default for testing
|
|
return []string{"127.0.0.1:8080"}, true
|
|
}
|
|
|
|
return serverUrls, true
|
|
}
|
|
|
|
// convertDockerHostnameToLocalhost translates a Docker container address of
// the form "volumeN:8080" (N = 1..6) into the loopback address it is mapped
// to: 127.0.0.1:8080 for volume1 through 127.0.0.1:8085 for volume6.
//
// Any other input is returned unchanged (it might already be a localhost
// address). A switch is used so no lookup table is allocated per call.
func convertDockerHostnameToLocalhost(dockerUrl string) string {
	switch dockerUrl {
	case "volume1:8080":
		return "127.0.0.1:8080"
	case "volume2:8080":
		return "127.0.0.1:8081"
	case "volume3:8080":
		return "127.0.0.1:8082"
	case "volume4:8080":
		return "127.0.0.1:8083"
	case "volume5:8080":
		return "127.0.0.1:8084"
	case "volume6:8080":
		return "127.0.0.1:8085"
	default:
		// If not a known container address, return as-is.
		return dockerUrl
	}
}
|
|
|
|
// deleteSpecificFilesFromVolume deletes exactly the specified number of files from the volume
|
|
func deleteSpecificFilesFromVolume(t *testing.T, fileIds []string, deleteCount int) []string {
|
|
var deletedFileIds []string
|
|
successfulDeletions := 0
|
|
|
|
if deleteCount > len(fileIds) {
|
|
deleteCount = len(fileIds)
|
|
}
|
|
|
|
t.Logf("🗑️ Starting deletion of exactly %d files out of %d total files", deleteCount, len(fileIds))
|
|
|
|
for i := 0; i < deleteCount; i++ {
|
|
fileId := fileIds[i]
|
|
|
|
// Parse file ID to get volume server location
|
|
fid, err := needle.ParseFileIdFromString(fileId)
|
|
if err != nil {
|
|
t.Logf("Failed to parse file ID %s: %v", fileId, err)
|
|
continue
|
|
}
|
|
|
|
// Find volume server hosting this file
|
|
locations, found := findVolumeLocations(fid.VolumeId)
|
|
if !found {
|
|
t.Logf("Volume locations not found for file %s", fileId)
|
|
continue
|
|
}
|
|
|
|
// Delete file from volume server
|
|
deleteUrl := fmt.Sprintf("http://%s/%s", locations[0], fileId)
|
|
req, err := http.NewRequest("DELETE", deleteUrl, nil)
|
|
if err != nil {
|
|
t.Logf("Failed to create delete request for file %s: %v", fileId, err)
|
|
continue
|
|
}
|
|
|
|
client := &http.Client{Timeout: 30 * time.Second}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
t.Logf("Failed to delete file %s: %v", fileId, err)
|
|
continue
|
|
}
|
|
resp.Body.Close()
|
|
|
|
if resp.StatusCode == http.StatusAccepted || resp.StatusCode == http.StatusOK {
|
|
successfulDeletions++
|
|
deletedFileIds = append(deletedFileIds, fileId)
|
|
// Log progress for first few files and every 25th deletion
|
|
if i < 5 || (i+1)%25 == 0 {
|
|
t.Logf("🗑️ Deleted file %d/%d: %s (status: %d)", i+1, deleteCount, fileId, resp.StatusCode)
|
|
}
|
|
} else {
|
|
t.Logf("❌ Delete failed for file %s with status %d", fileId, resp.StatusCode)
|
|
}
|
|
}
|
|
|
|
t.Logf("✅ Deletion summary: %d files deleted successfully out of %d attempted", successfulDeletions, deleteCount)
|
|
return deletedFileIds
|
|
}
|
|
|
|
// Helper functions for test setup
|
|
|
|
func setupTestCluster(t *testing.T) (*TestCluster, func()) {
|
|
// Create test cluster similar to existing integration tests
|
|
// This is a simplified version - in practice would start actual servers
|
|
cluster := &TestCluster{
|
|
masterAddress: "127.0.0.1:9333",
|
|
volumeServers: []string{
|
|
"127.0.0.1:8080",
|
|
"127.0.0.1:8081",
|
|
"127.0.0.1:8082",
|
|
"127.0.0.1:8083",
|
|
"127.0.0.1:8084",
|
|
"127.0.0.1:8085",
|
|
},
|
|
}
|
|
|
|
cleanup := func() {
|
|
// Cleanup cluster resources
|
|
t.Logf("Cleaning up test cluster")
|
|
}
|
|
|
|
return cluster, cleanup
|
|
}
|
|
|
|
// waitForClusterReady pauses for a fixed two seconds and then reports success.
//
// It is a placeholder: no server is contacted, so the returned error is
// always nil. A real implementation would ping the servers and wait for them
// to respond.
func waitForClusterReady() error {
	const settleDelay = 2 * time.Second

	time.Sleep(settleDelay)
	return nil
}
|
|
|
|
// TestCluster holds the network addresses of the cluster the tests talk to.
type TestCluster struct {
	// masterAddress is the address of the master server.
	masterAddress string
	// volumeServers lists the addresses of the volume servers.
	volumeServers []string
}
|