Browse Source

upload only chunks that have not been flushed

avoid_releasing_temp_file_on_write
chrislu 3 years ago
parent
commit
8f9d1c1e3c
  1. 4
      weed/filesys/dirty_pages_temp_file.go
  2. 8
      weed/filesys/page_writer/chunk_interval_list.go
  3. 6
      weed/filesys/page_writer/chunked_file_writer.go
  4. 4
      weed/filesys/page_writer/chunked_file_writer_test.go
  5. 2
      weed/filesys/page_writer/chunked_stream_writer_test.go

4
weed/filesys/dirty_pages_temp_file.go

@ -51,7 +51,6 @@ func (pages *TempFileDirtyPages) FlushData() error {
if pages.lastErr != nil { if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr) return fmt.Errorf("flush data: %v", pages.lastErr)
} }
pages.chunkedFile.Reset()
return nil return nil
} }
@ -68,6 +67,7 @@ func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
interval.MarkFlushed()
}) })
} }
@ -102,5 +102,5 @@ func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reade
} }
func (pages TempFileDirtyPages) Destroy() { func (pages TempFileDirtyPages) Destroy() {
pages.chunkedFile.Reset()
pages.chunkedFile.Destroy()
} }

8
weed/filesys/page_writer/chunk_interval_list.go

@ -6,6 +6,7 @@ import "math"
type ChunkWrittenInterval struct { type ChunkWrittenInterval struct {
StartOffset int64 StartOffset int64
stopOffset int64 stopOffset int64
flushed bool
prev *ChunkWrittenInterval prev *ChunkWrittenInterval
next *ChunkWrittenInterval next *ChunkWrittenInterval
} }
@ -18,6 +19,10 @@ func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
return interval.stopOffset-interval.StartOffset == chunkSize return interval.stopOffset-interval.StartOffset == chunkSize
} }
func (interval *ChunkWrittenInterval) MarkFlushed() {
interval.flushed = true
}
// ChunkWrittenIntervalList mark written intervals within one page chunk // ChunkWrittenIntervalList mark written intervals within one page chunk
type ChunkWrittenIntervalList struct { type ChunkWrittenIntervalList struct {
head *ChunkWrittenInterval head *ChunkWrittenInterval
@ -64,18 +69,21 @@ func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together // merge p and q together
p.stopOffset = q.stopOffset p.stopOffset = q.stopOffset
p.flushed = false
unlinkNodesBetween(p, q.next) unlinkNodesBetween(p, q.next)
return return
} }
if interval.StartOffset <= p.stopOffset { if interval.StartOffset <= p.stopOffset {
// merge new interval into p // merge new interval into p
p.stopOffset = interval.stopOffset p.stopOffset = interval.stopOffset
p.flushed = false
unlinkNodesBetween(p, q) unlinkNodesBetween(p, q)
return return
} }
if q.StartOffset <= interval.stopOffset { if q.StartOffset <= interval.stopOffset {
// merge new interval into q // merge new interval into q
q.StartOffset = interval.StartOffset q.StartOffset = interval.StartOffset
q.flushed = false
unlinkNodesBetween(p, q) unlinkNodesBetween(p, q)
return return
} }

6
weed/filesys/page_writer/chunked_file_writer.go

@ -106,13 +106,15 @@ func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, log
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
chunkUsage := cw.chunkUsages[actualChunkIndex] chunkUsage := cw.chunkUsages[actualChunkIndex]
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
if !t.flushed {
process(cw.file, logicChunkIndex, t) process(cw.file, logicChunkIndex, t)
} }
} }
}
} }
// Reset releases used resources
func (cw *ChunkedFileWriter) Reset() {
// Destroy releases used resources
func (cw *ChunkedFileWriter) Destroy() {
if cw.file != nil { if cw.file != nil {
cw.file.Close() cw.file.Close()
os.Remove(cw.file.Name()) os.Remove(cw.file.Name())

4
weed/filesys/page_writer/chunked_file_writer_test.go

@ -35,9 +35,9 @@ func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
func TestWriteChunkedFile(t *testing.T) { func TestWriteChunkedFile(t *testing.T) {
x := NewChunkedFileWriter(os.TempDir(), 20) x := NewChunkedFileWriter(os.TempDir(), 20)
defer x.Reset()
defer x.Destroy()
y := NewChunkedFileWriter(os.TempDir(), 12) y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Reset()
defer y.Destroy()
batchSize := 4 batchSize := 4
buf := make([]byte, batchSize) buf := make([]byte, batchSize)

2
weed/filesys/page_writer/chunked_stream_writer_test.go

@ -10,7 +10,7 @@ func TestWriteChunkedStream(t *testing.T) {
x := NewChunkedStreamWriter(20) x := NewChunkedStreamWriter(20)
defer x.Reset() defer x.Reset()
y := NewChunkedFileWriter(os.TempDir(), 12) y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Reset()
defer y.Destroy()
batchSize := 4 batchSize := 4
buf := make([]byte, batchSize) buf := make([]byte, batchSize)

Loading…
Cancel
Save