Browse Source

group chunks into sections

pull/4089/head
chrislu 2 years ago
parent
commit
744a757809
  1. 152
      weed/filer/filechunk_group.go
  2. 36
      weed/filer/filechunk_group_test.go
  3. 94
      weed/filer/filechunk_section.go
  4. 1
      weed/mount/dirty_pages_chunked.go
  5. 35
      weed/mount/filehandle.go
  6. 20
      weed/mount/filehandle_read.go
  7. 2
      weed/mount/weedfs_attr.go
  8. 38
      weed/mount/weedfs_file_lseek.go

152
weed/filer/filechunk_group.go

@ -0,0 +1,152 @@
package filer
import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"sync"
)
type ChunkGroup struct {
lookupFn wdclient.LookupFileIdFunctionType
chunkCache chunk_cache.ChunkCache
manifestChunks []*filer_pb.FileChunk
sections map[SectionIndex]*FileChunkSection
sectionsLock sync.RWMutex
}
func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
group := &ChunkGroup{
lookupFn: lookupFn,
chunkCache: chunkCache,
sections: make(map[SectionIndex]*FileChunkSection),
}
err := group.SetChunks(chunks)
return group, err
}
func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
group.sectionsLock.Lock()
defer group.sectionsLock.Unlock()
sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
if !found {
section = &FileChunkSection{
sectionIndex: si,
}
group.sections[si] = section
}
section.addChunk(chunk)
}
return nil
}
func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
group.sectionsLock.RLock()
defer group.sectionsLock.RUnlock()
sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
if !found {
for i := rangeStart; i < rangeStop; i++ {
buff[i-offset] = 0
}
continue
}
xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)
if xErr != nil {
err = xErr
}
n += xn
tsNs = max(tsNs, xTsNs)
}
return
}
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
dataChunks = append(dataChunks, chunk)
continue
}
resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk)
if err != nil {
return err
}
group.manifestChunks = append(group.manifestChunks, chunk)
dataChunks = append(dataChunks, resolvedChunks...)
}
for _, chunk := range dataChunks {
sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
if !found {
section = &FileChunkSection{
sectionIndex: si,
}
group.sections[si] = section
}
section.chunks = append(section.chunks, chunk)
}
}
return nil
}
const (
// see weedfs_file_lseek.go
SEEK_DATA uint32 = 3 // seek to next data after the offset
// SEEK_HOLE uint32 = 4 // seek to next hole after the offset
)
// FIXME: needa tests
func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
group.sectionsLock.RLock()
defer group.sectionsLock.RUnlock()
return group.doSearchChunks(offset, fileSize, whence)
}
func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize)
if whence == SEEK_DATA {
for si := sectionIndex; si < maxSectionIndex+1; si++ {
section, foundSection := group.sections[si]
if !foundSection {
continue
}
sectionStart := section.DataStartOffset(group, offset, fileSize)
if sectionStart == -1 {
continue
}
return true, sectionStart
}
return false, 0
} else {
// whence == SEEK_HOLE
for si := sectionIndex; si < maxSectionIndex; si++ {
section, foundSection := group.sections[si]
if !foundSection {
return true, offset
}
holeStart := section.NextStopOffset(group, offset, fileSize)
if holeStart%SectionSize == 0 {
continue
}
return true, holeStart
}
return true, fileSize
}
}

36
weed/filer/filechunk_group_test.go

@ -0,0 +1,36 @@
package filer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestChunkGroup_doSearchChunks(t *testing.T) {
type fields struct {
sections map[SectionIndex]*FileChunkSection
}
type args struct {
offset int64
fileSize int64
whence uint32
}
tests := []struct {
name string
fields fields
args args
wantFound bool
wantOut int64
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
group := &ChunkGroup{
sections: tt.fields.sections,
}
gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
})
}
}

94
weed/filer/filechunk_section.go

@ -0,0 +1,94 @@
package filer
import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"sync"
)
const SectionSize = 2 * 1024 * 1024 * 128 // 256MiB
type SectionIndex int64
type FileChunkSection struct {
sectionIndex SectionIndex
chunks []*filer_pb.FileChunk
entryViewCache []VisibleInterval
chunkViews []*ChunkView
reader *ChunkReadAt
lock sync.Mutex
}
func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
section.lock.Lock()
defer section.lock.Unlock()
section.chunks = append(section.chunks, chunk)
// FIXME: this can be improved to an incremental change
section.entryViewCache = nil
return nil
}
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
return section.reader.ReadAtWithTime(buff, offset)
}
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
if section.entryViewCache == nil {
section.entryViewCache = readResolvedChunks(section.chunks)
section.chunks, _ = SeparateGarbageChunks(section.entryViewCache, section.chunks)
if section.reader != nil {
_ = section.reader.Close()
section.reader = nil
}
}
if section.reader == nil {
chunkViews := ViewFromVisibleIntervals(section.entryViewCache, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
section.reader = NewChunkReaderAtFromClient(group.lookupFn, chunkViews, group.chunkCache, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
}
}
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
for _, visible := range section.entryViewCache {
if visible.stop <= offset {
continue
}
if offset < visible.start {
return offset
}
return offset
}
return -1
}
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
isAfterOffset := false
for _, visible := range section.entryViewCache {
if !isAfterOffset {
if visible.stop <= offset {
continue
}
isAfterOffset = true
}
if offset < visible.start {
return offset
}
// now visible.start <= offset
if offset < visible.stop {
offset = visible.stop
}
}
return offset
}

