chrislu
3 years ago
4 changed files with 95 additions and 3 deletions
-
2weed/filesys/filehandle.go
-
78weed/filesys/page_writer.go
-
1weed/filesys/page_writer/page_chunk.go
-
17weed/filesys/page_writer_pattern.go
@ -0,0 +1,78 @@ |
|||
package filesys |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
) |
|||
|
|||
type PageWriter struct { |
|||
f *File |
|||
collection string |
|||
replication string |
|||
chunkSize int64 |
|||
writerPattern *WriterPattern |
|||
|
|||
randomWriter page_writer.DirtyPages |
|||
streamWriter page_writer.DirtyPages |
|||
} |
|||
|
|||
var ( |
|||
_ = page_writer.DirtyPages(&PageWriter{}) |
|||
) |
|||
|
|||
func newPageWriter(file *File, chunkSize int64) *PageWriter { |
|||
pw := &PageWriter{ |
|||
f: file, |
|||
chunkSize: chunkSize, |
|||
randomWriter: newTempFileDirtyPages(file), |
|||
streamWriter: newContinuousDirtyPages(file), |
|||
writerPattern: NewWriterPattern(file.Name, chunkSize), |
|||
} |
|||
return pw |
|||
} |
|||
|
|||
func (pw *PageWriter) AddPage(offset int64, data []byte) { |
|||
|
|||
glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) |
|||
|
|||
pw.writerPattern.MonitorWriteAt(offset, len(data)) |
|||
|
|||
chunkIndex := offset / pw.chunkSize |
|||
for i := chunkIndex; len(data) > 0; i++ { |
|||
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) |
|||
pw.addToOneChunk(i, offset, data[:writeSize]) |
|||
offset += writeSize |
|||
data = data[writeSize:] |
|||
} |
|||
} |
|||
|
|||
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { |
|||
if chunkIndex > 0 { |
|||
if pw.writerPattern.IsStreamingMode() { |
|||
pw.streamWriter.AddPage(offset, data) |
|||
return |
|||
} |
|||
} |
|||
pw.randomWriter.AddPage(offset, data) |
|||
} |
|||
|
|||
func (pw *PageWriter) FlushData() error { |
|||
if err := pw.streamWriter.FlushData(); err != nil { |
|||
return err |
|||
} |
|||
return pw.randomWriter.FlushData() |
|||
} |
|||
|
|||
func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { |
|||
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data))) |
|||
m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset) |
|||
m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset) |
|||
return max(m1, m2) |
|||
} |
|||
|
|||
func (pw *PageWriter) GetStorageOptions() (collection, replication string) { |
|||
if pw.writerPattern.IsStreamingMode() { |
|||
return pw.streamWriter.GetStorageOptions() |
|||
} |
|||
return pw.randomWriter.GetStorageOptions() |
|||
} |
@ -0,0 +1 @@ |
|||
package page_writer |
@ -1,22 +1,35 @@ |
|||
package page_writer |
|||
package filesys |
|||
|
|||
import "fmt" |
|||
|
|||
type WriterPattern struct { |
|||
isStreaming bool |
|||
lastWriteOffset int64 |
|||
chunkSize int64 |
|||
fileName string |
|||
} |
|||
|
|||
// For streaming write: only cache the first chunk
|
|||
// For random write: fall back to temp file approach
|
|||
// writes can only change from streaming mode to non-streaming mode
|
|||
|
|||
func NewWriterPattern() *WriterPattern { |
|||
func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern { |
|||
return &WriterPattern{ |
|||
isStreaming: true, |
|||
lastWriteOffset: 0, |
|||
chunkSize: chunkSize, |
|||
fileName: fileName, |
|||
} |
|||
} |
|||
|
|||
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { |
|||
if rp.lastWriteOffset == 0 { |
|||
} |
|||
if rp.lastWriteOffset > offset { |
|||
if rp.isStreaming { |
|||
fmt.Printf("file %s ==> non streaming at [%d,%d)\n", rp.fileName, offset, offset+int64(size)) |
|||
} |
|||
fmt.Printf("write %s [%d,%d)\n", rp.fileName, offset, offset+int64(size)) |
|||
rp.isStreaming = false |
|||
} |
|||
rp.lastWriteOffset = offset |
Write
Preview
Loading…
Cancel
Save
Reference in new issue