Browse Source

swap file based random write large file upload

pull/2613/head
chrislu 3 years ago
parent
commit
c376ccc5a5
  1. 4
      weed/filesys/dirty_pages_mem_chunk.go
  2. 112
      weed/filesys/dirty_pages_temp_file.go
  3. 159
      weed/filesys/page_writer/chunked_file_writer.go
  4. 60
      weed/filesys/page_writer/chunked_file_writer_test.go
  5. 116
      weed/filesys/page_writer/page_chunk_file.go
  6. 165
      weed/filesys/page_writer/upload_pipeline.go
  7. 2
      weed/filesys/page_writer/upload_pipeline_test.go

4
weed/filesys/dirty_pages_mem_chunk.go

@ -31,8 +31,10 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages {
fh: fh, fh: fh,
} }
swapFileDir := fh.f.wfs.option.getTempFilePageDir()
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(), dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(),
fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage)
fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, swapFileDir)
return dirtyPages return dirtyPages
} }

112
weed/filesys/dirty_pages_temp_file.go

@ -1,112 +0,0 @@
package filesys
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"os"
"sync"
"time"
)
type TempFileDirtyPages struct {
f *File
writeWaitGroup sync.WaitGroup
pageAddLock sync.Mutex
chunkAddLock sync.Mutex
lastErr error
collection string
replication string
chunkedFile *page_writer.ChunkedFileWriter
}
func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
f: file,
chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
}
return tempFile
}
func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
pages.lastErr = err
}
return
}
func (pages *TempFileDirtyPages) FlushData() error {
pages.saveChunkedFileToStorage()
pages.writeWaitGroup.Wait()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
pages.chunkedFile.Reset()
return nil
}
func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
return pages.chunkedFile.ReadDataAt(data, startOffset)
}
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
})
}
func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) {
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
writer := func() {
defer pages.writeWaitGroup.Done()
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
return
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.chunkAddLock.Lock()
defer pages.chunkAddLock.Unlock()
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
}
if pages.f.wfs.concurrentWriters != nil {
pages.f.wfs.concurrentWriters.Execute(writer)
} else {
go writer()
}
}
func (pages TempFileDirtyPages) Destroy() {
pages.chunkedFile.Reset()
}
func (pages *TempFileDirtyPages) LockForRead(startOffset, stopOffset int64) {
}
func (pages *TempFileDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
}

159
weed/filesys/page_writer/chunked_file_writer.go

@ -1,159 +0,0 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"io"
"os"
"sync"
)
type ActualChunkIndex int
// ChunkedFileWriter assumes the write requests will come in within chunks
type ChunkedFileWriter struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkUsages []*ChunkWrittenIntervalList
ChunkSize int64
sync.Mutex
}
var _ = io.WriterAt(&ChunkedFileWriter{})
func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
return &ChunkedFileWriter{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
ChunkSize: chunkSize,
}
}
func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
cw.Lock()
defer cw.Unlock()
if cw.file == nil {
cw.file, err = os.CreateTemp(cw.dir, "")
if err != nil {
glog.Errorf("create temp file: %v", err)
return
}
}
actualOffset, chunkUsage := cw.toActualWriteOffset(off)
n, err = cw.file.WriteAt(p, actualOffset)
if err == nil {
startOffset := off % cw.ChunkSize
chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
}
return
}
func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
cw.Lock()
defer cw.Unlock()
if cw.file == nil {
return
}
logicChunkIndex := off / cw.ChunkSize
actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
if chunkUsage != nil {
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
if err != nil {
glog.Errorf("reading temp file: %v", err)
break
}
maxStop = max(maxStop, logicStop)
}
}
}
return
}
func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
offsetRemainder := logicOffset % cw.ChunkSize
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if found {
return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
}
cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
chunkUsage = newChunkWrittenIntervalList()
cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
}
func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if found {
return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
}
return 0, nil
}
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
chunkUsage := cw.chunkUsages[actualChunkIndex]
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
process(cw.file, logicChunkIndex, t)
}
}
}
// Reset releases used resources
func (cw *ChunkedFileWriter) Reset() {
if cw.file != nil {
cw.file.Close()
os.Remove(cw.file.Name())
cw.file = nil
}
cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
cw.chunkUsages = cw.chunkUsages[:0]
}
type FileIntervalReader struct {
f *os.File
startOffset int64
stopOffset int64
position int64
}
var _ = io.Reader(&FileIntervalReader{})
func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if !found {
// this should never happen
return nil
}
return &FileIntervalReader{
f: cw.file,
startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
position: 0,
}
}
func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
if err == nil || err == io.EOF {
fr.position += int64(n)
if fr.stopOffset-fr.startOffset-fr.position == 0 {
// return a tiny bit faster
err = io.EOF
return
}
}
return
}

60
weed/filesys/page_writer/chunked_file_writer_test.go

