You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

341 lines
10 KiB

package main
import (
"bytes"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/stretchr/testify/require"
)
// TestFileGeneration tests generating 600 files of 100KB each targeting volume 1 with hardcoded cookie
func TestFileGeneration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping file generation test in short mode")
}
// Set up test cluster
cluster, cleanup := setupTestCluster(t)
defer cleanup()
// Wait for cluster to be ready
require.NoError(t, waitForClusterReady())
t.Logf("Test cluster ready with master at %s", cluster.masterAddress)
// Generate 600 files of 100KB each targeting volume 1
const targetVolumeId = needle.VolumeId(1)
const fileCount = 600
const fileSize = 100 * 1024 // 100KB files
fileIds := generateFilesToVolume1(t, fileCount, fileSize)
t.Logf("Generated %d files of %dKB each targeting volume %d", len(fileIds), fileSize/1024, targetVolumeId)
t.Logf("📝 Sample file IDs created: %v", fileIds[:5]) // Show first 5 file IDs
// Summary
t.Logf("📊 File Generation Summary:")
t.Logf(" • Volume ID: %d", targetVolumeId)
t.Logf(" • Total files created: %d", len(fileIds))
t.Logf(" • File size: %dKB each", fileSize/1024)
t.Logf(" • Hardcoded cookie: 0x12345678")
}
// TestFileDeletion tests deleting exactly 300 files from volume 1
func TestFileDeletion(t *testing.T) {
if testing.Short() {
t.Skip("Skipping file deletion test in short mode")
}
// Set up test cluster
cluster, cleanup := setupTestCluster(t)
defer cleanup()
// Wait for cluster to be ready
require.NoError(t, waitForClusterReady())
t.Logf("Test cluster ready with master at %s", cluster.masterAddress)
// First generate some files to delete
const targetVolumeId = needle.VolumeId(1)
const fileCount = 600
const fileSize = 100 * 1024 // 100KB files
t.Logf("Pre-generating files for deletion test...")
fileIds := generateFilesToVolume1(t, fileCount, fileSize)
t.Logf("Pre-generated %d files for deletion test", len(fileIds))
// Delete exactly 300 files from the volume
const deleteCount = 300
deletedFileIds := deleteSpecificFilesFromVolume(t, fileIds, deleteCount)
t.Logf("Deleted exactly %d files from volume %d", len(deletedFileIds), targetVolumeId)
t.Logf("🗑️ Sample deleted file IDs: %v", deletedFileIds[:5]) // Show first 5 deleted file IDs
// Summary
t.Logf("📊 File Deletion Summary:")
t.Logf(" • Volume ID: %d", targetVolumeId)
t.Logf(" • Files available for deletion: %d", len(fileIds))
t.Logf(" • Files deleted: %d", len(deletedFileIds))
t.Logf(" • Files remaining: %d", len(fileIds)-len(deletedFileIds))
t.Logf(" • Deletion success rate: %.1f%%", float64(len(deletedFileIds))/float64(deleteCount)*100)
}
// generateFilesToVolume1 creates 600 files of 100KB each targeting volume 1 with hardcoded cookie
func generateFilesToVolume1(t *testing.T, fileCount int, fileSize int) []string {
const targetVolumeId = needle.VolumeId(1)
const hardcodedCookie = uint32(0x12345678) // Hardcoded cookie for volume 1 files
var fileIds []string
t.Logf("Starting generation of %d files of %dKB each targeting volume %d", fileCount, fileSize/1024, targetVolumeId)
for i := 0; i < fileCount; i++ {
// Generate file content
fileData := make([]byte, fileSize)
rand.Read(fileData)
// Generate file ID targeting volume 1 with hardcoded cookie
// Use high needle key values to avoid collisions with assigned IDs
needleKey := uint64(i) + 0x2000000 // Start from a high offset for volume 1
generatedFid := needle.NewFileId(targetVolumeId, needleKey, hardcodedCookie)
// Upload directly to volume 1
err := uploadFileToVolumeDirectly(t, generatedFid, fileData)
require.NoError(t, err)
fileIds = append(fileIds, generatedFid.String())
// Log progress for first few files and every 50th file
if i < 5 || (i+1)%50 == 0 {
t.Logf("✅ Generated file %d/%d targeting volume %d: %s", i+1, fileCount, targetVolumeId, generatedFid.String())
}
}
t.Logf("✅ Successfully generated %d files targeting volume %d with hardcoded cookie 0x%08x", len(fileIds), targetVolumeId, hardcodedCookie)
return fileIds
}
// uploadFileToVolume uploads file data to an assigned volume server
func uploadFileToVolume(serverUrl, fid string, data []byte) error {
uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid)
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// uploadFileToVolumeDirectly uploads a file using a generated file ID
func uploadFileToVolumeDirectly(t *testing.T, fid *needle.FileId, data []byte) error {
// Find the volume server hosting this volume
locations, found := findVolumeLocations(fid.VolumeId)
if !found {
return fmt.Errorf("volume %d not found", fid.VolumeId)
}
// Upload to the first available location
serverUrl := locations[0]
uploadUrl := fmt.Sprintf("http://%s/%s", serverUrl, fid.String())
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("direct upload failed with status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// findVolumeLocations finds the server locations for a given volume using HTTP lookup
func findVolumeLocations(volumeId needle.VolumeId) ([]string, bool) {
// Query master for volume locations using HTTP API
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:9333/dir/lookup?volumeId=%d", volumeId))
if err != nil {
return nil, false
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, false
}
// Parse JSON response
type LookupResult struct {
VolumeId string `json:"volumeId"`
Locations []struct {
Url string `json:"url"`
PublicUrl string `json:"publicUrl"`
} `json:"locations"`
Error string `json:"error"`
}
var result LookupResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
// Fallback to default locations for testing
return []string{"127.0.0.1:8080"}, true
}
if result.Error != "" {
return nil, false
}
var serverUrls []string
for _, location := range result.Locations {
// Convert Docker container hostnames to localhost with mapped ports
url := convertDockerHostnameToLocalhost(location.Url)
serverUrls = append(serverUrls, url)
}
if len(serverUrls) == 0 {
// Fallback to default for testing
return []string{"127.0.0.1:8080"}, true
}
return serverUrls, true
}
// convertDockerHostnameToLocalhost converts Docker container hostnames to localhost with mapped ports
func convertDockerHostnameToLocalhost(dockerUrl string) string {
// Map Docker container hostnames to localhost ports
hostPortMap := map[string]string{
"volume1:8080": "127.0.0.1:8080",
"volume2:8080": "127.0.0.1:8081",
"volume3:8080": "127.0.0.1:8082",
"volume4:8080": "127.0.0.1:8083",
"volume5:8080": "127.0.0.1:8084",
"volume6:8080": "127.0.0.1:8085",
}
if localhost, exists := hostPortMap[dockerUrl]; exists {
return localhost
}
// If not in map, return as-is (might be already localhost)
return dockerUrl
}
// deleteSpecificFilesFromVolume deletes exactly the specified number of files from the volume
func deleteSpecificFilesFromVolume(t *testing.T, fileIds []string, deleteCount int) []string {
var deletedFileIds []string
successfulDeletions := 0
if deleteCount > len(fileIds) {
deleteCount = len(fileIds)
}
t.Logf("🗑️ Starting deletion of exactly %d files out of %d total files", deleteCount, len(fileIds))
for i := 0; i < deleteCount; i++ {
fileId := fileIds[i]
// Parse file ID to get volume server location
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
t.Logf("Failed to parse file ID %s: %v", fileId, err)
continue
}
// Find volume server hosting this file
locations, found := findVolumeLocations(fid.VolumeId)
if !found {
t.Logf("Volume locations not found for file %s", fileId)
continue
}
// Delete file from volume server
deleteUrl := fmt.Sprintf("http://%s/%s", locations[0], fileId)
req, err := http.NewRequest("DELETE", deleteUrl, nil)
if err != nil {
t.Logf("Failed to create delete request for file %s: %v", fileId, err)
continue
}
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
t.Logf("Failed to delete file %s: %v", fileId, err)
continue
}
resp.Body.Close()
if resp.StatusCode == http.StatusAccepted || resp.StatusCode == http.StatusOK {
successfulDeletions++
deletedFileIds = append(deletedFileIds, fileId)
// Log progress for first few files and every 25th deletion
if i < 5 || (i+1)%25 == 0 {
t.Logf("🗑️ Deleted file %d/%d: %s (status: %d)", i+1, deleteCount, fileId, resp.StatusCode)
}
} else {
t.Logf("❌ Delete failed for file %s with status %d", fileId, resp.StatusCode)
}
}
t.Logf("✅ Deletion summary: %d files deleted successfully out of %d attempted", successfulDeletions, deleteCount)
return deletedFileIds
}
// Helper functions for test setup
func setupTestCluster(t *testing.T) (*TestCluster, func()) {
// Create test cluster similar to existing integration tests
// This is a simplified version - in practice would start actual servers
cluster := &TestCluster{
masterAddress: "127.0.0.1:9333",
volumeServers: []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8082",
"127.0.0.1:8083",
"127.0.0.1:8084",
"127.0.0.1:8085",
},
}
cleanup := func() {
// Cleanup cluster resources
t.Logf("Cleaning up test cluster")
}
return cluster, cleanup
}
func waitForClusterReady() error {
// Wait for test cluster to be ready
// In practice, this would ping the servers and wait for them to respond
time.Sleep(2 * time.Second)
return nil
}
type TestCluster struct {
masterAddress string
volumeServers []string
}