Browse Source

chunked file works now

pull/2534/head
chrislu 3 years ago
parent
commit
032df784ed
  1. 2
      weed/filesys/dirty_pages_continuous.go
  2. 78
      weed/filesys/dirty_pages_temp_file.go
  3. 1
      weed/filesys/filehandle.go
  4. 28
      weed/filesys/page_writer.go
  5. 152
      weed/filesys/page_writer/chunked_file_writer.go
  6. 60
      weed/filesys/page_writer/chunked_file_writer_test.go
  7. 20
      weed/filesys/page_writer/dirty_pages.go
  8. 13
      weed/filesys/page_writer/dirty_pages_temp_interval.go
  9. 4
      weed/filesys/page_writer/page_chunk_interval_list.go

2
weed/filesys/dirty_pages_continuous.go

@ -147,3 +147,5 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication return pages.collection, pages.replication
} }
func (pages ContinuousDirtyPages) Destroy() {
}

78
weed/filesys/dirty_pages_temp_file.go

@ -13,21 +13,20 @@ import (
type TempFileDirtyPages struct { type TempFileDirtyPages struct {
f *File f *File
tf *os.File
writtenIntervals *page_writer.WrittenContinuousIntervals
writeWaitGroup sync.WaitGroup writeWaitGroup sync.WaitGroup
pageAddLock sync.Mutex pageAddLock sync.Mutex
chunkAddLock sync.Mutex chunkAddLock sync.Mutex
lastErr error lastErr error
collection string collection string
replication string replication string
chunkedFile *page_writer.ChunkedFileWriter
} }
func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{ tempFile := &TempFileDirtyPages{
f: file, f: file,
writtenIntervals: &page_writer.WrittenContinuousIntervals{},
chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
} }
return tempFile return tempFile
@ -38,28 +37,8 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.pageAddLock.Lock() pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock() defer pages.pageAddLock.Unlock()
if pages.tf == nil {
tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
if err != nil {
glog.Errorf("create temp file: %v", err)
pages.lastErr = err
return
}
pages.tf = tf
pages.writtenIntervals.TempFile = tf
pages.writtenIntervals.LastOffset = 0
}
writtenOffset := pages.writtenIntervals.LastOffset
dataSize := int64(len(data))
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
pages.lastErr = err pages.lastErr = err
} else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
pages.writtenIntervals.LastOffset += dataSize
} }
// pages.writtenIntervals.debug() // pages.writtenIntervals.debug()
@ -68,54 +47,38 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
} }
func (pages *TempFileDirtyPages) FlushData() error { func (pages *TempFileDirtyPages) FlushData() error {
pages.saveExistingPagesToStorage()
pages.saveChunkedFileToStorage()
pages.writeWaitGroup.Wait() pages.writeWaitGroup.Wait()
if pages.lastErr != nil { if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr) return fmt.Errorf("flush data: %v", pages.lastErr)
} }
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
if pages.tf != nil {
pages.writtenIntervals.TempFile = nil
pages.writtenIntervals.Lists = nil
pages.tf.Close()
os.Remove(pages.tf.Name())
pages.tf = nil
}
return nil return nil
} }
func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
return pages.chunkedFile.ReadDataAt(data, startOffset)
}
pageSize := pages.f.wfs.option.ChunkSizeLimit
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
for _, list := range pages.writtenIntervals.Lists {
listStopOffset := list.Offset() + list.Size()
for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop {
continue
}
// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
}
}
pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex int, interval *page_writer.PageChunkWrittenInterval) {
reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size())
})
} }
func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) {
mtime := time.Now().UnixNano() mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1) pages.writeWaitGroup.Add(1)
writer := func() { writer := func() {
defer pages.writeWaitGroup.Done() defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil { if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
@ -135,12 +98,9 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s
} else { } else {
go writer() go writer()
} }
}
func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
return pages.writtenIntervals.ReadDataAt(data, startOffset)
} }
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
func (pages TempFileDirtyPages) Destroy() {
pages.chunkedFile.Destroy()
} }

1
weed/filesys/filehandle.go

@ -222,6 +222,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.reader = nil fh.reader = nil
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
fh.dirtyPages.Destroy()
} }
if fh.f.isOpen < 0 { if fh.f.isOpen < 0 {

28
weed/filesys/page_writer.go

@ -24,7 +24,7 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter {
pw := &PageWriter{ pw := &PageWriter{
f: file, f: file,
chunkSize: chunkSize, chunkSize: chunkSize,
randomWriter: newTempFileDirtyPages(file),
randomWriter: newTempFileDirtyPages(file, chunkSize),
streamWriter: newContinuousDirtyPages(file), streamWriter: newContinuousDirtyPages(file),
writerPattern: NewWriterPattern(file.Name, chunkSize), writerPattern: NewWriterPattern(file.Name, chunkSize),
} }
@ -63,11 +63,23 @@ func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData() return pw.randomWriter.FlushData()
} }
func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data)))
m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset)
m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset)
return max(m1, m2)
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
maxStop = max(maxStop, max(m1, m2))
offset += readSize
data = data[readSize:]
}
return
} }
func (pw *PageWriter) GetStorageOptions() (collection, replication string) { func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
@ -76,3 +88,7 @@ func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
} }
return pw.randomWriter.GetStorageOptions() return pw.randomWriter.GetStorageOptions()
} }
func (pw *PageWriter) Destroy() {
pw.randomWriter.Destroy()
}

