From 20e2ac1add0e93710d54f41c8ba142918c60d620 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 10:04:17 -0700 Subject: [PATCH] filer: store md5 metadata for files uploaded by filer fix https://github.com/chrislusf/seaweedfs/issues/1412 --- weed/filesys/filehandle.go | 9 +++++-- weed/filesys/wfs.go | 2 +- weed/operation/upload_content.go | 26 +++++++------------ weed/server/filer_server_handlers_write.go | 21 +++++---------- .../filer_server_handlers_write_cipher.go | 1 + weed/storage/needle/needle.go | 3 ++- weed/util/bytes.go | 17 ++++++++++-- 7 files changed, 41 insertions(+), 38 deletions(-) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ca35bfd02..b9d224fb2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -9,11 +9,12 @@ import ( "os" "time" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type FileHandle struct { @@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil + // special handling of one chunk md5 + if len(chunks) == 1 { + } + if err := filer_pb.CreateEntry(client, request); err != nil { glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 68ad987be..9ef597024 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -82,7 +82,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, }, } - cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] + cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { os.MkdirAll(cacheDir, 0755) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index cb129daa2..6fd8a60d1 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,7 +2,6 @@ package operation import ( "bytes" - "crypto/md5" "encoding/json" "errors" "fmt" @@ -23,14 +22,14 @@ import ( ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` - CipherKey []byte `json:"cipherKey,omitempty"` - Mime string `json:"mime,omitempty"` - Gzip uint32 `json:"gzip,omitempty"` - Md5 string `json:"md5,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + ContentMd5 string `json:"contentMd5,omitempty"` } func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { @@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") // Upload sends a POST request to a volume server to upload the content with adjustable compression level func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = util.Md5(data) - } return } // Upload sends a POST request to a volume server to upload the content with fast compression func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { - hash := md5.New() - reader = io.TeeReader(reader, hash) uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) - } return } @@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, errors.New(ret.Error) } ret.ETag = etag + ret.ContentMd5 = resp.Header.Get("Content-MD5") return &ret, nil } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index da66178ce..c7833a85e 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,7 +2,6 @@ package weed_server import ( "context" - "crypto/md5" "encoding/json" "errors" "fmt" @@ -124,12 +123,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) u, _ := url.Parse(urlLocation) - ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) if err != nil { return } - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil { + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil { return } @@ -147,7 +146,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { // update metadata in filer store func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) { + collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) { stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() start := time.Now() @@ -188,7 +187,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w Collection: collection, TtlSec: ttlSeconds, Mime: ret.Mime, - Md5: md5value, + Md5: util.Base64Md5ToBytes(ret.ContentMd5), }, Chunks: []*filer_pb.FileChunk{{ FileId: fileId, @@ -215,7 +214,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) { +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) { stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() start := time.Now() @@ -223,12 +222,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se ret = &operation.UploadResult{} - md5Hash := md5.New() body := r.Body - if r.Method == "PUT" { - // only PUT or large chunked files has Md5 in attributes - body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash)) - } request := &http.Request{ Method: r.Method, @@ -292,11 +286,8 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se return } } - // use filer calculated md5 ETag, instead of the volume server crc ETag - if r.Method == "PUT" { - md5value = md5Hash.Sum(nil) - } ret.ETag = getEtag(resp) + ret.ContentMd5 = resp.Header.Get("Content-MD5") return } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 8413496b8..6ec06d3de 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Collection: collection, TtlSec: ttlSeconds, Mime: pu.MimeType, + Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, Chunks: fileChunks, } diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index eb1d9537b..7c7aa3feb 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -48,7 +48,7 @@ func (n *Needle) String() (str string) { return } -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, md5 string, e error) { +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) { n = new(Needle) pu, e := ParseUpload(r, sizeLimit) if e != nil { @@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit originalSize = pu.OriginalDataSize n.LastModified = pu.ModifiedTime n.Ttl = pu.Ttl + contentMd5 = pu.ContentMd5 if len(pu.FileName) < 256 { n.Name = []byte(pu.FileName) diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 0650919c0..5076c3e67 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -2,6 +2,7 @@ package util import ( "crypto/md5" + "encoding/base64" "fmt" "io" ) @@ -109,8 +110,20 @@ func HashToInt32(data []byte) (v int32) { return } -func Md5(data []byte) string { +func Base64Encode(data []byte) string { + return base64.StdEncoding.EncodeToString(data) +} + +func Base64Md5(data []byte) string { hash := md5.New() hash.Write(data) - return fmt.Sprintf("%x", hash.Sum(nil)) + return Base64Encode(hash.Sum(nil)) +} + +func Base64Md5ToBytes(contentMd5 string) []byte { + data, err := base64.StdEncoding.DecodeString(contentMd5) + if err != nil { + return nil + } + return data }