Browse Source

Merge pull request #1785 from bingoohuang/master

pull/1790/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
928efc642c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      unmaintained/diff_volume_servers/diff_volume_servers.go
  2. 2
      weed/command/export.go
  3. 2
      weed/server/volume_grpc_copy_incremental.go
  4. 2
      weed/server/volume_grpc_tail.go
  5. 2
      weed/storage/erasure_coding/ec_decoder.go
  6. 4
      weed/storage/erasure_coding/ec_test.go
  7. 2
      weed/storage/erasure_coding/ec_volume.go
  8. 6
      weed/storage/erasure_coding/ec_volume_test.go
  9. 4
      weed/storage/store_ec.go
  10. 2
      weed/storage/types/offset_4bytes.go
  11. 8
      weed/storage/volume_backup.go
  12. 2
      weed/storage/volume_checking.go
  13. 16
      weed/storage/volume_read_write.go
  14. 10
      weed/storage/volume_vacuum.go
  15. 4
      weed/util/chunk_cache/chunk_cache_on_disk.go

2
unmaintained/diff_volume_servers/diff_volume_servers.go

@ -168,7 +168,7 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6
size: size, size: size,
} }
} }
if actual := offset.ToAcutalOffset(); actual > maxOffset {
if actual := offset.ToActualOffset(); actual > maxOffset {
maxOffset = actual maxOffset = actual
} }
return nil return nil

2
weed/command/export.go

@ -113,7 +113,7 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
nv, ok := needleMap.Get(n.Id) nv, ok := needleMap.Get(n.Id)
glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v", glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv) n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
if *showDeleted && n.Size > 0 || ok && nv.Size.IsValid() && nv.Offset.ToAcutalOffset() == offset {
if *showDeleted && n.Size > 0 || ok && nv.Size.IsValid() && nv.Offset.ToActualOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) { if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d", glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix) n.LastModified, newerThanUnix)

2
weed/server/volume_grpc_copy_incremental.go

@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
return nil return nil
} }
startOffset := foundOffset.ToAcutalOffset()
startOffset := foundOffset.ToActualOffset()
buf := make([]byte, 1024*1024*2) buf := make([]byte, 1024*1024*2)
return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream) return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)

2
weed/server/volume_grpc_tail.go

@ -72,7 +72,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
stream: stream, stream: stream,
} }
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
return scanner.lastProcessedTimestampNs, err return scanner.lastProcessedTimestampNs, err

2
weed/storage/erasure_coding/ec_decoder.go

@ -58,7 +58,7 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64,
return nil return nil
} }
entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
if datSize < entryStopOffset { if datSize < entryStopOffset {
datSize = entryStopOffset datSize = entryStopOffset
} }

4
weed/storage/erasure_coding/ec_test.go

@ -93,7 +93,7 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) { func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
data := make([]byte, size) data := make([]byte, size)
n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
n, err := datFile.ReadAt(data, offset.ToActualOffset())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to ReadAt dat file: %v", err) return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
} }
@ -105,7 +105,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte
func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) { func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
for i, interval := range intervals { for i, interval := range intervals {
if d, e := readOneInterval(interval, ecFiles); e != nil { if d, e := readOneInterval(interval, ecFiles); e != nil {

2
weed/storage/erasure_coding/ec_volume.go

@ -211,7 +211,7 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0] shard := ev.Shards[0]
// calculate the locations in the ec shards // calculate the locations in the ec shards
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version)))
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return return
} }

6
weed/storage/erasure_coding/ec_volume_test.go

@ -35,16 +35,16 @@ func TestPositioning(t *testing.T) {
needleId, _ := types.ParseNeedleId(test.needleId) needleId, _ := types.ParseNeedleId(test.needleId)
offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
} }
needleId, _ := types.ParseNeedleId("0f087622") needleId, _ := types.ParseNeedleId("0f087622")
offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3 var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
for _, interval := range intervals { for _, interval := range intervals {
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)

4
weed/storage/store_ec.go

@ -131,7 +131,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted return 0, ErrorDeleted
} }
glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
if len(intervals) > 1 { if len(intervals) > 1 {
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
@ -144,7 +144,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted return 0, ErrorDeleted
} }
err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
if err != nil { if err != nil {
return 0, fmt.Errorf("readbytes: %v", err) return 0, fmt.Errorf("readbytes: %v", err)
} }

2
weed/storage/types/offset_4bytes.go

@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset {
return Uint32ToOffset(smaller) return Uint32ToOffset(smaller)
} }
func (offset Offset) ToAcutalOffset() (actualOffset int64) {
func (offset Offset) ToActualOffset() (actualOffset int64) {
return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize) return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize)
} }

8
weed/storage/volume_backup.go

@ -154,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
if err != nil { if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err)
return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
} }
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength)
_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
if err != nil { if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
} }
return n.AppendAtNs, nil return n.AppendAtNs, nil

2
weed/storage/volume_checking.go

@ -58,7 +58,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
} }
} else { } else {
if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
return lastAppendAtNs, err return lastAppendAtNs, err
} }
} }

16
weed/storage/volume_read_write.go

@ -41,9 +41,9 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size.IsValid() { if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
oldNeedle := new(needle.Needle) oldNeedle := new(needle.Needle)
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
if err != nil { if err != nil {
glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err)
glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
return false return false
} }
if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
@ -113,7 +113,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
// check whether existing needle cookie matches // check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok { if ok {
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
if existingNeedleReadErr != nil { if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return return
@ -136,7 +136,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
v.lastAppendAtNs = n.AppendAtNs v.lastAppendAtNs = n.AppendAtNs
// add to needle map // add to needle map
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
} }
@ -179,7 +179,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
// check whether existing needle cookie matches // check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok { if ok {
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
if existingNeedleReadErr != nil { if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return return
@ -201,7 +201,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
v.lastAppendAtNs = n.AppendAtNs v.lastAppendAtNs = n.AppendAtNs
// add to needle map // add to needle map
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
} }
@ -303,9 +303,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if readSize == 0 { if readSize == 0 {
return 0, nil return 0, nil
} }
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 { if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
} }
v.checkReadWriteError(err) v.checkReadWriteError(err)
if err != nil { if err != nil {

10
weed/storage/volume_vacuum.go

@ -280,11 +280,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
//updated needle //updated needle
if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() { if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
//even the needle cache in memory is hit, the need_bytes is correct //even the needle cache in memory is hit, the need_bytes is correct
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size)
var needleBytes []byte var needleBytes []byte
needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
if err != nil { if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
} }
dst.Write(needleBytes) dst.Write(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
@ -339,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
} }
nv, ok := scanner.v.nm.Get(n.Id) nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
@ -422,7 +422,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
} }
n := new(needle.Needle) n := new(needle.Needle)
err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
if err != nil { if err != nil {
return nil return nil
} }

4
weed/util/chunk_cache/chunk_cache_on_disk.go

@ -107,9 +107,9 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
return nil, storage.ErrorNotFound return nil, storage.ErrorNotFound
} }
data := make([]byte, nv.Size) data := make([]byte, nv.Size)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v", return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr)
} else { } else {
if readSize != int(nv.Size) { if readSize != int(nv.Size) {
return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)

Loading…
Cancel
Save