1
weed/mount/dirty_pages_chunked.go

@ -77,6 +77,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade
return
}
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryChunkGroup.AddChunk(chunk)
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
}

35
weed/mount/filehandle.go

@ -20,13 +20,13 @@ type FileHandle struct {
counter int64
entry *LockedEntry
entryLock sync.Mutex
entryChunkGroup *filer.ChunkGroup
inode uint64
wfs *WFS
// cache file has been written to
dirtyMetadata bool
dirtyPages *PageWriter
entryViewCache []filer.VisibleInterval
reader *filer.ChunkReadAt
contentType string
handle uint64
@ -48,12 +48,12 @@ func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_p
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
fh.entry = &LockedEntry{
Entry: entry,
}
if entry != nil {
fh.SetEntry(entry)
}
if IsDebug {
var err error
@ -76,6 +76,17 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
if entry != nil {
fileSize := filer.FileSize(entry)
entry.Attributes.FileSize = fileSize
var resolveManifestErr error
fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
if resolveManifestErr != nil {
glog.Warningf("failed to resolve manifest chunks in %+v", entry)
}
} else {
glog.Fatalf("setting file handle entry to nil")
}
fh.entry.SetEntry(entry)
}
@ -92,14 +103,6 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
}
fh.entry.AppendChunks(chunks)
fh.entryViewCache = nil
}
func (fh *FileHandle) CloseReader() {
if fh.reader != nil {
_ = fh.reader.Close()
fh.reader = nil
}
}
func (fh *FileHandle) Release() {
@ -109,8 +112,14 @@ func (fh *FileHandle) Release() {
glog.V(4).Infof("Release %s fh %d", fh.entry.Name, fh.handle)
fh.dirtyPages.Destroy()
fh.CloseReader()
if IsDebug {
fh.mirrorFile.Close()
}
}
func lessThan(a, b *filer_pb.FileChunk) bool {
if a.ModifiedTsNs == b.ModifiedTsNs {
return a.Fid.FileKey < b.Fid.FileKey
}
return a.ModifiedTsNs < b.ModifiedTsNs
}

20
weed/mount/filehandle_read.go

@ -56,25 +56,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e
return int64(totalRead), 0, nil
}
var chunkResolveErr error
if fh.entryViewCache == nil {
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize)
if chunkResolveErr != nil {
return 0, 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
fh.CloseReader()
}
if fh.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize)
glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
//for _, chunkView := range chunkViews {
// glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
//}
fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
}
totalRead, ts, err := fh.reader.ReadAtWithTime(buff, offset)
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)

2
weed/mount/weedfs_attr.go

@ -75,7 +75,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
fh.entryViewCache = nil
fh.entryChunkGroup.SetChunks(chunks)
}
}
entry.Attributes.Mtime = time.Now().Unix()

38
weed/mount/weedfs_file_lseek.go

@ -56,17 +56,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
// refresh view cache if necessary
if fh.entryViewCache == nil {
var err error
fh.entryViewCache, err = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), fh.entry.GetChunks(), 0, fileSize)
if err != nil {
return fuse.EIO
}
}
// search chunks for the offset
found, offset := searchChunks(fh, offset, fileSize, in.Whence)
found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK
@ -82,30 +73,3 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return fuse.OK
}
// searchChunks goes through all chunks to find the correct offset
func searchChunks(fh *FileHandle, offset, fileSize int64, whence uint32) (found bool, out int64) {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, offset, fileSize)
for _, chunkView := range chunkViews {
if offset < chunkView.LogicOffset {
if whence == SEEK_HOLE {
out = offset
} else {
out = chunkView.LogicOffset
}
return true, out
}
if offset >= chunkView.LogicOffset && offset < chunkView.Offset+int64(chunkView.Size) && whence == SEEK_DATA {
out = offset
return true, out
}
offset += int64(chunkView.Size)
}
return
}
Loading…
Cancel
Save