
filer: store md5 metadata for files uploaded by filer

fix https://github.com/chrislusf/seaweedfs/issues/1412
Branch: random_access_file
Chris Lu committed 4 years ago
Parent commit: 20e2ac1add
Changed files:

  1. weed/filesys/filehandle.go (9 lines changed)
  2. weed/filesys/wfs.go (2 lines changed)
  3. weed/operation/upload_content.go (26 lines changed)
  4. weed/server/filer_server_handlers_write.go (21 lines changed)
  5. weed/server/filer_server_handlers_write_cipher.go (1 line changed)
  6. weed/storage/needle/needle.go (3 lines changed)
  7. weed/util/bytes.go (17 lines changed)
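
Note: with this change the md5 travels from the volume server to the filer as a Content-MD5 header, i.e. the base64 encoding of the raw 16-byte digest (the usual RFC 1864 form, which is what the new Base64Md5ToBytes helper expects), and the filer decodes it back to raw bytes before storing it in the entry attributes. A minimal stdlib-only sketch of that convention; the sample payload and the exact volume-server behavior are assumptions for illustration, not taken from this commit:

package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

func main() {
	data := []byte("hello world") // hypothetical upload body

	// volume-server side of the convention: Content-MD5 = base64(md5(body))
	sum := md5.Sum(data)
	contentMd5 := base64.StdEncoding.EncodeToString(sum[:])

	// filer side: decode the header back to the raw digest before storing it
	raw, err := base64.StdEncoding.DecodeString(contentMd5)
	if err != nil {
		raw = nil
	}

	fmt.Printf("Content-MD5: %s\n", contentMd5) // XrY7u+Ae7tCTyyK7j1rNww==
	fmt.Printf("stored bytes: %x\n", raw)       // 5eb63bbbe01eeed093cb22bb8f5acdc3
}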

weed/filesys/filehandle.go (9 lines changed)

@@ -9,11 +9,12 @@ import (
     "os"
     "time"
 
+    "github.com/seaweedfs/fuse"
+    "github.com/seaweedfs/fuse/fs"
+
     "github.com/chrislusf/seaweedfs/weed/filer2"
     "github.com/chrislusf/seaweedfs/weed/glog"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/fuse"
-    "github.com/seaweedfs/fuse/fs"
 )
 
 type FileHandle struct {
@@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
         fh.f.entry.Chunks = chunks
         // fh.f.entryViewCache = nil
 
+        // special handling of one chunk md5
+        if len(chunks) == 1 {
+        }
+
         if err := filer_pb.CreateEntry(client, request); err != nil {
             glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
             return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)

weed/filesys/wfs.go (2 lines changed)

@@ -82,7 +82,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
             },
         },
     }
-    cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+    cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
     cacheDir := path.Join(option.CacheDir, cacheUniqueId)
     if option.CacheSizeMB > 0 {
         os.MkdirAll(cacheDir, 0755)
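Side effect worth noting: cacheUniqueId is now the first 4 characters of a base64 digest instead of a hex digest, so an existing mount will generally derive a different cache directory after upgrading. A small stdlib sketch of the difference; the seed values are made up, not real option values:

package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

func main() {
	// hypothetical FilerGrpcAddress + FilerMountRootPath + version seed
	seed := []byte("localhost:18888/buckets 30GB 1.88")
	sum := md5.Sum(seed)

	oldId := fmt.Sprintf("%x", sum[:])[0:4]                 // previous util.Md5(...)[0:4], hex alphabet
	newId := base64.StdEncoding.EncodeToString(sum[:])[0:4] // new util.Base64Md5(...)[0:4], base64 alphabet

	fmt.Println(oldId, newId) // the prefixes differ, so a new cache dir gets created
}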

weed/operation/upload_content.go (26 lines changed)

@@ -2,7 +2,6 @@ package operation
 
 import (
     "bytes"
-    "crypto/md5"
     "encoding/json"
     "errors"
     "fmt"
@@ -23,14 +22,14 @@ import (
 )
 
 type UploadResult struct {
-    Name      string `json:"name,omitempty"`
-    Size      uint32 `json:"size,omitempty"`
-    Error     string `json:"error,omitempty"`
-    ETag      string `json:"eTag,omitempty"`
-    CipherKey []byte `json:"cipherKey,omitempty"`
-    Mime      string `json:"mime,omitempty"`
-    Gzip      uint32 `json:"gzip,omitempty"`
-    Md5       string `json:"md5,omitempty"`
+    Name       string `json:"name,omitempty"`
+    Size       uint32 `json:"size,omitempty"`
+    Error      string `json:"error,omitempty"`
+    ETag       string `json:"eTag,omitempty"`
+    CipherKey  []byte `json:"cipherKey,omitempty"`
+    Mime       string `json:"mime,omitempty"`
+    Gzip       uint32 `json:"gzip,omitempty"`
+    ContentMd5 string `json:"contentMd5,omitempty"`
 }
 
 func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
@@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
 
 // Upload sends a POST request to a volume server to upload the content with adjustable compression level
 func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
     uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
-    if uploadResult != nil {
-        uploadResult.Md5 = util.Md5(data)
-    }
     return
 }
 
 // Upload sends a POST request to a volume server to upload the content with fast compression
 func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
-    hash := md5.New()
-    reader = io.TeeReader(reader, hash)
     uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
-    if uploadResult != nil {
-        uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
-    }
     return
 }
@@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
         return nil, errors.New(ret.Error)
     }
     ret.ETag = etag
+    ret.ContentMd5 = resp.Header.Get("Content-MD5")
     return &ret, nil
 }
 
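With the client-side hashing removed, UploadResult.ContentMd5 simply mirrors whatever Content-MD5 header the volume server returned, so it stays empty when the server does not set one. For illustration only, a stand-in server that follows the same header convention (this is not the SeaweedFS volume server):

package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

func main() {
	// stand-in "volume server" answering uploads with Content-MD5 = base64(md5(body))
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		sum := md5.Sum(body)
		w.Header().Set("Content-MD5", base64.StdEncoding.EncodeToString(sum[:]))
	}))
	defer srv.Close()

	resp, err := http.Post(srv.URL, "application/octet-stream", strings.NewReader("hello world"))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// this is the value upload_content.go now copies into UploadResult.ContentMd5
	fmt.Println(resp.Header.Get("Content-MD5")) // XrY7u+Ae7tCTyyK7j1rNww==
}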

weed/server/filer_server_handlers_write.go (21 lines changed)

@@ -2,7 +2,6 @@ package weed_server
 
 import (
     "context"
-    "crypto/md5"
     "encoding/json"
     "errors"
     "fmt"
@@ -124,12 +123,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
     glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
 
     u, _ := url.Parse(urlLocation)
-    ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+    ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
     if err != nil {
         return
     }
 
-    if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
+    if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil {
         return
     }
@@ -147,7 +146,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 
 // update metadata in filer store
 func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
-    collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
+    collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) {
 
     stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
     start := time.Now()
@@ -188,7 +187,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
             Collection: collection,
             TtlSec:     ttlSeconds,
             Mime:       ret.Mime,
-            Md5:        md5value,
+            Md5:        util.Base64Md5ToBytes(ret.ContentMd5),
         },
         Chunks: []*filer_pb.FileChunk{{
             FileId: fileId,
@@ -215,7 +214,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
 }
 
 // send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) {
 
     stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
     start := time.Now()
@@ -223,12 +222,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
 
     ret = &operation.UploadResult{}
 
-    md5Hash := md5.New()
     body := r.Body
-    if r.Method == "PUT" {
-        // only PUT or large chunked files has Md5 in attributes
-        body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
-    }
 
     request := &http.Request{
         Method: r.Method,
@@ -292,11 +286,8 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
             return
         }
     }
 
-    // use filer calculated md5 ETag, instead of the volume server crc ETag
-    if r.Method == "PUT" {
-        md5value = md5Hash.Sum(nil)
-    }
     ret.ETag = getEtag(resp)
+    ret.ContentMd5 = resp.Header.Get("Content-MD5")
     return
 }

weed/server/filer_server_handlers_write_cipher.go (1 line changed)

@@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
             Collection: collection,
             TtlSec:     ttlSeconds,
             Mime:       pu.MimeType,
+            Md5:        util.Base64Md5ToBytes(pu.ContentMd5),
         },
         Chunks: fileChunks,
     }

weed/storage/needle/needle.go (3 lines changed)

@@ -48,7 +48,7 @@ func (n *Needle) String() (str string) {
     return
 }
 
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, md5 string, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
     n = new(Needle)
     pu, e := ParseUpload(r, sizeLimit)
     if e != nil {
@@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
     originalSize = pu.OriginalDataSize
     n.LastModified = pu.ModifiedTime
     n.Ttl = pu.Ttl
+    contentMd5 = pu.ContentMd5
 
     if len(pu.FileName) < 256 {
         n.Name = []byte(pu.FileName)

weed/util/bytes.go (17 lines changed)

@@ -2,6 +2,7 @@ package util
 
 import (
     "crypto/md5"
+    "encoding/base64"
     "fmt"
     "io"
 )
@@ -109,8 +110,20 @@ func HashToInt32(data []byte) (v int32) {
     return
 }
 
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+    return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
     hash := md5.New()
     hash.Write(data)
-    return fmt.Sprintf("%x", hash.Sum(nil))
+    return Base64Encode(hash.Sum(nil))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+    data, err := base64.StdEncoding.DecodeString(contentMd5)
+    if err != nil {
+        return nil
+    }
+    return data
 }
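
A quick round trip through the new helpers, assuming the weed/util package is importable as in the tree above; the values in the comments correspond to the literal input "hello world":

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	b64 := util.Base64Md5([]byte("hello world"))
	fmt.Println(b64)                                        // XrY7u+Ae7tCTyyK7j1rNww==
	fmt.Println(len(util.Base64Md5ToBytes(b64)))            // 16 raw digest bytes
	fmt.Println(util.Base64Md5ToBytes("not*base64") == nil) // true: malformed input yields nil
}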