You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

445 lines
11 KiB

//go:build foundationdb
// +build foundationdb
package foundationdb
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/foundationdb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestFoundationDBStore_ConcurrentInserts(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
numGoroutines := 10
entriesPerGoroutine := 100
var wg sync.WaitGroup
errors := make(chan error, numGoroutines*entriesPerGoroutine)
// Launch concurrent insert operations
for g := 0; g < numGoroutines; g++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for i := 0; i < entriesPerGoroutine; i++ {
entry := &filer.Entry{
FullPath: util.NewFullPath("/concurrent", fmt.Sprintf("g%d_file%d.txt", goroutineID, i)),
Attr: filer.Attr{
Mode: 0644,
Uid: uint32(goroutineID),
Gid: 1000,
Mtime: time.Now(),
},
}
err := store.InsertEntry(ctx, entry)
if err != nil {
errors <- fmt.Errorf("goroutine %d, entry %d: %v", goroutineID, i, err)
return
}
}
}(g)
}
wg.Wait()
close(errors)
// Check for errors
for err := range errors {
t.Errorf("Concurrent insert error: %v", err)
}
// Verify all entries were inserted
expectedTotal := numGoroutines * entriesPerGoroutine
actualCount := 0
_, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool {
actualCount++
return true
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
}
if actualCount != expectedTotal {
t.Errorf("Expected %d entries, found %d", expectedTotal, actualCount)
}
}
func TestFoundationDBStore_ConcurrentReadsAndWrites(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
numReaders := 5
numWriters := 5
operationsPerGoroutine := 50
testFile := "/concurrent/rw_test_file.txt"
// Insert initial file
initialEntry := &filer.Entry{
FullPath: testFile,
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
err := store.InsertEntry(ctx, initialEntry)
if err != nil {
t.Fatalf("Initial InsertEntry failed: %v", err)
}
var wg sync.WaitGroup
errors := make(chan error, (numReaders+numWriters)*operationsPerGoroutine)
// Launch reader goroutines
for r := 0; r < numReaders; r++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for i := 0; i < operationsPerGoroutine; i++ {
_, err := store.FindEntry(ctx, testFile)
if err != nil {
errors <- fmt.Errorf("reader %d, operation %d: %v", readerID, i, err)
return
}
// Small delay to allow interleaving with writes
time.Sleep(1 * time.Millisecond)
}
}(r)
}
// Launch writer goroutines
for w := 0; w < numWriters; w++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
for i := 0; i < operationsPerGoroutine; i++ {
entry := &filer.Entry{
FullPath: testFile,
Attr: filer.Attr{
Mode: 0644,
Uid: uint32(writerID + 1000),
Gid: uint32(i),
Mtime: time.Now(),
},
}
err := store.UpdateEntry(ctx, entry)
if err != nil {
errors <- fmt.Errorf("writer %d, operation %d: %v", writerID, i, err)
return
}
// Small delay to allow interleaving with reads
time.Sleep(1 * time.Millisecond)
}
}(w)
}
wg.Wait()
close(errors)
// Check for errors
for err := range errors {
t.Errorf("Concurrent read/write error: %v", err)
}
// Verify final state
finalEntry, err := store.FindEntry(ctx, testFile)
if err != nil {
t.Fatalf("Final FindEntry failed: %v", err)
}
if finalEntry.FullPath != testFile {
t.Errorf("Expected final path %s, got %s", testFile, finalEntry.FullPath)
}
}
func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
numTransactions := 5
entriesPerTransaction := 10
var wg sync.WaitGroup
errors := make(chan error, numTransactions)
successfulTx := make(chan int, numTransactions)
// Launch concurrent transactions
for tx := 0; tx < numTransactions; tx++ {
wg.Add(1)
go func(txID int) {
defer wg.Done()
// Note: FoundationDB has optimistic concurrency control
// Some transactions may need to retry due to conflicts
maxRetries := 3
for attempt := 0; attempt < maxRetries; attempt++ {
txCtx, err := store.BeginTransaction(ctx)
if err != nil {
if attempt == maxRetries-1 {
errors <- fmt.Errorf("tx %d: failed to begin after %d attempts: %v", txID, maxRetries, err)
}
time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
continue
}
// Insert multiple entries in transaction
success := true
for i := 0; i < entriesPerTransaction; i++ {
entry := &filer.Entry{
FullPath: util.NewFullPath("/transactions", fmt.Sprintf("tx%d_file%d.txt", txID, i)),
Attr: filer.Attr{
Mode: 0644,
Uid: uint32(txID),
Gid: uint32(i),
Mtime: time.Now(),
},
}
err = store.InsertEntry(txCtx, entry)
if err != nil {
errors <- fmt.Errorf("tx %d, entry %d: insert failed: %v", txID, i, err)
store.RollbackTransaction(txCtx)
success = false
break
}
}
if success {
err = store.CommitTransaction(txCtx)
if err != nil {
if attempt == maxRetries-1 {
errors <- fmt.Errorf("tx %d: commit failed after %d attempts: %v", txID, maxRetries, err)
}
time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
continue
}
successfulTx <- txID
return
}
}
}(tx)
}
wg.Wait()
close(errors)
close(successfulTx)
// Check for errors
for err := range errors {
t.Errorf("Concurrent transaction error: %v", err)
}
// Count successful transactions
successCount := 0
successfulTxIDs := make([]int, 0)
for txID := range successfulTx {
successCount++
successfulTxIDs = append(successfulTxIDs, txID)
}
t.Logf("Successful transactions: %d/%d (IDs: %v)", successCount, numTransactions, successfulTxIDs)
// Verify entries from successful transactions
totalExpectedEntries := successCount * entriesPerTransaction
actualCount := 0
_, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool {
actualCount++
return true
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
}
if actualCount != totalExpectedEntries {
t.Errorf("Expected %d entries from successful transactions, found %d", totalExpectedEntries, actualCount)
}
}
func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
numWorkers := 10
directoriesPerWorker := 20
filesPerDirectory := 5
var wg sync.WaitGroup
errors := make(chan error, numWorkers*directoriesPerWorker*filesPerDirectory)
// Launch workers that create directories with files
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for d := 0; d < directoriesPerWorker; d++ {
dirPath := fmt.Sprintf("/worker%d/dir%d", workerID, d)
// Create files in directory
for f := 0; f < filesPerDirectory; f++ {
entry := &filer.Entry{
FullPath: util.NewFullPath(dirPath, fmt.Sprintf("file%d.txt", f)),
Attr: filer.Attr{
Mode: 0644,
Uid: uint32(workerID),
Gid: uint32(d),
Mtime: time.Now(),
},
}
err := store.InsertEntry(ctx, entry)
if err != nil {
errors <- fmt.Errorf("worker %d, dir %d, file %d: %v", workerID, d, f, err)
return
}
}
}
}(w)
}
wg.Wait()
close(errors)
// Check for errors
for err := range errors {
t.Errorf("Concurrent directory operation error: %v", err)
}
// Verify directory structure
for w := 0; w < numWorkers; w++ {
for d := 0; d < directoriesPerWorker; d++ {
dirPath := fmt.Sprintf("/worker%d/dir%d", w, d)
fileCount := 0
_, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool {
fileCount++
return true
})
if err != nil {
t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err)
continue
}
if fileCount != filesPerDirectory {
t.Errorf("Expected %d files in %s, found %d", filesPerDirectory, dirPath, fileCount)
}
}
}
}
func TestFoundationDBStore_ConcurrentKVOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
numWorkers := 8
operationsPerWorker := 100
var wg sync.WaitGroup
errors := make(chan error, numWorkers*operationsPerWorker)
// Launch workers performing KV operations
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for i := 0; i < operationsPerWorker; i++ {
key := []byte(fmt.Sprintf("worker%d_key%d", workerID, i))
value := []byte(fmt.Sprintf("worker%d_value%d_timestamp%d", workerID, i, time.Now().UnixNano()))
// Put operation
err := store.KvPut(ctx, key, value)
if err != nil {
errors <- fmt.Errorf("worker %d, operation %d: KvPut failed: %v", workerID, i, err)
continue
}
// Get operation
retrievedValue, err := store.KvGet(ctx, key)
if err != nil {
errors <- fmt.Errorf("worker %d, operation %d: KvGet failed: %v", workerID, i, err)
continue
}
if string(retrievedValue) != string(value) {
errors <- fmt.Errorf("worker %d, operation %d: value mismatch", workerID, i)
continue
}
// Delete operation (for some keys)
if i%5 == 0 {
err = store.KvDelete(ctx, key)
if err != nil {
errors <- fmt.Errorf("worker %d, operation %d: KvDelete failed: %v", workerID, i, err)
}
}
}
}(w)
}
wg.Wait()
close(errors)
// Check for errors
errorCount := 0
for err := range errors {
t.Errorf("Concurrent KV operation error: %v", err)
errorCount++
}
if errorCount > 0 {
t.Errorf("Total errors in concurrent KV operations: %d", errorCount)
}
}
func createTestStore(t *testing.T) *foundationdb.FoundationDBStore {
// Skip test if FoundationDB cluster file doesn't exist
clusterFile := os.Getenv("FDB_CLUSTER_FILE")
if clusterFile == "" {
clusterFile = "/var/fdb/config/fdb.cluster"
}
if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
t.Skip("FoundationDB cluster file not found, skipping test")
}
config := util.NewViper()
config.Set("foundationdb.cluster_file", clusterFile)
config.Set("foundationdb.api_version", 720)
config.Set("foundationdb.timeout", "10s")
config.Set("foundationdb.max_retry_delay", "2s")
config.Set("foundationdb.directory_prefix", fmt.Sprintf("seaweedfs_concurrent_test_%d", time.Now().UnixNano()))
store := &foundationdb.FoundationDBStore{}
err := store.Initialize(config, "foundationdb.")
if err != nil {
t.Fatalf("Failed to initialize FoundationDB store: %v", err)
}
return store
}