@ -1,60 +0,0 @@
package page_writer
import (
"github.com/stretchr/testify/assert"
"os"
"testing"
)
func TestChunkedFileWriter_toActualOffset(t *testing.T) {
cw := NewChunkedFileWriter("", 16)
writeToFile(cw, 50, 60)
writeToFile(cw, 60, 64)
writeToFile(cw, 32, 40)
writeToFile(cw, 42, 48)
writeToFile(cw, 48, 50)
assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
}
func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
_, chunkUsage := cw.toActualWriteOffset(startOffset)
// skip doing actual writing
innerOffset := startOffset % cw.ChunkSize
chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
}
func TestWriteChunkedFile(t *testing.T) {
x := NewChunkedFileWriter(os.TempDir(), 20)
defer x.Reset()
y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Reset()
batchSize := 4
buf := make([]byte, batchSize)
for i := 0; i < 256; i++ {
for x := 0; x < batchSize; x++ {
buf[x] = byte(i)
}
x.WriteAt(buf, int64(i*batchSize))
y.WriteAt(buf, int64((255-i)*batchSize))
}
a := make([]byte, 1)
b := make([]byte, 1)
for i := 0; i < 256*batchSize; i++ {
x.ReadDataAt(a, int64(i))
y.ReadDataAt(b, int64(256*batchSize-1-i))
assert.Equal(t, a[0], b[0], "same read")
}
}

116
weed/filesys/page_writer/page_chunk_file.go

@ -0,0 +1,116 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"os"
)
var (
_ = PageChunk(&TempFileChunk{})
)
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
}
type TempFileChunk struct {
swapfile *SwapFile
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
if sf.file != nil {
sf.file.Close()
os.Remove(sf.file.Name())
}
}
func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *TempFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
if err != nil {
glog.Errorf("create swap file: %v", err)
return nil
}
}
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
}
return &TempFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
}
}
func (tc *TempFileChunk) FreeResource() {
}
func (tc *TempFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % tc.swapfile.chunkSize
var err error
n, err = tc.swapfile.file.WriteAt(src, int64(tc.actualChunkIndex)*tc.swapfile.chunkSize+innerOffset)
if err == nil {
tc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
} else {
glog.Errorf("failed to write swap file %s: %v", tc.swapfile.file.Name(), err)
}
return
}
func (tc *TempFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
chunkStartOffset := int64(tc.logicChunkIndex) * tc.swapfile.chunkSize
for t := tc.usage.head.next; t != tc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(tc.actualChunkIndex)*tc.swapfile.chunkSize
if _, err := tc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", tc.swapfile.file.Name(), err)
break
}
maxStop = max(maxStop, logicStop)
}
}
return
}
func (tc *TempFileChunk) IsComplete() bool {
return tc.usage.IsComplete(tc.swapfile.chunkSize)
}
func (tc *TempFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := tc.usage.head.next; t != tc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
tc.swapfile.file.ReadAt(data, t.StartOffset+int64(tc.actualChunkIndex)*tc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(tc.logicChunkIndex)*tc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
})
mem.Free(data)
}
}

165
weed/filesys/page_writer/upload_pipeline.go

