Browse Source

working properly

pull/2074/head
Chris Lu 4 years ago
parent
commit
d06ecc2649
  1. 27
      weed/filesys/dirty_pages_temp_file.go
  2. 48
      weed/filesys/dirty_pages_temp_interval.go
  3. 4
      weed/filesys/filehandle.go

27
weed/filesys/dirty_pages_temp_file.go

@ -56,18 +56,23 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
} }
pages.tf = tf pages.tf = tf
pages.writtenIntervals.tempFile = tf pages.writtenIntervals.tempFile = tf
pages.writtenIntervals.lastOffset = 0
} }
writtenOffset := pages.writtenIntervals.TotalSize()
writtenOffset := pages.writtenIntervals.lastOffset
dataSize := int64(len(data))
glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+int64(len(data)))
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil { if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
pages.lastErr = err pages.lastErr = err
} else { } else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
pages.writtenIntervals.lastOffset += dataSize
} }
// pages.writtenIntervals.debug()
return return
} }
@ -81,6 +86,11 @@ func (pages *TempFileDirtyPages) FlushData() error {
pages.pageAddLock.Lock() pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock() defer pages.pageAddLock.Unlock()
if pages.tf != nil { if pages.tf != nil {
pages.writtenIntervals.tempFile = nil
pages.writtenIntervals.lists = nil
pages.tf.Close()
os.Remove(pages.tf.Name()) os.Remove(pages.tf.Name())
pages.tf = nil pages.tf = nil
} }
@ -91,15 +101,16 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
pageSize := pages.f.wfs.option.ChunkSizeLimit pageSize := pages.f.wfs.option.ChunkSizeLimit
uploadedSize := int64(0)
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
for _, list := range pages.writtenIntervals.lists { for _, list := range pages.writtenIntervals.lists {
for {
start, stop := max(list.Offset(), uploadedSize), min(list.Offset()+list.Size(), uploadedSize+pageSize)
listStopOffset := list.Offset() + list.Size()
for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop { if start >= stop {
break
continue
} }
uploadedSize = stop
glog.V(4).Infof("uploading %v [%d,%d)", pages.f.Name, start, stop)
// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
pages.saveToStorage(list.ToReader(start, stop), start, stop-start) pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
} }
} }

48
weed/filesys/dirty_pages_temp_interval.go

@ -1,8 +1,8 @@
package filesys package filesys
import ( import (
"github.com/chrislusf/seaweedfs/weed/glog"
"io" "io"
"log"
"os" "os"
) )
@ -20,8 +20,9 @@ type WrittenIntervalLinkedList struct {
} }
type WrittenContinuousIntervals struct { type WrittenContinuousIntervals struct {
tempFile *os.File
lists []*WrittenIntervalLinkedList
tempFile *os.File
lastOffset int64
lists []*WrittenIntervalLinkedList
} }
func (list *WrittenIntervalLinkedList) Offset() int64 { func (list *WrittenIntervalLinkedList) Offset() int64 {
@ -95,6 +96,21 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
} }
} }
func (c *WrittenContinuousIntervals) debug() {
log.Printf("++")
for _, l := range c.lists {
log.Printf("++++")
for t := l.Head; ; t = t.Next {
log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
if t.Next == nil {
break
}
}
log.Printf("----")
}
log.Printf("--")
}
func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) { func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
@ -223,6 +239,7 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
for t := l.Head; ; t = t.Next { for t := l.Head; ; t = t.Next {
startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop) startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
if startOffset < stopOffset { if startOffset < stopOffset {
// log.Printf("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset)) readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
} }
if t.Next == nil { if t.Next == nil {
@ -236,29 +253,32 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
} }
type FileSectionReader struct { type FileSectionReader struct {
file *os.File
Offset int64
dataStart int64
dataStop int64
file *os.File
tempStartOffset int64
Offset int64
dataStart int64
dataStop int64
} }
var _ = io.Reader(&FileSectionReader{}) var _ = io.Reader(&FileSectionReader{})
func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader { func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
return &FileSectionReader{ return &FileSectionReader{
file: tempfile,
Offset: offset,
dataStart: dataOffset,
dataStop: dataOffset + size,
file: tempfile,
tempStartOffset: offset,
Offset: offset,
dataStart: dataOffset,
dataStop: dataOffset + size,
} }
} }
func (f *FileSectionReader) Read(p []byte) (n int, err error) { func (f *FileSectionReader) Read(p []byte) (n int, err error) {
dataLen := min(f.dataStop-f.Offset, int64(len(p)))
if dataLen < 0 {
remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
if remaining <= 0 {
return 0, io.EOF return 0, io.EOF
} }
glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen)
dataLen := min(remaining, int64(len(p)))
// glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
n, err = f.file.ReadAt(p[:dataLen], f.Offset) n, err = f.file.ReadAt(p[:dataLen], f.Offset)
if n > 0 { if n > 0 {
f.Offset += int64(n) f.Offset += int64(n)

4
weed/filesys/filehandle.go

@ -38,8 +38,8 @@ type FileHandle struct {
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
fh := &FileHandle{ fh := &FileHandle{
f: file, f: file,
dirtyPages: newContinuousDirtyPages(file, writeOnly),
/// dirtyPages: newTempFileDirtyPages(file, writeOnly),
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
dirtyPages: newTempFileDirtyPages(file, writeOnly),
Uid: uid, Uid: uid,
Gid: gid, Gid: gid,
} }

Loading…
Cancel
Save