diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index 8498ec1e0..8cccded67 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -3,6 +3,7 @@ package page_writer import ( "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" + "sync" "sync/atomic" ) @@ -13,6 +14,7 @@ var ( ) type MemChunk struct { + sync.RWMutex buf []byte usage *ChunkWrittenIntervalList chunkSize int64 @@ -30,11 +32,17 @@ func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { } func (mc *MemChunk) FreeResource() { + mc.Lock() + defer mc.Unlock() + atomic.AddInt64(&memChunkCounter, -1) mem.Free(mc.buf) } func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { + mc.Lock() + defer mc.Unlock() + innerOffset := offset % mc.chunkSize n = copy(mc.buf[innerOffset:], src) mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) @@ -42,6 +50,9 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { } func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { + mc.RLock() + defer mc.RUnlock() + memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) @@ -55,14 +66,23 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { } func (mc *MemChunk) IsComplete() bool { + mc.RLock() + defer mc.RUnlock() + return mc.usage.IsComplete(mc.chunkSize) } func (mc *MemChunk) WrittenSize() int64 { + mc.RLock() + defer mc.RUnlock() + return mc.usage.WrittenSize() } func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { + mc.RLock() + defer mc.RUnlock() + if saveFn == nil { return } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 4fbd18b16..bf2cdb256 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -24,6 +24,7 @@ type SwapFile struct { } type SwapFileChunk struct { + sync.RWMutex swapfile *SwapFile usage *ChunkWrittenIntervalList logicChunkIndex LogicChunkIndex @@ -79,11 +80,17 @@ func (sc *SwapFileChunk) FreeResource() { sc.swapfile.logicToActualChunkIndexLock.Lock() defer sc.swapfile.logicToActualChunkIndexLock.Unlock() + sc.Lock() + defer sc.Unlock() + sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex) delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) } func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { + sc.Lock() + defer sc.Unlock() + innerOffset := offset % sc.swapfile.chunkSize var err error n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) @@ -96,6 +103,9 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { } func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { + sc.RLock() + defer sc.RUnlock() + chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { logicStart := max(off, chunkStartOffset+t.StartOffset) @@ -113,10 +123,14 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { } func (sc *SwapFileChunk) IsComplete() bool { + sc.RLock() + defer sc.RUnlock() return sc.usage.IsComplete(sc.swapfile.chunkSize) } func (sc *SwapFileChunk) WrittenSize() int64 { + sc.RLock() + defer sc.RUnlock() return sc.usage.WrittenSize() } @@ -124,6 +138,9 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { if saveFn == nil { return } + sc.Lock() + defer sc.Unlock() + for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { data := mem.Allocate(int(t.Size())) sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) @@ -132,5 +149,6 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { }) mem.Free(data) } + sc.usage = newChunkWrittenIntervalList() }