Browse Source

refactor, change file locations

pull/2532/head
chrislu 3 years ago
parent
commit
bc96682760
  1. 5
      weed/filesys/dirty_pages_continuous.go
  2. 19
      weed/filesys/dirty_pages_temp_file.go
  3. 3
      weed/filesys/filehandle.go
  4. 2
      weed/filesys/page_writer/dirty_page_interval.go
  5. 2
      weed/filesys/page_writer/dirty_page_interval_test.go
  6. 2
      weed/filesys/page_writer/dirty_pages.go
  7. 55
      weed/filesys/page_writer/dirty_pages_temp_interval.go

5
weed/filesys/dirty_pages_continuous.go

@ -3,6 +3,7 @@ package filesys
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"io" "io"
"sync" "sync"
"time" "time"
@ -12,7 +13,7 @@ import (
) )
type ContinuousDirtyPages struct { type ContinuousDirtyPages struct {
intervals *ContinuousIntervals
intervals *page_writer.ContinuousIntervals
f *File f *File
writeOnly bool writeOnly bool
writeWaitGroup sync.WaitGroup writeWaitGroup sync.WaitGroup
@ -24,7 +25,7 @@ type ContinuousDirtyPages struct {
func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages { func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{ dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
intervals: &page_writer.ContinuousIntervals{},
f: file, f: file,
writeOnly: writeOnly, writeOnly: writeOnly,
} }

19
weed/filesys/dirty_pages_temp_file.go

@ -2,6 +2,7 @@ package filesys
import ( import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io" "io"
@ -13,7 +14,7 @@ import (
type TempFileDirtyPages struct { type TempFileDirtyPages struct {
f *File f *File
tf *os.File tf *os.File
writtenIntervals *WrittenContinuousIntervals
writtenIntervals *page_writer.WrittenContinuousIntervals
writeOnly bool writeOnly bool
writeWaitGroup sync.WaitGroup writeWaitGroup sync.WaitGroup
pageAddLock sync.Mutex pageAddLock sync.Mutex
@ -28,7 +29,7 @@ func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{ tempFile := &TempFileDirtyPages{
f: file, f: file,
writeOnly: writeOnly, writeOnly: writeOnly,
writtenIntervals: &WrittenContinuousIntervals{},
writtenIntervals: &page_writer.WrittenContinuousIntervals{},
} }
return tempFile return tempFile
@ -47,11 +48,11 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
return return
} }
pages.tf = tf pages.tf = tf
pages.writtenIntervals.tempFile = tf
pages.writtenIntervals.lastOffset = 0
pages.writtenIntervals.TempFile = tf
pages.writtenIntervals.LastOffset = 0
} }
writtenOffset := pages.writtenIntervals.lastOffset
writtenOffset := pages.writtenIntervals.LastOffset
dataSize := int64(len(data)) dataSize := int64(len(data))
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
@ -60,7 +61,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.lastErr = err pages.lastErr = err
} else { } else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
pages.writtenIntervals.lastOffset += dataSize
pages.writtenIntervals.LastOffset += dataSize
} }
// pages.writtenIntervals.debug() // pages.writtenIntervals.debug()
@ -79,8 +80,8 @@ func (pages *TempFileDirtyPages) FlushData() error {
defer pages.pageAddLock.Unlock() defer pages.pageAddLock.Unlock()
if pages.tf != nil { if pages.tf != nil {
pages.writtenIntervals.tempFile = nil
pages.writtenIntervals.lists = nil
pages.writtenIntervals.TempFile = nil
pages.writtenIntervals.Lists = nil
pages.tf.Close() pages.tf.Close()
os.Remove(pages.tf.Name()) os.Remove(pages.tf.Name())
@ -95,7 +96,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
for _, list := range pages.writtenIntervals.lists {
for _, list := range pages.writtenIntervals.Lists {
listStopOffset := list.Offset() + list.Size() listStopOffset := list.Offset() + list.Size()
for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)

3
weed/filesys/filehandle.go

@ -3,6 +3,7 @@ package filesys
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"io" "io"
"math" "math"
"net/http" "net/http"
@ -20,7 +21,7 @@ import (
type FileHandle struct { type FileHandle struct {
// cache file has been written to // cache file has been written to
dirtyPages DirtyPages
dirtyPages page_writer.DirtyPages
entryViewCache []filer.VisibleInterval entryViewCache []filer.VisibleInterval
reader io.ReaderAt reader io.ReaderAt
contentType string contentType string

2
weed/filesys/dirty_page_interval.go → weed/filesys/page_writer/dirty_page_interval.go

@ -1,4 +1,4 @@
package filesys
package page_writer
import ( import (
"io" "io"

2
weed/filesys/dirty_page_interval_test.go → weed/filesys/page_writer/dirty_page_interval_test.go

@ -1,4 +1,4 @@
package filesys
package page_writer
import ( import (
"bytes" "bytes"

2
weed/filesys/dirty_pages.go → weed/filesys/page_writer/dirty_pages.go

@ -1,4 +1,4 @@
package filesys
package page_writer
type DirtyPages interface { type DirtyPages interface {
AddPage(offset int64, data []byte) AddPage(offset int64, data []byte)

55
weed/filesys/dirty_pages_temp_interval.go → weed/filesys/page_writer/dirty_pages_temp_interval.go

@ -1,4 +1,4 @@
package filesys
package page_writer
import ( import (
"io" "io"
@ -20,9 +20,9 @@ type WrittenIntervalLinkedList struct {
} }
type WrittenContinuousIntervals struct { type WrittenContinuousIntervals struct {
tempFile *os.File
lastOffset int64
lists []*WrittenIntervalLinkedList
TempFile *os.File
LastOffset int64
Lists []*WrittenIntervalLinkedList
} }
func (list *WrittenIntervalLinkedList) Offset() int64 { func (list *WrittenIntervalLinkedList) Offset() int64 {
@ -65,7 +65,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
} }
func (c *WrittenContinuousIntervals) TotalSize() (total int64) { func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
for _, list := range c.lists {
for _, list := range c.Lists {
total += list.Size() total += list.Size()
} }
return return
@ -98,7 +98,7 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
func (c *WrittenContinuousIntervals) debug() { func (c *WrittenContinuousIntervals) debug() {
log.Printf("++") log.Printf("++")
for _, l := range c.lists {
for _, l := range c.Lists {
log.Printf("++++") log.Printf("++++")
for t := l.Head; ; t = t.Next { for t := l.Head; ; t = t.Next {
log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
@ -116,8 +116,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
// append to the tail and return // append to the tail and return
if len(c.lists) == 1 {
lastSpan := c.lists[0]
if len(c.Lists) == 1 {
lastSpan := c.Lists[0]
if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset { if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
lastSpan.addNodeToTail(interval) lastSpan.addNodeToTail(interval)
return return
@ -125,7 +125,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
} }
var newLists []*WrittenIntervalLinkedList var newLists []*WrittenIntervalLinkedList
for _, list := range c.lists {
for _, list := range c.Lists {
// if list is to the left of new interval, add to the new list // if list is to the left of new interval, add to the new list
if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset { if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
newLists = append(newLists, list) newLists = append(newLists, list)
@ -147,18 +147,18 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
// skip anything that is fully overwritten by the new interval // skip anything that is fully overwritten by the new interval
} }
c.lists = newLists
c.Lists = newLists
// add the new interval to the lists, connecting neighbor lists // add the new interval to the lists, connecting neighbor lists
var prevList, nextList *WrittenIntervalLinkedList var prevList, nextList *WrittenIntervalLinkedList
for _, list := range c.lists {
for _, list := range c.Lists {
if list.Head.DataOffset == interval.DataOffset+interval.Size { if list.Head.DataOffset == interval.DataOffset+interval.Size {
nextList = list nextList = list
break break
} }
} }
for _, list := range c.lists {
for _, list := range c.Lists {
if list.Head.DataOffset+list.Size() == dataOffset { if list.Head.DataOffset+list.Size() == dataOffset {
list.addNodeToTail(interval) list.addNodeToTail(interval)
prevList = list prevList = list
@ -176,8 +176,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
nextList.addNodeToHead(interval) nextList.addNodeToHead(interval)
} }
if prevList == nil && nextList == nil { if prevList == nil && nextList == nil {
c.lists = append(c.lists, &WrittenIntervalLinkedList{
tempFile: c.tempFile,
c.Lists = append(c.Lists, &WrittenIntervalLinkedList{
tempFile: c.TempFile,
Head: interval, Head: interval,
Tail: interval, Tail: interval,
}) })
@ -189,7 +189,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList { func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
var maxSize int64 var maxSize int64
maxIndex := -1 maxIndex := -1
for k, list := range c.lists {
for k, list := range c.Lists {
if maxSize <= list.Size() { if maxSize <= list.Size() {
maxSize = list.Size() maxSize = list.Size()
maxIndex = k maxIndex = k
@ -199,16 +199,16 @@ func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenI
return nil return nil
} }
t := c.lists[maxIndex]
t.tempFile = c.tempFile
c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
t := c.Lists[maxIndex]
t.tempFile = c.TempFile
c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...)
return t return t
} }
func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) { func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
index := -1 index := -1
for k, list := range c.lists {
for k, list := range c.Lists {
if list.Offset() == target.Offset() { if list.Offset() == target.Offset() {
index = k index = k
} }
@ -217,12 +217,12 @@ func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedLis
return return
} }
c.lists = append(c.lists[0:index], c.lists[index+1:]...)
c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...)
} }
func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
for _, list := range c.lists {
for _, list := range c.Lists {
start := max(startOffset, list.Offset()) start := max(startOffset, list.Offset())
stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
if start < stop { if start < stop {
@ -287,3 +287,16 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) {
} }
return return
} }
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
Loading…
Cancel
Save