@ -24,6 +24,7 @@ type UploadPipeline struct {
saveToStorageFn SaveToStorageFunc saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex activeReadChunksLock sync.Mutex
swapFile *SwapFile
} }
type SealedChunk struct { type SealedChunk struct {
@ -39,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
} }
} }
func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline {
return &UploadPipeline{ return &UploadPipeline{
ChunkSize: chunkSize, ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk), writableChunks: make(map[LogicChunkIndex]PageChunk),
@ -49,177 +50,185 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
saveToStorageFn: saveToStorageFn, saveToStorageFn: saveToStorageFn,
filepath: filepath, filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int), activeReadChunks: make(map[LogicChunkIndex]int),
swapFile: NewSwapFile(swapFileDir, chunkSize),
} }
} }
func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
cw.writableChunksLock.Lock()
defer cw.writableChunksLock.Unlock()
func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
memChunk, found := cw.writableChunks[logicChunkIndex]
memChunk, found := up.writableChunks[logicChunkIndex]
if !found { if !found {
memChunk = NewMemChunk(logicChunkIndex, cw.ChunkSize)
cw.writableChunks[logicChunkIndex] = memChunk
if len(up.writableChunks) < 0 {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
memChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
if memChunk == nil {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
}
up.writableChunks[logicChunkIndex] = memChunk
} }
n = memChunk.WriteDataAt(p, off) n = memChunk.WriteDataAt(p, off)
cw.maybeMoveToSealed(memChunk, logicChunkIndex)
up.maybeMoveToSealed(memChunk, logicChunkIndex)
return return
} }
func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
// read from sealed chunks first // read from sealed chunks first
cw.sealedChunksLock.Lock()
sealedChunk, found := cw.sealedChunks[logicChunkIndex]
up.sealedChunksLock.Lock()
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found { if found {
sealedChunk.referenceCounter++ sealedChunk.referenceCounter++
} }
cw.sealedChunksLock.Unlock()
up.sealedChunksLock.Unlock()
if found { if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off) maxStop = sealedChunk.chunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex))
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
} }
// read from writable chunks last // read from writable chunks last
cw.writableChunksLock.Lock()
defer cw.writableChunksLock.Unlock()
writableChunk, found := cw.writableChunks[logicChunkIndex]
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
writableChunk, found := up.writableChunks[logicChunkIndex]
if !found { if !found {
return return
} }
writableMaxStop := writableChunk.ReadDataAt(p, off) writableMaxStop := writableChunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop) maxStop = max(maxStop, writableMaxStop)
return return
} }
func (cw *UploadPipeline) FlushAll() {
cw.writableChunksLock.Lock()
defer cw.writableChunksLock.Unlock()
func (up *UploadPipeline) FlushAll() {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
for logicChunkIndex, memChunk := range cw.writableChunks {
cw.moveToSealed(memChunk, logicChunkIndex)
for logicChunkIndex, memChunk := range up.writableChunks {
up.moveToSealed(memChunk, logicChunkIndex)
} }
cw.waitForCurrentWritersToComplete()
up.waitForCurrentWritersToComplete()
} }
func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
if stopOffset%cw.ChunkSize > 0 {
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1 stopLogicChunkIndex += 1
} }
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := cw.activeReadChunks[i]; found {
cw.activeReadChunks[i] = count + 1
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else { } else {
cw.activeReadChunks[i] = 1
up.activeReadChunks[i] = 1
} }
} }
} }
func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
if stopOffset%cw.ChunkSize > 0 {
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1 stopLogicChunkIndex += 1
} }
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := cw.activeReadChunks[i]; found {
if count, found := up.activeReadChunks[i]; found {
if count == 1 { if count == 1 {
delete(cw.activeReadChunks, i)
delete(up.activeReadChunks, i)
} else { } else {
cw.activeReadChunks[i] = count - 1
up.activeReadChunks[i] = count - 1
} }
} }
} }
} }
func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
cw.activeReadChunksLock.Lock()
defer cw.activeReadChunksLock.Unlock()
if count, found := cw.activeReadChunks[logicChunkIndex]; found {
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0 return count > 0
} }
return false return false
} }
func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
cw.uploaderCountCond.L.Lock()
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100) t := int32(100)
for { for {
t = atomic.LoadInt32(&cw.uploaderCount)
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 { if t <= 0 {
break break
} }
cw.uploaderCountCond.Wait()
up.uploaderCountCond.Wait()
} }
cw.uploaderCountCond.L.Unlock()
up.uploaderCountCond.L.Unlock()
} }
func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() { if memChunk.IsComplete() {
cw.moveToSealed(memChunk, logicChunkIndex)
up.moveToSealed(memChunk, logicChunkIndex)
} }
} }
func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&cw.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount)
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&up.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
cw.sealedChunksLock.Lock()
up.sealedChunksLock.Lock()
if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found {
oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex))
if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
} }
sealedChunk := &SealedChunk{ sealedChunk := &SealedChunk{
chunk: memChunk, chunk: memChunk,
referenceCounter: 1, // default 1 is for uploading process referenceCounter: 1, // default 1 is for uploading process
} }
cw.sealedChunks[logicChunkIndex] = sealedChunk
delete(cw.writableChunks, logicChunkIndex)
up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex)
cw.sealedChunksLock.Unlock()
up.sealedChunksLock.Unlock()
cw.uploaders.Execute(func() {
up.uploaders.Execute(func() {
// first add to the file chunks // first add to the file chunks
sealedChunk.chunk.SaveContent(cw.saveToStorageFn)
sealedChunk.chunk.SaveContent(up.saveToStorageFn)
// notify waiting process // notify waiting process
atomic.AddInt32(&cw.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount)
atomic.AddInt32(&up.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
// Lock and Unlock are not required, // Lock and Unlock are not required,
// but it may signal multiple times during one wakeup, // but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them! // and the waiting goroutine may miss some of them!
cw.uploaderCountCond.L.Lock()
cw.uploaderCountCond.Broadcast()
cw.uploaderCountCond.L.Unlock()
up.uploaderCountCond.L.Lock()
up.uploaderCountCond.Broadcast()
up.uploaderCountCond.L.Unlock()
// wait for readers // wait for readers
for cw.IsLocked(logicChunkIndex) {
for up.IsLocked(logicChunkIndex) {
time.Sleep(59 * time.Millisecond) time.Sleep(59 * time.Millisecond)
} }
// then remove from sealed chunks // then remove from sealed chunks
cw.sealedChunksLock.Lock()
defer cw.sealedChunksLock.Unlock()
delete(cw.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
delete(up.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
}) })
} }
func (p2 *UploadPipeline) Shutdown() {
func (up *UploadPipeline) Shutdown() {
up.swapFile.FreeResource()
} }

2
weed/filesys/page_writer/upload_pipeline_test.go

@ -7,7 +7,7 @@ import (
func TestUploadPipeline(t *testing.T) { func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil)
uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil, "")
writeRange(uploadPipeline, 0, 131072) writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144) writeRange(uploadPipeline, 131072, 262144)

Loading…
Cancel
Save