Browse Source

add md5 header when UploadData to replication in ReplicatedWrite (#3881)

pull/3887/head
liubaojiang 2 years ago
committed by GitHub
parent
commit
25471d579a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      weed/operation/upload_content.go
  2. 2
      weed/server/volume_server_handlers_write.go
  3. 7
      weed/topology/store_replicate.go

5
weed/operation/upload_content.go

@ -30,6 +30,7 @@ type UploadOption struct {
PairMap map[string]string PairMap map[string]string
Jwt security.EncodedJwt Jwt security.EncodedJwt
RetryForever bool RetryForever bool
Md5 string
} }
type UploadResult struct { type UploadResult struct {
@ -254,6 +255,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
MimeType: option.MimeType, MimeType: option.MimeType,
PairMap: option.PairMap, PairMap: option.PairMap,
Jwt: option.Jwt, Jwt: option.Jwt,
Md5: option.Md5,
}) })
if uploadResult == nil { if uploadResult == nil {
return return
@ -284,6 +286,9 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
if option.IsInputCompressed { if option.IsInputCompressed {
h.Set("Content-Encoding", "gzip") h.Set("Content-Encoding", "gzip")
} }
if option.Md5 != "" {
h.Set("Content-MD5", option.Md5)
}
file_writer, cp_err := body_writer.CreatePart(h) file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil { if cp_err != nil {

2
weed/server/volume_server_handlers_write.go

@ -45,7 +45,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r)
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
if writeError != nil { if writeError != nil {
writeJsonError(w, r, http.StatusInternalServerError, writeError) writeJsonError(w, r, http.StatusInternalServerError, writeError)
} }

7
weed/topology/store_replicate.go

@ -21,7 +21,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
@ -98,8 +98,13 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
MimeType: string(n.Mime), MimeType: string(n.Mime),
PairMap: pairMap, PairMap: pairMap,
Jwt: jwt, Jwt: jwt,
Md5: contentMd5,
} }
_, err := operation.UploadData(n.Data, uploadOption) _, err := operation.UploadData(n.Data, uploadOption)
if err != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
}
return err return err
}) })
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds()) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())

Loading…
Cancel
Save