|
@ -4,7 +4,6 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
"github.com/chrislusf/seaweedfs/weed/util/mem" |
|
|
|
|
|
"sync" |
|
|
"sync" |
|
|
"sync/atomic" |
|
|
"sync/atomic" |
|
|
"time" |
|
|
"time" |
|
@ -62,10 +61,7 @@ func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { |
|
|
|
|
|
|
|
|
memChunk, found := cw.writableChunks[logicChunkIndex] |
|
|
memChunk, found := cw.writableChunks[logicChunkIndex] |
|
|
if !found { |
|
|
if !found { |
|
|
memChunk = &MemChunk{ |
|
|
|
|
|
buf: mem.Allocate(int(cw.ChunkSize)), |
|
|
|
|
|
usage: newChunkWrittenIntervalList(), |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
memChunk = NewMemChunk(logicChunkIndex, cw.ChunkSize) |
|
|
cw.writableChunks[logicChunkIndex] = memChunk |
|
|
cw.writableChunks[logicChunkIndex] = memChunk |
|
|
} |
|
|
} |
|
|
n = memChunk.WriteDataAt(p, offsetRemainder) |
|
|
n = memChunk.WriteDataAt(p, offsetRemainder) |
|
@ -85,7 +81,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { |
|
|
} |
|
|
} |
|
|
cw.sealedChunksLock.Unlock() |
|
|
cw.sealedChunksLock.Unlock() |
|
|
if found { |
|
|
if found { |
|
|
maxStop = sealedChunk.chunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize) |
|
|
|
|
|
|
|
|
maxStop = sealedChunk.chunk.ReadDataAt(p, off) |
|
|
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop) |
|
|
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop) |
|
|
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) |
|
|
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) |
|
|
} |
|
|
} |
|
@ -97,7 +93,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { |
|
|
if !found { |
|
|
if !found { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
writableMaxStop := writableChunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize) |
|
|
|
|
|
|
|
|
writableMaxStop := writableChunk.ReadDataAt(p, off) |
|
|
glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop) |
|
|
glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop) |
|
|
maxStop = max(maxStop, writableMaxStop) |
|
|
maxStop = max(maxStop, writableMaxStop) |
|
|
|
|
|
|
|
@ -174,7 +170,7 @@ func (cw *UploadPipeline) waitForCurrentWritersToComplete() { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { |
|
|
func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { |
|
|
if memChunk.IsComplete(cw.ChunkSize) { |
|
|
|
|
|
|
|
|
if memChunk.IsComplete() { |
|
|
cw.moveToSealed(memChunk, logicChunkIndex) |
|
|
cw.moveToSealed(memChunk, logicChunkIndex) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -199,7 +195,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic |
|
|
|
|
|
|
|
|
cw.uploaders.Execute(func() { |
|
|
cw.uploaders.Execute(func() { |
|
|
// first add to the file chunks
|
|
|
// first add to the file chunks
|
|
|
sealedChunk.chunk.SaveContent(cw.saveToStorageFn, logicChunkIndex, cw.ChunkSize) |
|
|
|
|
|
|
|
|
sealedChunk.chunk.SaveContent(cw.saveToStorageFn) |
|
|
|
|
|
|
|
|
// notify waiting process
|
|
|
// notify waiting process
|
|
|
atomic.AddInt32(&cw.uploaderCount, -1) |
|
|
atomic.AddInt32(&cw.uploaderCount, -1) |
|
|