Browse Source

volume: support http status 304 for the same file id

pull/942/head
Chris Lu 6 years ago
parent
commit
440111a349
  1. 2
      weed/server/volume_grpc_tail.go
  2. 9
      weed/server/volume_server_handlers_write.go
  3. 4
      weed/storage/store.go
  4. 4
      weed/storage/volume_read_write.go
  5. 2
      weed/storage/volume_vacuum_test.go
  6. 14
      weed/topology/store_replicate.go

2
weed/server/volume_grpc_tail.go

@ -111,7 +111,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
_, err := vs.store.Write(v.Id, n)
_, _, err := vs.store.Write(v.Id, n)
return err return err
}) })

9
weed/server/volume_server_handlers_write.go

@ -41,11 +41,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
_, errorStatus := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated httpStatus := http.StatusCreated
if errorStatus != "" {
if isUnchanged {
httpStatus = http.StatusNotModified
}
if writeError != nil {
httpStatus = http.StatusInternalServerError httpStatus = http.StatusInternalServerError
ret.Error = errorStatus
ret.Error = writeError.Error()
} }
if needle.HasName() { if needle.HasName() {
ret.Name = string(needle.Name) ret.Name = string(needle.Name)

4
weed/storage/store.go

@ -192,7 +192,7 @@ func (s *Store) Close() {
} }
} }
func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err error) {
func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil { if v := s.findVolume(i); v != nil {
if v.readOnly { if v.readOnly {
err = fmt.Errorf("Volume %d is read only", i) err = fmt.Errorf("Volume %d is read only", i)
@ -200,7 +200,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err err
} }
// TODO: count needle size ahead // TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
_, size, err = v.writeNeedle(n)
_, size, isUnchanged, err = v.writeNeedle(n)
} else { } else {
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
} }

4
weed/storage/volume_read_write.go

@ -77,7 +77,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
return return
} }
func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err error) {
func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly { if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile.Name()) err = fmt.Errorf("%s is read-only", v.dataFile.Name())
@ -87,7 +87,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err
defer v.dataFileAccessLock.Unlock() defer v.dataFileAccessLock.Unlock()
if v.isFileUnchanged(n) { if v.isFileUnchanged(n) {
size = n.DataSize size = n.DataSize
glog.V(4).Infof("needle is unchanged!")
isUnchanged = true
return return
} }

2
weed/storage/volume_vacuum_test.go

@ -124,7 +124,7 @@ func TestCompaction(t *testing.T) {
} }
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
n := newRandomNeedle(uint64(i)) n := newRandomNeedle(uint64(i))
_, size, err := v.writeNeedle(n)
_, size, _, err := v.writeNeedle(n)
if err != nil { if err != nil {
t.Fatalf("write file %d: %v", i, err) t.Fatalf("write file %d: %v", i, err)
} }

14
weed/topology/store_replicate.go

@ -20,19 +20,18 @@ import (
func ReplicatedWrite(masterNode string, s *storage.Store, func ReplicatedWrite(masterNode string, s *storage.Store,
volumeId needle.VolumeId, n *needle.Needle, volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, errorStatus string) {
r *http.Request) (size uint32, isUnchanged bool, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, n)
needToReplicate := !s.HasVolume(volumeId)
size, isUnchanged, err = s.Write(volumeId, n)
if err != nil { if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
size = ret
err = fmt.Errorf("failed to write to local disk: %v", err)
return return
} }
needToReplicate := !s.HasVolume(volumeId)
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
if !needToReplicate { if !needToReplicate {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate() needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
@ -75,12 +74,11 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
pairMap, jwt) pairMap, jwt)
return err return err
}); err != nil { }); err != nil {
ret = 0
errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
size = 0
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
} }
} }
} }
size = ret
return return
} }

Loading…
Cancel
Save