diff --git a/go/storage/volume.go b/go/storage/volume.go index 6ef72cfcd..1988c9aac 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -123,16 +123,21 @@ func (v *Volume) Size() int64 { glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) return -1 } + +// Close cleanly shuts down this volume func (v *Volume) Close() { v.accessLock.Lock() defer v.accessLock.Unlock() v.nm.Close() _ = v.dataFile.Close() } + func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } +// isFileUnchanged checks whether this needle to write is same as last one. +// It requires serialized access in the same volume. func (v *Volume) isFileUnchanged(n *Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && nv.Offset > 0 { @@ -146,6 +151,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { return false } +// Destroy removes everything related to this volume func (v *Volume) Destroy() (err error) { if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) @@ -223,7 +229,7 @@ func (v *Volume) delete(n *Needle) (uint32, error) { if _, err := v.dataFile.Seek(0, 2); err != nil { return size, err } - n.Data = make([]byte, 0) + n.Data = nil _, err := n.Append(v.dataFile, v.Version()) return size, err } @@ -304,20 +310,36 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, for n != nil { if readNeedleBody { if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil { - err = fmt.Errorf("cannot read needle body: %v", err) - return + glog.V(0).Infof("cannot read needle body: %v", err) + //err = fmt.Errorf("cannot read needle body: %v", err) + //return + } + if n.DataSize >= n.Size { + // this should come from a bug reported on #87 and #93 + // fixed in v0.69 + // remove this whole "if" clause later, long after 0.69 + oldRest, oldSize := rest, n.Size + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + n.Size = 0 + rest = n.Size + NeedleChecksumSize + padding + if rest%NeedlePaddingSize != 0 { + rest += (NeedlePaddingSize - rest%NeedlePaddingSize) + } + glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n) } } if err = visitNeedle(n, offset); err != nil { - return + glog.V(0).Infof("visit needle error: %v", err) } offset += int64(NeedleHeaderSize) + int64(rest) + glog.V(4).Infof("==> new entry offset %d", offset) if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil { if err == io.EOF { return nil } return fmt.Errorf("cannot read needle header: %v", err) } + glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) } return