You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

369 lines
9.0 KiB

//go:build foundationdb
// +build foundationdb
package foundationdb
import (
"context"
"os"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/foundationdb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestFoundationDBStore_BasicOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
// Test InsertEntry
entry := &filer.Entry{
FullPath: "/test/file1.txt",
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
err := store.InsertEntry(ctx, entry)
if err != nil {
t.Fatalf("InsertEntry failed: %v", err)
}
// Test FindEntry
foundEntry, err := store.FindEntry(ctx, "/test/file1.txt")
if err != nil {
t.Fatalf("FindEntry failed: %v", err)
}
if foundEntry.FullPath != entry.FullPath {
t.Errorf("Expected path %s, got %s", entry.FullPath, foundEntry.FullPath)
}
if foundEntry.Attr.Mode != entry.Attr.Mode {
t.Errorf("Expected mode %o, got %o", entry.Attr.Mode, foundEntry.Attr.Mode)
}
// Test UpdateEntry
foundEntry.Attr.Mode = 0755
err = store.UpdateEntry(ctx, foundEntry)
if err != nil {
t.Fatalf("UpdateEntry failed: %v", err)
}
updatedEntry, err := store.FindEntry(ctx, "/test/file1.txt")
if err != nil {
t.Fatalf("FindEntry after update failed: %v", err)
}
if updatedEntry.Attr.Mode != 0755 {
t.Errorf("Expected updated mode 0755, got %o", updatedEntry.Attr.Mode)
}
// Test DeleteEntry
err = store.DeleteEntry(ctx, "/test/file1.txt")
if err != nil {
t.Fatalf("DeleteEntry failed: %v", err)
}
_, err = store.FindEntry(ctx, "/test/file1.txt")
if err == nil {
t.Error("Expected entry to be deleted, but it was found")
}
if err != filer_pb.ErrNotFound {
t.Errorf("Expected ErrNotFound, got %v", err)
}
}
func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
// Create multiple entries in a directory
testDir := "/test/dir"
files := []string{"file1.txt", "file2.txt", "file3.txt", "subdir/"}
for _, fileName := range files {
entry := &filer.Entry{
FullPath: util.NewFullPath(testDir, fileName),
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
if fileName == "subdir/" {
entry.Attr.Mode = 0755 | os.ModeDir
}
err := store.InsertEntry(ctx, entry)
if err != nil {
t.Fatalf("InsertEntry failed for %s: %v", fileName, err)
}
}
// Test ListDirectoryEntries
var listedFiles []string
lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
listedFiles = append(listedFiles, entry.Name())
return true
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
}
t.Logf("Last file name: %s", lastFileName)
t.Logf("Listed files: %v", listedFiles)
if len(listedFiles) != len(files) {
t.Errorf("Expected %d files, got %d", len(files), len(listedFiles))
}
// Test ListDirectoryPrefixedEntries
var prefixedFiles []string
_, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) bool {
prefixedFiles = append(prefixedFiles, entry.Name())
return true
})
if err != nil {
t.Fatalf("ListDirectoryPrefixedEntries failed: %v", err)
}
expectedPrefixedCount := 3 // file1.txt, file2.txt, file3.txt
if len(prefixedFiles) != expectedPrefixedCount {
t.Errorf("Expected %d prefixed files, got %d: %v", expectedPrefixedCount, len(prefixedFiles), prefixedFiles)
}
// Test DeleteFolderChildren
err = store.DeleteFolderChildren(ctx, testDir)
if err != nil {
t.Fatalf("DeleteFolderChildren failed: %v", err)
}
// Verify children are deleted
var remainingFiles []string
_, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
remainingFiles = append(remainingFiles, entry.Name())
return true
})
if err != nil {
t.Fatalf("ListDirectoryEntries after delete failed: %v", err)
}
if len(remainingFiles) != 0 {
t.Errorf("Expected no files after DeleteFolderChildren, got %d: %v", len(remainingFiles), remainingFiles)
}
}
func TestFoundationDBStore_TransactionOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
// Begin transaction
txCtx, err := store.BeginTransaction(ctx)
if err != nil {
t.Fatalf("BeginTransaction failed: %v", err)
}
// Insert entry in transaction
entry := &filer.Entry{
FullPath: "/test/tx_file.txt",
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
err = store.InsertEntry(txCtx, entry)
if err != nil {
t.Fatalf("InsertEntry in transaction failed: %v", err)
}
// Entry should not be visible outside transaction yet
_, err = store.FindEntry(ctx, "/test/tx_file.txt")
if err == nil {
t.Error("Entry should not be visible before transaction commit")
}
// Commit transaction
err = store.CommitTransaction(txCtx)
if err != nil {
t.Fatalf("CommitTransaction failed: %v", err)
}
// Entry should now be visible
foundEntry, err := store.FindEntry(ctx, "/test/tx_file.txt")
if err != nil {
t.Fatalf("FindEntry after commit failed: %v", err)
}
if foundEntry.FullPath != entry.FullPath {
t.Errorf("Expected path %s, got %s", entry.FullPath, foundEntry.FullPath)
}
// Test rollback
txCtx2, err := store.BeginTransaction(ctx)
if err != nil {
t.Fatalf("BeginTransaction for rollback test failed: %v", err)
}
entry2 := &filer.Entry{
FullPath: "/test/rollback_file.txt",
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
err = store.InsertEntry(txCtx2, entry2)
if err != nil {
t.Fatalf("InsertEntry for rollback test failed: %v", err)
}
// Rollback transaction
err = store.RollbackTransaction(txCtx2)
if err != nil {
t.Fatalf("RollbackTransaction failed: %v", err)
}
// Entry should not exist after rollback
_, err = store.FindEntry(ctx, "/test/rollback_file.txt")
if err == nil {
t.Error("Entry should not exist after rollback")
}
if err != filer_pb.ErrNotFound {
t.Errorf("Expected ErrNotFound after rollback, got %v", err)
}
}
func TestFoundationDBStore_KVOperations(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
// Test KvPut
key := []byte("test_key")
value := []byte("test_value")
err := store.KvPut(ctx, key, value)
if err != nil {
t.Fatalf("KvPut failed: %v", err)
}
// Test KvGet
retrievedValue, err := store.KvGet(ctx, key)
if err != nil {
t.Fatalf("KvGet failed: %v", err)
}
if string(retrievedValue) != string(value) {
t.Errorf("Expected value %s, got %s", value, retrievedValue)
}
// Test KvDelete
err = store.KvDelete(ctx, key)
if err != nil {
t.Fatalf("KvDelete failed: %v", err)
}
// Verify key is deleted
_, err = store.KvGet(ctx, key)
if err == nil {
t.Error("Expected key to be deleted")
}
if err != filer.ErrKvNotFound {
t.Errorf("Expected ErrKvNotFound, got %v", err)
}
}
func TestFoundationDBStore_LargeEntry(t *testing.T) {
store := createTestStore(t)
defer store.Shutdown()
ctx := context.Background()
// Create entry with many chunks (to test compression)
entry := &filer.Entry{
FullPath: "/test/large_file.txt",
Attr: filer.Attr{
Mode: 0644,
Uid: 1000,
Gid: 1000,
Mtime: time.Now(),
},
}
// Add many chunks to trigger compression
for i := 0; i < filer.CountEntryChunksForGzip+10; i++ {
chunk := &filer_pb.FileChunk{
FileId: util.Uint64toHex(uint64(i)),
Offset: int64(i * 1024),
Size: 1024,
}
entry.Chunks = append(entry.Chunks, chunk)
}
err := store.InsertEntry(ctx, entry)
if err != nil {
t.Fatalf("InsertEntry with large chunks failed: %v", err)
}
// Retrieve and verify
foundEntry, err := store.FindEntry(ctx, "/test/large_file.txt")
if err != nil {
t.Fatalf("FindEntry for large file failed: %v", err)
}
if len(foundEntry.Chunks) != len(entry.Chunks) {
t.Errorf("Expected %d chunks, got %d", len(entry.Chunks), len(foundEntry.Chunks))
}
// Verify some chunk data
if foundEntry.Chunks[0].FileId != entry.Chunks[0].FileId {
t.Errorf("Expected first chunk FileId %s, got %s", entry.Chunks[0].FileId, foundEntry.Chunks[0].FileId)
}
}
func createTestStore(t *testing.T) *foundationdb.FoundationDBStore {
// Skip test if FoundationDB cluster file doesn't exist
clusterFile := os.Getenv("FDB_CLUSTER_FILE")
if clusterFile == "" {
clusterFile = "/var/fdb/config/fdb.cluster"
}
if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
t.Skip("FoundationDB cluster file not found, skipping test")
}
config := util.NewViper()
config.Set("foundationdb.cluster_file", clusterFile)
config.Set("foundationdb.api_version", 630)
config.Set("foundationdb.timeout", "10s")
config.Set("foundationdb.max_retry_delay", "2s")
config.Set("foundationdb.directory_prefix", "seaweedfs_test")
store := &foundationdb.FoundationDBStore{}
err := store.Initialize(config, "foundationdb.")
if err != nil {
t.Fatalf("Failed to initialize FoundationDB store: %v", err)
}
return store
}