diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index c3b66c5e7..2dde5b69c 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, err := vs.store.WriteVolumeNeedle(v.Id, n) + _, err := vs.store.WriteVolumeNeedle(v.Id, n, false) return err }) diff --git a/weed/storage/store.go b/weed/storage/store.go index d6b623e63..64e437add 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -252,7 +252,7 @@ func (s *Store) Close() { } } -func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.IsReadOnly() { err = fmt.Errorf("volume %d is read only", i) @@ -260,7 +260,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchan } // using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n) if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) { - _, _, isUnchanged, err = v.writeNeedle(n) + _, _, isUnchanged, err = v.writeNeedle(n, fsync) } else { err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index ac6154cef..bb0421724 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -63,7 +63,7 @@ func (v *Volume) Destroy() (err error) { return } -func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { +func (v *Volume) writeNeedle(n *needle.Needle, fsync bool) (offset uint64, size uint32, isUnchanged bool, err error) { // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -98,6 +98,11 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { return } + if fsync { + if err = v.DataBackend.Sync(); err != nil { + return + } + } v.lastAppendAtNs = n.AppendAtNs // add to needle map diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 51f04c8b1..5d0fcbe31 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) { } func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) - _, size, _, err := v.writeNeedle(n) + _, size, _, err := v.writeNeedle(n, false) if err != nil { t.Fatalf("write file %d: %v", i, err) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 6f043a601..236f8d773 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -22,8 +22,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume //check JWT jwt := security.GetJwt(r) + // check whether this is a replicated write request var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { + // this is the initial request remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) if err != nil { glog.V(0).Infoln(err) @@ -31,8 +33,14 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume } } + // read fsync value + fsync := false + if r.FormValue("fsync") == "true" { + fsync = true + } + if s.GetVolume(volumeId) != nil { - isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync) if err != nil { err = fmt.Errorf("failed to write to local disk: %v", err) glog.V(0).Infoln(err)