|
@ -100,10 +100,11 @@ func (v *Volume) readNeedleMetaAt(n *needle.Needle, offset int64, size int32) (e |
|
|
|
|
|
|
|
|
// read fills in Needle content by looking up n.Id from NeedleMapper
|
|
|
// read fills in Needle content by looking up n.Id from NeedleMapper
|
|
|
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) { |
|
|
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) { |
|
|
v.dataFileAccessLock.RLock() |
|
|
|
|
|
defer v.dataFileAccessLock.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
v.dataFileAccessLock.RLock() |
|
|
nv, ok := v.nm.Get(n.Id) |
|
|
nv, ok := v.nm.Get(n.Id) |
|
|
|
|
|
v.dataFileAccessLock.RUnlock() |
|
|
|
|
|
|
|
|
if !ok || nv.Offset.IsZero() { |
|
|
if !ok || nv.Offset.IsZero() { |
|
|
return ErrorNotFound |
|
|
return ErrorNotFound |
|
|
} |
|
|
} |
|
@ -120,22 +121,36 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision { |
|
|
|
|
|
// the volume is compacted
|
|
|
|
|
|
readOption.IsOutOfRange = false |
|
|
|
|
|
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) |
|
|
|
|
|
} |
|
|
|
|
|
buf := mem.Allocate(min(1024*1024, int(size))) |
|
|
|
|
|
defer mem.Free(buf) |
|
|
|
|
|
actualOffset := nv.Offset.ToActualOffset() |
|
|
actualOffset := nv.Offset.ToActualOffset() |
|
|
if readOption.IsOutOfRange { |
|
|
if readOption.IsOutOfRange { |
|
|
actualOffset += int64(MaxPossibleVolumeSize) |
|
|
actualOffset += int64(MaxPossibleVolumeSize) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
buf := mem.Allocate(min(1024*1024, int(size))) |
|
|
|
|
|
defer mem.Free(buf) |
|
|
|
|
|
|
|
|
// read needle data
|
|
|
// read needle data
|
|
|
crc := needle.CRC(0) |
|
|
crc := needle.CRC(0) |
|
|
for x := offset; x < offset+size; x += int64(len(buf)) { |
|
|
for x := offset; x < offset+size; x += int64(len(buf)) { |
|
|
|
|
|
|
|
|
|
|
|
v.dataFileAccessLock.RLock() |
|
|
|
|
|
// possibly re-read needle offset if volume is compacted
|
|
|
|
|
|
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision { |
|
|
|
|
|
// the volume is compacted
|
|
|
|
|
|
readOption.IsOutOfRange = false |
|
|
|
|
|
nv, ok = v.nm.Get(n.Id) |
|
|
|
|
|
if !ok || nv.Offset.IsZero() { |
|
|
|
|
|
v.dataFileAccessLock.RUnlock() |
|
|
|
|
|
return ErrorNotFound |
|
|
|
|
|
} |
|
|
|
|
|
actualOffset := nv.Offset.ToActualOffset() |
|
|
|
|
|
if readOption.IsOutOfRange { |
|
|
|
|
|
actualOffset += int64(MaxPossibleVolumeSize) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
count, err := n.ReadNeedleData(v.DataBackend, actualOffset, buf, x) |
|
|
count, err := n.ReadNeedleData(v.DataBackend, actualOffset, buf, x) |
|
|
|
|
|
v.dataFileAccessLock.RUnlock() |
|
|
|
|
|
|
|
|
toWrite := min(count, int(offset+size-x)) |
|
|
toWrite := min(count, int(offset+size-x)) |
|
|
if toWrite > 0 { |
|
|
if toWrite > 0 { |
|
|
crc = crc.Update(buf[0:toWrite]) |
|
|
crc = crc.Update(buf[0:toWrite]) |
|
|