152
weed/filesys/page_writer/chunked_file_writer.go

@ -0,0 +1,152 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"io"
"os"
"sync"
)
// ChunkedFileWriter assumes the write requests will come in within chunks
type ChunkedFileWriter struct {
dir string
file *os.File
logicToActualChunkIndex map[int]int
chunkUsages []*PageChunkWrittenIntervalList
ChunkSize int64
sync.Mutex
}
var _ = io.WriterAt(&ChunkedFileWriter{})
func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
return &ChunkedFileWriter{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[int]int),
ChunkSize: chunkSize,
}
}
func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
cw.Lock()
defer cw.Unlock()
if cw.file == nil {
cw.file, err = os.CreateTemp(cw.dir, "")
if err != nil {
glog.Errorf("create temp file: %v", err)
return
}
}
actualOffset, chunkUsage := cw.toActualWriteOffset(off)
n, err = cw.file.WriteAt(p, actualOffset)
if err == nil {
startOffset := off % cw.ChunkSize
chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
}
return
}
func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
cw.Lock()
defer cw.Unlock()
if cw.file == nil {
return
}
logicChunkIndex := off / cw.ChunkSize
actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
if chunkUsage != nil {
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
if err != nil {
glog.Errorf("reading temp file: %v", err)
break
}
maxStop = max(maxStop, logicStop)
}
}
}
return
}
func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *PageChunkWrittenIntervalList) {
logicChunkIndex := int(logicOffset / cw.ChunkSize)
offsetRemainder := logicOffset % cw.ChunkSize
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if found {
return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
}
cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages)
chunkUsage = newPageChunkWrittenIntervalList()
cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
}
func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *PageChunkWrittenIntervalList) {
logicChunkIndex := int(logicOffset / cw.ChunkSize)
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if found {
return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
}
return 0, nil
}
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *PageChunkWrittenInterval)) {
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
chunkUsage := cw.chunkUsages[actualChunkIndex]
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
process(cw.file, logicChunkIndex, t)
}
}
}
func (cw *ChunkedFileWriter) Destroy() {
if cw.file != nil {
cw.file.Close()
os.Remove(cw.file.Name())
}
}
type FileIntervalReader struct {
f *os.File
startOffset int64
stopOffset int64
position int64
}
var _ = io.Reader(&FileIntervalReader{})
func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *PageChunkWrittenInterval) *FileIntervalReader {
actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
if !found {
// this should never happen
return nil
}
return &FileIntervalReader{
f: cw.file,
startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
position: 0,
}
}
func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
if err == nil || err == io.EOF {
fr.position += int64(n)
if fr.stopOffset-fr.startOffset-fr.position == 0 {
// return a tiny bit faster
err = io.EOF
return
}
}
return
}

60
weed/filesys/page_writer/chunked_file_writer_test.go

@ -0,0 +1,60 @@
package page_writer
import (
"github.com/stretchr/testify/assert"
"os"
"testing"
)
func TestChunkedFileWriter_toActualOffset(t *testing.T) {
cw := NewChunkedFileWriter("", 16)
writeToFile(cw, 50, 60)
writeToFile(cw, 60, 64)
writeToFile(cw, 32, 40)
writeToFile(cw, 42, 48)
writeToFile(cw, 48, 50)
assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
}
func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
_, chunkUsage := cw.toActualWriteOffset(startOffset)
// skip doing actual writing
innerOffset := startOffset % cw.ChunkSize
chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
}
func TestWriteChunkedFile(t *testing.T) {
x := NewChunkedFileWriter(os.TempDir(), 20)
defer x.Destroy()
y := NewChunkedFileWriter(os.TempDir(), 12)
defer y.Destroy()
batchSize := 4
buf := make([]byte, batchSize)
for i := 0; i < 256; i++ {
for x := 0; x < batchSize; x++ {
buf[x] = byte(i)
}
x.WriteAt(buf, int64(i*batchSize))
y.WriteAt(buf, int64((255-i)*batchSize))
}
a := make([]byte, 1)
b := make([]byte, 1)
for i := 0; i < 256*batchSize; i++ {
x.ReadDataAt(a, int64(i))
y.ReadDataAt(b, int64(256*batchSize-1-i))
assert.Equal(t, a[0], b[0], "same read")
}
}

20
weed/filesys/page_writer/dirty_pages.go

@ -5,4 +5,24 @@ type DirtyPages interface {
FlushData() error FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string) GetStorageOptions() (collection, replication string)
Destroy()
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
func minInt(x, y int) int {
if x < y {
return x
}
return y
} }

13
weed/filesys/page_writer/dirty_pages_temp_interval.go

@ -287,16 +287,3 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) {
} }
return return
} }
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}

4
weed/filesys/page_writer/page_chunk_interval_list.go

@ -10,6 +10,10 @@ type PageChunkWrittenInterval struct {
next *PageChunkWrittenInterval next *PageChunkWrittenInterval
} }
func (interval *PageChunkWrittenInterval) Size() int64 {
return interval.stopOffset - interval.startOffset
}
// PageChunkWrittenIntervalList mark written intervals within one page chunk // PageChunkWrittenIntervalList mark written intervals within one page chunk
type PageChunkWrittenIntervalList struct { type PageChunkWrittenIntervalList struct {
head *PageChunkWrittenInterval head *PageChunkWrittenInterval

Loading…
Cancel
Save