//go:build foundationdb
// +build foundationdb

package foundationdb

import (
	"context"
	"fmt"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/filer/foundationdb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestFoundationDBStore_ConcurrentInserts(t *testing.T) {
|
|
store := createTestStore(t)
|
|
defer store.Shutdown()
|
|
|
|
ctx := context.Background()
|
|
numGoroutines := 10
|
|
entriesPerGoroutine := 100
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, numGoroutines*entriesPerGoroutine)
|
|
|
|
// Launch concurrent insert operations
|
|
for g := 0; g < numGoroutines; g++ {
|
|
wg.Add(1)
|
|
go func(goroutineID int) {
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < entriesPerGoroutine; i++ {
|
|
entry := &filer.Entry{
|
|
FullPath: util.NewFullPath("/concurrent", fmt.Sprintf("g%d_file%d.txt", goroutineID, i)),
|
|
Attr: filer.Attr{
|
|
Mode: 0644,
|
|
Uid: uint32(goroutineID),
|
|
Gid: 1000,
|
|
Mtime: time.Now(),
|
|
},
|
|
}
|
|
|
|
err := store.InsertEntry(ctx, entry)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("goroutine %d, entry %d: %v", goroutineID, i, err)
|
|
return
|
|
}
|
|
}
|
|
}(g)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
// Check for errors
|
|
for err := range errors {
|
|
t.Errorf("Concurrent insert error: %v", err)
|
|
}
|
|
|
|
// Verify all entries were inserted
|
|
expectedTotal := numGoroutines * entriesPerGoroutine
|
|
actualCount := 0
|
|
|
|
_, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool {
|
|
actualCount++
|
|
return true
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("ListDirectoryEntries failed: %v", err)
|
|
}
|
|
|
|
if actualCount != expectedTotal {
|
|
t.Errorf("Expected %d entries, found %d", expectedTotal, actualCount)
|
|
}
|
|
}
func TestFoundationDBStore_ConcurrentReadsAndWrites(t *testing.T) {
|
|
store := createTestStore(t)
|
|
defer store.Shutdown()
|
|
|
|
ctx := context.Background()
|
|
numReaders := 5
|
|
numWriters := 5
|
|
operationsPerGoroutine := 50
|
|
testFile := "/concurrent/rw_test_file.txt"
|
|
|
|
// Insert initial file
|
|
initialEntry := &filer.Entry{
|
|
FullPath: testFile,
|
|
Attr: filer.Attr{
|
|
Mode: 0644,
|
|
Uid: 1000,
|
|
Gid: 1000,
|
|
Mtime: time.Now(),
|
|
},
|
|
}
|
|
err := store.InsertEntry(ctx, initialEntry)
|
|
if err != nil {
|
|
t.Fatalf("Initial InsertEntry failed: %v", err)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, (numReaders+numWriters)*operationsPerGoroutine)
|
|
|
|
// Launch reader goroutines
|
|
for r := 0; r < numReaders; r++ {
|
|
wg.Add(1)
|
|
go func(readerID int) {
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < operationsPerGoroutine; i++ {
|
|
_, err := store.FindEntry(ctx, testFile)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("reader %d, operation %d: %v", readerID, i, err)
|
|
return
|
|
}
|
|
|
|
// Small delay to allow interleaving with writes
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}(r)
|
|
}
|
|
|
|
// Launch writer goroutines
|
|
for w := 0; w < numWriters; w++ {
|
|
wg.Add(1)
|
|
go func(writerID int) {
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < operationsPerGoroutine; i++ {
|
|
entry := &filer.Entry{
|
|
FullPath: testFile,
|
|
Attr: filer.Attr{
|
|
Mode: 0644,
|
|
Uid: uint32(writerID + 1000),
|
|
Gid: uint32(i),
|
|
Mtime: time.Now(),
|
|
},
|
|
}
|
|
|
|
err := store.UpdateEntry(ctx, entry)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("writer %d, operation %d: %v", writerID, i, err)
|
|
return
|
|
}
|
|
|
|
// Small delay to allow interleaving with reads
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}(w)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
// Check for errors
|
|
for err := range errors {
|
|
t.Errorf("Concurrent read/write error: %v", err)
|
|
}
|
|
|
|
// Verify final state
|
|
finalEntry, err := store.FindEntry(ctx, testFile)
|
|
if err != nil {
|
|
t.Fatalf("Final FindEntry failed: %v", err)
|
|
}
|
|
|
|
if finalEntry.FullPath != testFile {
|
|
t.Errorf("Expected final path %s, got %s", testFile, finalEntry.FullPath)
|
|
}
|
|
}
func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) {
|
|
store := createTestStore(t)
|
|
defer store.Shutdown()
|
|
|
|
ctx := context.Background()
|
|
numTransactions := 5
|
|
entriesPerTransaction := 10
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, numTransactions)
|
|
successfulTx := make(chan int, numTransactions)
|
|
|
|
// Launch concurrent transactions
|
|
for tx := 0; tx < numTransactions; tx++ {
|
|
wg.Add(1)
|
|
go func(txID int) {
|
|
defer wg.Done()
|
|
|
|
// Note: FoundationDB has optimistic concurrency control
|
|
// Some transactions may need to retry due to conflicts
|
|
maxRetries := 3
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
txCtx, err := store.BeginTransaction(ctx)
|
|
if err != nil {
|
|
if attempt == maxRetries-1 {
|
|
errors <- fmt.Errorf("tx %d: failed to begin after %d attempts: %v", txID, maxRetries, err)
|
|
}
|
|
time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
// Insert multiple entries in transaction
|
|
success := true
|
|
for i := 0; i < entriesPerTransaction; i++ {
|
|
entry := &filer.Entry{
|
|
FullPath: util.NewFullPath("/transactions", fmt.Sprintf("tx%d_file%d.txt", txID, i)),
|
|
Attr: filer.Attr{
|
|
Mode: 0644,
|
|
Uid: uint32(txID),
|
|
Gid: uint32(i),
|
|
Mtime: time.Now(),
|
|
},
|
|
}
|
|
|
|
err = store.InsertEntry(txCtx, entry)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("tx %d, entry %d: insert failed: %v", txID, i, err)
|
|
store.RollbackTransaction(txCtx)
|
|
success = false
|
|
break
|
|
}
|
|
}
|
|
|
|
if success {
|
|
err = store.CommitTransaction(txCtx)
|
|
if err != nil {
|
|
if attempt == maxRetries-1 {
|
|
errors <- fmt.Errorf("tx %d: commit failed after %d attempts: %v", txID, maxRetries, err)
|
|
}
|
|
time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
|
|
continue
|
|
}
|
|
successfulTx <- txID
|
|
return
|
|
}
|
|
}
|
|
}(tx)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
close(successfulTx)
|
|
|
|
// Check for errors
|
|
for err := range errors {
|
|
t.Errorf("Concurrent transaction error: %v", err)
|
|
}
|
|
|
|
// Count successful transactions
|
|
successCount := 0
|
|
successfulTxIDs := make([]int, 0)
|
|
for txID := range successfulTx {
|
|
successCount++
|
|
successfulTxIDs = append(successfulTxIDs, txID)
|
|
}
|
|
|
|
t.Logf("Successful transactions: %d/%d (IDs: %v)", successCount, numTransactions, successfulTxIDs)
|
|
|
|
// Verify entries from successful transactions
|
|
totalExpectedEntries := successCount * entriesPerTransaction
|
|
actualCount := 0
|
|
|
|
_, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool {
|
|
actualCount++
|
|
return true
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("ListDirectoryEntries failed: %v", err)
|
|
}
|
|
|
|
if actualCount != totalExpectedEntries {
|
|
t.Errorf("Expected %d entries from successful transactions, found %d", totalExpectedEntries, actualCount)
|
|
}
|
|
}
func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) {
|
|
store := createTestStore(t)
|
|
defer store.Shutdown()
|
|
|
|
ctx := context.Background()
|
|
numWorkers := 10
|
|
directoriesPerWorker := 20
|
|
filesPerDirectory := 5
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, numWorkers*directoriesPerWorker*filesPerDirectory)
|
|
|
|
// Launch workers that create directories with files
|
|
for w := 0; w < numWorkers; w++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
for d := 0; d < directoriesPerWorker; d++ {
|
|
dirPath := fmt.Sprintf("/worker%d/dir%d", workerID, d)
|
|
|
|
// Create files in directory
|
|
for f := 0; f < filesPerDirectory; f++ {
|
|
entry := &filer.Entry{
|
|
FullPath: util.NewFullPath(dirPath, fmt.Sprintf("file%d.txt", f)),
|
|
Attr: filer.Attr{
|
|
Mode: 0644,
|
|
Uid: uint32(workerID),
|
|
Gid: uint32(d),
|
|
Mtime: time.Now(),
|
|
},
|
|
}
|
|
|
|
err := store.InsertEntry(ctx, entry)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("worker %d, dir %d, file %d: %v", workerID, d, f, err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}(w)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
// Check for errors
|
|
for err := range errors {
|
|
t.Errorf("Concurrent directory operation error: %v", err)
|
|
}
|
|
|
|
// Verify directory structure
|
|
for w := 0; w < numWorkers; w++ {
|
|
for d := 0; d < directoriesPerWorker; d++ {
|
|
dirPath := fmt.Sprintf("/worker%d/dir%d", w, d)
|
|
|
|
fileCount := 0
|
|
_, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool {
|
|
fileCount++
|
|
return true
|
|
})
|
|
if err != nil {
|
|
t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err)
|
|
continue
|
|
}
|
|
|
|
if fileCount != filesPerDirectory {
|
|
t.Errorf("Expected %d files in %s, found %d", filesPerDirectory, dirPath, fileCount)
|
|
}
|
|
}
|
|
}
|
|
}
func TestFoundationDBStore_ConcurrentKVOperations(t *testing.T) {
|
|
store := createTestStore(t)
|
|
defer store.Shutdown()
|
|
|
|
ctx := context.Background()
|
|
numWorkers := 8
|
|
operationsPerWorker := 100
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make(chan error, numWorkers*operationsPerWorker)
|
|
|
|
// Launch workers performing KV operations
|
|
for w := 0; w < numWorkers; w++ {
|
|
wg.Add(1)
|
|
go func(workerID int) {
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < operationsPerWorker; i++ {
|
|
key := []byte(fmt.Sprintf("worker%d_key%d", workerID, i))
|
|
value := []byte(fmt.Sprintf("worker%d_value%d_timestamp%d", workerID, i, time.Now().UnixNano()))
|
|
|
|
// Put operation
|
|
err := store.KvPut(ctx, key, value)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("worker %d, operation %d: KvPut failed: %v", workerID, i, err)
|
|
continue
|
|
}
|
|
|
|
// Get operation
|
|
retrievedValue, err := store.KvGet(ctx, key)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("worker %d, operation %d: KvGet failed: %v", workerID, i, err)
|
|
continue
|
|
}
|
|
|
|
if string(retrievedValue) != string(value) {
|
|
errors <- fmt.Errorf("worker %d, operation %d: value mismatch", workerID, i)
|
|
continue
|
|
}
|
|
|
|
// Delete operation (for some keys)
|
|
if i%5 == 0 {
|
|
err = store.KvDelete(ctx, key)
|
|
if err != nil {
|
|
errors <- fmt.Errorf("worker %d, operation %d: KvDelete failed: %v", workerID, i, err)
|
|
}
|
|
}
|
|
}
|
|
}(w)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errors)
|
|
|
|
// Check for errors
|
|
errorCount := 0
|
|
for err := range errors {
|
|
t.Errorf("Concurrent KV operation error: %v", err)
|
|
errorCount++
|
|
}
|
|
|
|
if errorCount > 0 {
|
|
t.Errorf("Total errors in concurrent KV operations: %d", errorCount)
|
|
}
|
|
}
func createTestStore(t *testing.T) *foundationdb.FoundationDBStore {
|
|
// Skip test if FoundationDB cluster file doesn't exist
|
|
clusterFile := os.Getenv("FDB_CLUSTER_FILE")
|
|
if clusterFile == "" {
|
|
clusterFile = "/var/fdb/config/fdb.cluster"
|
|
}
|
|
|
|
if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
|
|
t.Skip("FoundationDB cluster file not found, skipping test")
|
|
}
|
|
|
|
config := util.NewViper()
|
|
config.Set("foundationdb.cluster_file", clusterFile)
|
|
config.Set("foundationdb.api_version", 720)
|
|
config.Set("foundationdb.timeout", "10s")
|
|
config.Set("foundationdb.max_retry_delay", "2s")
|
|
config.Set("foundationdb.directory_prefix", fmt.Sprintf("seaweedfs_concurrent_test_%d", time.Now().UnixNano()))
|
|
|
|
store := &foundationdb.FoundationDBStore{}
|
|
err := store.Initialize(config, "foundationdb.")
|
|
if err != nil {
|
|
t.Fatalf("Failed to initialize FoundationDB store: %v", err)
|
|
}
|
|
|
|
return store
|
|
}