diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index 25b071e7d..52bc8c80a 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -38,13 +38,13 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
 	return dirtyPages
 }
 
-func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
+func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) error {
 	pages.hasWrites = true
 
 	glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
-	pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
+	_, err := pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
 
-	return
+	return err
 }
 
 func (pages *ChunkedDirtyPages) FlushData() error {
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 58ae03cda..a95e19160 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -29,21 +29,24 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
 	return pw
 }
 
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) error {
 
 	glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
 
 	chunkIndex := offset / pw.chunkSize
 	for i := chunkIndex; len(data) > 0; i++ {
 		writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
-		pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs)
+		if err := pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs); err != nil {
+			return err
+		}
 		offset += writeSize
 		data = data[writeSize:]
 	}
+	return nil
 }
 
-func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) {
-	pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) error {
+	return pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
 }
 
 func (pw *PageWriter) FlushData() error {
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
index 7cddcf69e..cec365231 100644
--- a/weed/mount/page_writer/dirty_pages.go
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -1,7 +1,7 @@
 package page_writer
 
 type DirtyPages interface {
-	AddPage(offset int64, data []byte, isSequential bool, tsNs int64)
+	AddPage(offset int64, data []byte, isSequential bool, tsNs int64) error
 	FlushData() error
 	ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64)
 	Destroy()
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index dd9781b68..c37a9e2ca 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -1,12 +1,13 @@
 package page_writer
 
 import (
-	"github.com/seaweedfs/seaweedfs/weed/glog"
-	"github.com/seaweedfs/seaweedfs/weed/util"
-	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 	"io"
 	"os"
 	"sync"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 )
 
 var (
@@ -42,23 +43,34 @@ func NewSwapFile(dir string, chunkSize int64) *SwapFile {
 	}
 }
 func (sf *SwapFile) FreeResource() {
+	sf.chunkTrackingLock.Lock()
+	defer sf.chunkTrackingLock.Unlock()
 	if sf.file != nil {
 		sf.file.Close()
 		os.Remove(sf.file.Name())
+		sf.file = nil
 	}
 }
 
 func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+	sf.chunkTrackingLock.Lock()
+	defer sf.chunkTrackingLock.Unlock()
+
 	if sf.file == nil {
 		var err error
 		sf.file, err = os.CreateTemp(sf.dir, "")
+		if os.IsNotExist(err) {
+			if mkdirErr := os.MkdirAll(sf.dir, 0700); mkdirErr != nil {
+				glog.Errorf("create/recreate swap directory %s: %v", sf.dir, mkdirErr)
+				return nil
+			}
+			sf.file, err = os.CreateTemp(sf.dir, "")
+		}
 		if err != nil {
-			glog.Errorf("create swap file: %v", err)
+			glog.Errorf("create swap file in %s: %v", sf.dir, err)
 			return nil
 		}
 	}
-	sf.chunkTrackingLock.Lock()
-	defer sf.chunkTrackingLock.Unlock()
 
 	sf.activeChunkCount++
diff --git a/weed/mount/page_writer/page_chunk_swapfile_test.go b/weed/mount/page_writer/page_chunk_swapfile_test.go
new file mode 100644
index 000000000..4c3210b77
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk_swapfile_test.go
@@ -0,0 +1,105 @@
+package page_writer
+
+import (
+	"os"
+	"path/filepath"
+	"sync"
+	"sync/atomic"
+	"testing"
+)
+
+func TestSwapFile_NewSwapFileChunk_Concurrent(t *testing.T) {
+	tempDir, err := os.MkdirTemp("", "seaweedfs_swap_test")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	sf := NewSwapFile(filepath.Join(tempDir, "swap"), 1024)
+	defer sf.FreeResource()
+
+	var wg sync.WaitGroup
+	var failures atomic.Uint32
+	numConcurrent := 10
+	for i := 0; i < numConcurrent; i++ {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			chunk := sf.NewSwapFileChunk(LogicChunkIndex(idx))
+			if chunk == nil {
+				failures.Add(1)
+			}
+		}(i)
+	}
+	wg.Wait()
+
+	if failures.Load() > 0 {
+		t.Errorf("failed to create %d chunks", failures.Load())
+	}
+
+	if sf.file == nil {
+		t.Error("sf.file should not be nil")
+	}
+}
+
+func TestSwapFile_MkdirAll_Permissions(t *testing.T) {
+	tempDir, err := os.MkdirTemp("", "seaweedfs_swap_perm_test")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	swapDir := filepath.Join(tempDir, "swap_subdir")
+	sf := NewSwapFile(swapDir, 1024)
+	defer sf.FreeResource()
+
+	// This should trigger os.MkdirAll
+	if sf.NewSwapFileChunk(0) == nil {
+		t.Fatal("NewSwapFileChunk failed to create a chunk")
+	}
+
+	info, err := os.Stat(swapDir)
+	if err != nil {
+		t.Fatalf("failed to stat swap dir: %v", err)
+	}
+
+	if !info.IsDir() {
+		t.Errorf("expected %s to be a directory", swapDir)
+	}
+
+	// Check permissions - should be 0700
+	if info.Mode().Perm() != 0700 {
+		t.Errorf("expected permissions 0700, got %o", info.Mode().Perm())
+	}
+}
+
+func TestSwapFile_RecreateDir(t *testing.T) {
+	tempDir, err := os.MkdirTemp("", "seaweedfs_swap_recreate_test")
+	if err != nil {
+		t.Fatalf("failed to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	swapDir := filepath.Join(tempDir, "swap_recreate")
+	sf := NewSwapFile(swapDir, 1024)
+
+	// Create first chunk
+	if sf.NewSwapFileChunk(0) == nil {
+		t.Fatal("NewSwapFileChunk failed to create the first chunk")
+	}
+	sf.FreeResource()
+
+	// Delete the directory
+	os.RemoveAll(swapDir)
+
+	// Create second chunk - should recreate directory
+	chunk := sf.NewSwapFileChunk(1)
+	if chunk == nil {
+		t.Error("failed to create chunk after directory deletion")
+	}
+
+	if _, err := os.Stat(swapDir); os.IsNotExist(err) {
+		t.Error("swap directory was not recreated")
+	}
+	sf.FreeResource()
+}
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index bd7fc99dd..4a0f1867c 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -2,10 +2,11 @@ package page_writer
 
 import (
 	"fmt"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"sync"
 	"sync/atomic"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 type LogicChunkIndex int
@@ -55,7 +56,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
 	return t
 }
 
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int, err error) {
 	up.chunksLock.Lock()
 	defer up.chunksLock.Unlock()
 
@@ -103,6 +104,9 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
 		} else {
 			pageChunk = up.swapFile.NewSwapFileChunk(logicChunkIndex)
 			// fmt.Printf(" create file chunk %d\n", logicChunkIndex)
+			if pageChunk == nil {
+				return 0, fmt.Errorf("failed to create swap file chunk")
+			}
 		}
 		up.writableChunks[logicChunkIndex] = pageChunk
 	}
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 2d803f6af..b84b61b71 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -1,8 +1,9 @@
 package page_writer
 
 import (
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 func TestUploadPipeline(t *testing.T) {
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index bcf5ae03a..2f9e3a4b2 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -128,11 +128,14 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
 
 		// Perform the write
 		fhOut.dirtyPages.writerPattern.MonitorWriteAt(offsetOut, int(numBytesRead))
-		fhOut.dirtyPages.AddPage(
+		if err := fhOut.dirtyPages.AddPage(
 			offsetOut,
 			buff[:numBytesRead],
 			fhOut.dirtyPages.writerPattern.IsSequentialMode(),
-			nowUnixNano)
+			nowUnixNano); err != nil {
+			glog.Errorf("AddPage error: %v", err)
+			return 0, fuse.EIO
+		}
 
 		// Accumulate for the next loop iteration
 		totalCopied += numBytesRead
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index 00511c5cd..91fb718ee 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -1,11 +1,13 @@
 package mount
 
 import (
-	"github.com/hanwen/go-fuse/v2/fuse"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"net/http"
 	"syscall"
 	"time"
+
+	"github.com/hanwen/go-fuse/v2/fuse"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 /**
@@ -72,7 +74,10 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
 
 	// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
 
-	fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs)
+	if err := fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs); err != nil {
+		glog.Errorf("AddPage error: %v", err)
+		return 0, fuse.EIO
+	}
 
 	written = uint32(len(data))
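
Not part of the patch: a minimal, self-contained sketch of the error-propagation contract this diff introduces, using hypothetical names (`chunkedPages`, `addChunk`, `write`) rather than the real SeaweedFS types. It illustrates the two halves of the change: the chunk-splitting AddPage loop now stops at the first per-chunk failure and returns it, and the FUSE-facing caller maps that error to EIO instead of silently dropping the write.

```go
package main

import (
	"errors"
	"fmt"
	"syscall"
)

// dirtyPages mirrors the reshaped DirtyPages contract:
// AddPage reports failures instead of swallowing them.
type dirtyPages interface {
	AddPage(offset int64, data []byte) error
}

// chunkedPages splits a write across fixed-size chunks and stops at the first
// failed chunk, the same shape as PageWriter.AddPage after this change.
// addChunk is a stand-in for the per-chunk write path (hypothetical signature).
type chunkedPages struct {
	chunkSize int64
	addChunk  func(chunkIndex, offset int64, data []byte) error
}

func (p *chunkedPages) AddPage(offset int64, data []byte) error {
	for i := offset / p.chunkSize; len(data) > 0; i++ {
		writeSize := min(int64(len(data)), (i+1)*p.chunkSize-offset)
		if err := p.addChunk(i, offset, data[:writeSize]); err != nil {
			return err // propagate instead of logging and continuing
		}
		offset += writeSize
		data = data[writeSize:]
	}
	return nil
}

// write shows the FUSE-facing handling: any AddPage error is surfaced as EIO,
// as the Write and CopyFileRange hunks now do.
func write(pages dirtyPages, offset int64, data []byte) (uint32, syscall.Errno) {
	if err := pages.AddPage(offset, data); err != nil {
		return 0, syscall.EIO
	}
	return uint32(len(data)), 0
}

func main() {
	failing := &chunkedPages{
		chunkSize: 4,
		addChunk: func(chunkIndex, offset int64, data []byte) error {
			if chunkIndex == 1 { // e.g. a swap file chunk could not be created
				return errors.New("failed to create swap file chunk")
			}
			return nil
		},
	}
	n, errno := write(failing, 0, []byte("hello world"))
	fmt.Printf("written=%d errno=%d\n", n, errno) // e.g. written=0 errno=5 (EIO) on Linux
}
```

Returning the error from SaveDataAt also keeps a nil swap-file chunk from being registered: the pipeline checks `pageChunk == nil` and bails out before the `up.writableChunks[logicChunkIndex] = pageChunk` assignment.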