add stream writer
This should improve streaming write performance, which is a common pattern, e.g., when copying large files. (A minimal usage sketch follows the changed-file list below.)
This is in addition to the improved random read/write operations in 3e69d19380.
pull/2539/head
chrislu
3 years ago
10 changed files with 331 additions and 35 deletions
- 107 weed/filesys/dirty_pages_stream.go
- 6 weed/filesys/dirty_pages_temp_file.go
- 16 weed/filesys/filehandle.go
- 32 weed/filesys/page_writer.go
- 29 weed/filesys/page_writer/chunk_interval_list.go
- 11 weed/filesys/page_writer/chunked_file_writer.go
- 4 weed/filesys/page_writer/chunked_file_writer_test.go
- 119 weed/filesys/page_writer/chunked_stream_writer.go
- 33 weed/filesys/page_writer/chunked_stream_writer_test.go
- 9 weed/filesys/page_writer_pattern.go
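For orientation, the sketch below (not part of this commit) shows the sequential write pattern the new ChunkedStreamWriter targets, using only the API added in this diff: a save callback is registered, fixed-size in-memory chunks fill up as data is written in order, each completed chunk interval is handed to the callback, and the callback must invoke cleanupFn so the chunk buffer can be released. The chunk size, write size, and the io.Discard sink are arbitrary placeholders.

package main

import (
	"io"
	"log"

	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
)

func main() {
	// 2 MiB chunk size is an arbitrary example value.
	cw := page_writer.NewChunkedStreamWriter(2 * 1024 * 1024)
	defer cw.Reset()

	// The callback receives one fully written chunk interval at a time.
	// cleanupFn must be called so the chunk buffer can be freed/recycled.
	cw.SetSaveToStorageFunction(func(reader io.Reader, offset int64, size int64, cleanupFn func()) {
		defer cleanupFn()
		// A real caller would upload the data; here it is just drained.
		n, _ := io.Copy(io.Discard, reader)
		log.Printf("saved interval [%d,%d), %d bytes", offset, offset+size, n)
	})

	// Sequential, chunk-aligned writes, as in a large streaming copy.
	data := make([]byte, 64*1024)
	for offset := int64(0); offset < 8*1024*1024; offset += int64(len(data)) {
		if _, err := cw.WriteAt(data, offset); err != nil {
			log.Fatal(err)
		}
	}

	// Push any partially filled chunk to the callback as well.
	cw.FlushAll()
}

This mirrors what StreamDirtyPages in the diff does, except that StreamDirtyPages runs the upload asynchronously and relies on the cleanup callback as a per-interval reference count on the chunk buffer.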
weed/filesys/dirty_pages_stream.go
@@ -0,0 +1,107 @@
package filesys

import (
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"io"
	"sync"
	"time"
)

type StreamDirtyPages struct {
	f              *File
	writeWaitGroup sync.WaitGroup
	pageAddLock    sync.Mutex
	chunkAddLock   sync.Mutex
	lastErr        error
	collection     string
	replication    string
	chunkedStream  *page_writer.ChunkedStreamWriter
}

func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages {

	dirtyPages := &StreamDirtyPages{
		f:             file,
		chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize),
	}

	dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage)

	return dirtyPages
}

func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) {

	pages.pageAddLock.Lock()
	defer pages.pageAddLock.Unlock()

	glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
	if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil {
		pages.lastErr = err
	}
}

func (pages *StreamDirtyPages) FlushData() error {
	pages.saveChunkedFileToStorage()
	pages.writeWaitGroup.Wait()
	if pages.lastErr != nil {
		return fmt.Errorf("flush data: %v", pages.lastErr)
	}
	pages.chunkedStream.Reset()
	return nil
}

func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
	return pages.chunkedStream.ReadDataAt(data, startOffset)
}

func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) {
	return pages.collection, pages.replication
}

func (pages *StreamDirtyPages) saveChunkedFileToStorage() {
	pages.chunkedStream.FlushAll()
}

func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {

	mtime := time.Now().UnixNano()
	pages.writeWaitGroup.Add(1)
	writer := func() {
		defer pages.writeWaitGroup.Done()
		defer cleanupFn()

		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
		if err != nil {
			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
			pages.lastErr = err
			return
		}
		chunk.Mtime = mtime
		pages.collection, pages.replication = collection, replication
		pages.chunkAddLock.Lock()
		pages.f.addChunks([]*filer_pb.FileChunk{chunk})
		glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
		pages.chunkAddLock.Unlock()
	}

	if pages.f.wfs.concurrentWriters != nil {
		pages.f.wfs.concurrentWriters.Execute(writer)
	} else {
		go writer()
	}
}

func (pages StreamDirtyPages) Destroy() {
	pages.chunkedStream.Reset()
}
weed/filesys/page_writer/chunked_stream_writer.go
@@ -0,0 +1,119 @@
package page_writer

import (
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/mem"
	"io"
	"sync"
	"sync/atomic"
)

type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())

// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode
type ChunkedStreamWriter struct {
	activeChunks     map[LogicChunkIndex]*MemChunk
	activeChunksLock sync.Mutex
	ChunkSize        int64
	saveToStorageFn  SaveToStorageFunc
	sync.Mutex
}

type MemChunk struct {
	buf   []byte
	usage *ChunkWrittenIntervalList
}

var _ = io.WriterAt(&ChunkedStreamWriter{})

func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
	return &ChunkedStreamWriter{
		ChunkSize:    chunkSize,
		activeChunks: make(map[LogicChunkIndex]*MemChunk),
	}
}

func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
	cw.saveToStorageFn = saveToStorageFn
}

func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
	cw.Lock()
	defer cw.Unlock()

	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
	offsetRemainder := off % cw.ChunkSize

	memChunk, found := cw.activeChunks[logicChunkIndex]
	if !found {
		memChunk = &MemChunk{
			buf:   mem.Allocate(int(cw.ChunkSize)),
			usage: newChunkWrittenIntervalList(),
		}
		cw.activeChunks[logicChunkIndex] = memChunk
	}
	n = copy(memChunk.buf[offsetRemainder:], p)
	memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
	if memChunk.usage.IsComplete(cw.ChunkSize) {
		if cw.saveToStorageFn != nil {
			cw.saveOneChunk(memChunk, logicChunkIndex)
			delete(cw.activeChunks, logicChunkIndex)
		}
	}

	return
}

func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
	cw.Lock()
	defer cw.Unlock()

	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
	memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
	memChunk, found := cw.activeChunks[logicChunkIndex]
	if !found {
		return
	}

	for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
		logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset)
		logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
		if logicStart < logicStop {
			copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
			maxStop = max(maxStop, logicStop)
		}
	}
	return
}

func (cw *ChunkedStreamWriter) FlushAll() {
	cw.Lock()
	defer cw.Unlock()
	for logicChunkIndex, memChunk := range cw.activeChunks {
		if cw.saveToStorageFn != nil {
			cw.saveOneChunk(memChunk, logicChunkIndex)
			delete(cw.activeChunks, logicChunkIndex)
		}
	}
}

func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
	var referenceCounter = int32(memChunk.usage.size())
	for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
		reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
		cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
			atomic.AddInt32(&referenceCounter, -1)
			if atomic.LoadInt32(&referenceCounter) == 0 {
				mem.Free(memChunk.buf)
			}
		})
	}
}

// Reset releases used resources
func (cw *ChunkedStreamWriter) Reset() {
	for t, memChunk := range cw.activeChunks {
		mem.Free(memChunk.buf)
		delete(cw.activeChunks, t)
	}
}
weed/filesys/page_writer/chunked_stream_writer_test.go
@@ -0,0 +1,33 @@
package page_writer

import (
	"github.com/stretchr/testify/assert"
	"os"
	"testing"
)

func TestWriteChunkedStream(t *testing.T) {
	x := NewChunkedStreamWriter(20)
	defer x.Reset()
	y := NewChunkedFileWriter(os.TempDir(), 12)
	defer y.Reset()

	batchSize := 4
	buf := make([]byte, batchSize)
	for i := 0; i < 256; i++ {
		for x := 0; x < batchSize; x++ {
			buf[x] = byte(i)
		}
		x.WriteAt(buf, int64(i*batchSize))
		y.WriteAt(buf, int64((255-i)*batchSize))
	}

	a := make([]byte, 1)
	b := make([]byte, 1)
	for i := 0; i < 256*batchSize; i++ {
		x.ReadDataAt(a, int64(i))
		y.ReadDataAt(b, int64(256*batchSize-1-i))
		assert.Equal(t, a[0], b[0], "same read")
	}

}