From 25471d579adaf85b579c7a989786e26fbbdc7872 Mon Sep 17 00:00:00 2001 From: liubaojiang <1838095916@qq.com> Date: Wed, 19 Oct 2022 16:34:14 +0800 Subject: [PATCH] add md5 header when UploadData to replication in ReplicatedWrite (#3881) --- weed/operation/upload_content.go | 5 +++++ weed/server/volume_server_handlers_write.go | 2 +- weed/topology/store_replicate.go | 7 ++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index ee72d1285..c9b15da69 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -30,6 +30,7 @@ type UploadOption struct { PairMap map[string]string Jwt security.EncodedJwt RetryForever bool + Md5 string } type UploadResult struct { @@ -254,6 +255,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult MimeType: option.MimeType, PairMap: option.PairMap, Jwt: option.Jwt, + Md5: option.Md5, }) if uploadResult == nil { return @@ -284,6 +286,9 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize if option.IsInputCompressed { h.Set("Content-Encoding", "gzip") } + if option.Md5 != "" { + h.Set("Content-MD5", option.Md5) + } file_writer, cp_err := body_writer.CreatePart(h) if cp_err != nil { diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index ce639c43c..49848ddf3 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -45,7 +45,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r) + isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5) if writeError != nil { writeJsonError(w, r, http.StatusInternalServerError, writeError) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 1f6fc0568..d11a47f3b 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -21,7 +21,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { +func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -98,8 +98,13 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt MimeType: string(n.Mime), PairMap: pairMap, Jwt: jwt, + Md5: contentMd5, } + _, err := operation.UploadData(n.Data, uploadOption) + if err != nil { + glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) + } return err }) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())