From 9383c91eb1ce9f20cad8706a70cec16ab15c453b Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 5 Jan 2019 19:52:17 -0800
Subject: [PATCH] wait to read again if the volume is compacting

---
 weed/storage/volume.go            | 1 +
 weed/storage/volume_read_write.go | 8 +++++++-
 weed/storage/volume_vacuum.go     | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 17917cea8..8ca6d3ffa 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -16,6 +16,7 @@ type Volume struct {
 	Collection    string
 	dataFile      *os.File
 	nm            NeedleMapper
+	compactingWg  sync.WaitGroup
 	needleMapKind NeedleMapType
 	readOnly      bool
 
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index e90e26144..8f628c178 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -12,6 +12,8 @@ import (
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 )
 
+var ErrorNotFound = errors.New("not found")
+
 // isFileUnchanged checks whether this needle to write is same as last one.
 // It requires serialized access in the same volume.
 func (v *Volume) isFileUnchanged(n *Needle) bool {
@@ -134,7 +136,11 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
 func (v *Volume) readNeedle(n *Needle) (int, error) {
 	nv, ok := v.nm.Get(n.Id)
 	if !ok || nv.Offset == 0 {
-		return -1, errors.New("Not Found")
+		v.compactingWg.Wait()
+		nv, ok = v.nm.Get(n.Id)
+		if !ok || nv.Offset == 0 {
+			return -1, ErrorNotFound
+		}
 	}
 	if nv.Size == TombstoneFileSize {
 		return -1, errors.New("Already Deleted")
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 5e0b19b66..642114d01 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -43,6 +43,8 @@ func (v *Volume) commitCompact() error {
 	v.dataFileAccessLock.Lock()
 	defer v.dataFileAccessLock.Unlock()
 	glog.V(3).Infof("Got volume %d committing lock...", v.Id)
+	v.compactingWg.Add(1)
+	defer v.compactingWg.Done()
 	v.nm.Close()
 	if err := v.dataFile.Close(); err != nil {
 		glog.V(0).Infof("fail to close volume %d", v.Id)