Browse Source

write to disk during random writes, limiting total disk spaces used

pull/2756/head
chrislu 3 years ago
parent
commit
f2f68f675e
  1. 4
      weed/mount/dirty_pages_chunked.go
  2. 10
      weed/mount/page_writer.go
  3. 2
      weed/mount/page_writer/dirty_pages.go
  4. 8
      weed/mount/page_writer/page_chunk_swapfile.go
  5. 17
      weed/mount/page_writer/upload_pipeline.go
  6. 2
      weed/mount/page_writer/upload_pipeline_test.go
  7. 44
      weed/mount/page_writer_pattern.go
  8. 4
      weed/mount/weedfs_file_write.go

4
weed/mount/dirty_pages_chunked.go

@ -38,11 +38,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages return dirtyPages
} }
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) {
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
pages.hasWrites = true pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data))) glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset)
pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
return return
} }

10
weed/mount/page_writer.go

@ -10,6 +10,7 @@ type PageWriter struct {
collection string collection string
replication string replication string
chunkSize int64 chunkSize int64
writerPattern *WriterPattern
randomWriter page_writer.DirtyPages randomWriter page_writer.DirtyPages
} }
@ -22,26 +23,27 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
pw := &PageWriter{ pw := &PageWriter{
fh: fh, fh: fh,
chunkSize: chunkSize, chunkSize: chunkSize,
writerPattern: NewWriterPattern(chunkSize),
randomWriter: newMemoryChunkPages(fh, chunkSize), randomWriter: newMemoryChunkPages(fh, chunkSize),
} }
return pw return pw
} }
func (pw *PageWriter) AddPage(offset int64, data []byte) {
func (pw *PageWriter) AddPage(offset int64, data []byte, isSequentail bool) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ { for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
pw.addToOneChunk(i, offset, data[:writeSize])
pw.addToOneChunk(i, offset, data[:writeSize], isSequentail)
offset += writeSize offset += writeSize
data = data[writeSize:] data = data[writeSize:]
} }
} }
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
pw.randomWriter.AddPage(offset, data)
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
pw.randomWriter.AddPage(offset, data, isSequential)
} }
func (pw *PageWriter) FlushData() error { func (pw *PageWriter) FlushData() error {

2
weed/mount/page_writer/dirty_pages.go

@ -1,7 +1,7 @@
package page_writer package page_writer
type DirtyPages interface { type DirtyPages interface {
AddPage(offset int64, data []byte)
AddPage(offset int64, data []byte, isSequential bool)
FlushData() error FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string) GetStorageOptions() (collection, replication string)

8
weed/mount/page_writer/page_chunk_swapfile.go

@ -18,6 +18,7 @@ type SwapFile struct {
file *os.File file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64 chunkSize int64
freeActualChunkList []ActualChunkIndex
} }
type SwapFileChunk struct { type SwapFileChunk struct {
@ -53,7 +54,12 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
} }
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found { if !found {
if len(sf.freeActualChunkList) > 0 {
actualChunkIndex = sf.freeActualChunkList[0]
sf.freeActualChunkList = sf.freeActualChunkList[1:]
} else {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
}
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
} }
@ -66,6 +72,8 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
} }
func (sc *SwapFileChunk) FreeResource() { func (sc *SwapFileChunk) FreeResource() {
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
} }
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {

17
weed/mount/page_writer/upload_pipeline.go

@ -55,7 +55,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
} }
} }
func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
up.writableChunksLock.Lock() up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock() defer up.writableChunksLock.Unlock()
@ -63,13 +63,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
pageChunk, found := up.writableChunks[logicChunkIndex] pageChunk, found := up.writableChunks[logicChunkIndex]
if !found { if !found {
if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) {
// if total number of chunks is over 4 times of per file buffer count limit
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
} else if len(up.writableChunks) < up.bufferChunkLimit {
// if current file chunks is still under the per file buffer count limit
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
if len(up.writableChunks) > up.bufferChunkLimit {
// if current file chunks is over the per file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks { for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize() chunkFullness := mc.WrittenSize()
@ -81,7 +76,13 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex) delete(up.writableChunks, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
}
if isSequential &&
len(up.writableChunks) < up.bufferChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
} }
up.writableChunks[logicChunkIndex] = pageChunk up.writableChunks[logicChunkIndex] = pageChunk
} }

2
weed/mount/page_writer/upload_pipeline_test.go

@ -31,7 +31,7 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4) p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 { for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i)) util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i)
uploadPipeline.SaveDataAt(p, i, false)
} }
} }

44
weed/mount/page_writer_pattern.go

@ -0,0 +1,44 @@
package mount
type WriterPattern struct {
isStreaming bool
lastWriteOffset int64
chunkSize int64
}
// For streaming write: only cache the first chunk
// For random write: fall back to temp file approach
// writes can only change from streaming mode to non-streaming mode
func NewWriterPattern(chunkSize int64) *WriterPattern {
return &WriterPattern{
isStreaming: true,
lastWriteOffset: -1,
chunkSize: chunkSize,
}
}
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
if rp.lastWriteOffset > offset {
rp.isStreaming = false
}
if rp.lastWriteOffset == -1 {
if offset != 0 {
rp.isStreaming = false
}
}
rp.lastWriteOffset = offset
}
func (rp *WriterPattern) IsStreamingMode() bool {
return rp.isStreaming
}
func (rp *WriterPattern) IsRandomMode() bool {
return !rp.isStreaming
}
func (rp *WriterPattern) Reset() {
rp.isStreaming = true
rp.lastWriteOffset = -1
}

4
weed/mount/weedfs_file_write.go

@ -43,6 +43,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
return 0, fuse.ENOENT return 0, fuse.ENOENT
} }
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
fh.Lock() fh.Lock()
defer fh.Unlock() defer fh.Unlock()
@ -56,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(offset, data)
fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode())
written = uint32(len(data)) written = uint32(len(data))

Loading…
Cancel
Save