|
@ -7,11 +7,14 @@ import ( |
|
|
"sync" |
|
|
"sync" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
type LogicChunkIndex int |
|
|
|
|
|
type ActualChunkIndex int |
|
|
|
|
|
|
|
|
// ChunkedFileWriter assumes the write requests will come in within chunks
|
|
|
// ChunkedFileWriter assumes the write requests will come in within chunks
|
|
|
type ChunkedFileWriter struct { |
|
|
type ChunkedFileWriter struct { |
|
|
dir string |
|
|
dir string |
|
|
file *os.File |
|
|
file *os.File |
|
|
logicToActualChunkIndex map[int]int |
|
|
|
|
|
|
|
|
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex |
|
|
chunkUsages []*ChunkWrittenIntervalList |
|
|
chunkUsages []*ChunkWrittenIntervalList |
|
|
ChunkSize int64 |
|
|
ChunkSize int64 |
|
|
sync.Mutex |
|
|
sync.Mutex |
|
@ -23,7 +26,7 @@ func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter { |
|
|
return &ChunkedFileWriter{ |
|
|
return &ChunkedFileWriter{ |
|
|
dir: dir, |
|
|
dir: dir, |
|
|
file: nil, |
|
|
file: nil, |
|
|
logicToActualChunkIndex: make(map[int]int), |
|
|
|
|
|
|
|
|
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), |
|
|
ChunkSize: chunkSize, |
|
|
ChunkSize: chunkSize, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -78,20 +81,20 @@ func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { |
|
|
func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { |
|
|
logicChunkIndex := int(logicOffset / cw.ChunkSize) |
|
|
|
|
|
|
|
|
logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) |
|
|
offsetRemainder := logicOffset % cw.ChunkSize |
|
|
offsetRemainder := logicOffset % cw.ChunkSize |
|
|
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
if found { |
|
|
if found { |
|
|
return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] |
|
|
return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] |
|
|
} |
|
|
} |
|
|
cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages) |
|
|
|
|
|
|
|
|
cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages)) |
|
|
chunkUsage = newChunkWrittenIntervalList() |
|
|
chunkUsage = newChunkWrittenIntervalList() |
|
|
cw.chunkUsages = append(cw.chunkUsages, chunkUsage) |
|
|
cw.chunkUsages = append(cw.chunkUsages, chunkUsage) |
|
|
return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage |
|
|
return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *ChunkWrittenIntervalList) { |
|
|
|
|
|
logicChunkIndex := int(logicOffset / cw.ChunkSize) |
|
|
|
|
|
|
|
|
func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) { |
|
|
|
|
|
logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) |
|
|
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
if found { |
|
|
if found { |
|
|
return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] |
|
|
return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] |
|
@ -99,7 +102,7 @@ func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkI |
|
|
return 0, nil |
|
|
return 0, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *ChunkWrittenInterval)) { |
|
|
|
|
|
|
|
|
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) { |
|
|
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { |
|
|
for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { |
|
|
chunkUsage := cw.chunkUsages[actualChunkIndex] |
|
|
chunkUsage := cw.chunkUsages[actualChunkIndex] |
|
|
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { |
|
|
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { |
|
@ -123,7 +126,7 @@ type FileIntervalReader struct { |
|
|
|
|
|
|
|
|
var _ = io.Reader(&FileIntervalReader{}) |
|
|
var _ = io.Reader(&FileIntervalReader{}) |
|
|
|
|
|
|
|
|
func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *ChunkWrittenInterval) *FileIntervalReader { |
|
|
|
|
|
|
|
|
func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader { |
|
|
actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] |
|
|
if !found { |
|
|
if !found { |
|
|
// this should never happen
|
|
|
// this should never happen
|
|
|