From 86a7c562751fc89d52da30425f1513b4553dfa8c Mon Sep 17 00:00:00 2001 From: sparklxb Date: Sun, 8 Jan 2017 09:16:29 +0800 Subject: [PATCH] support additional header name-value pairs --- weed/operation/submit.go | 6 +-- weed/operation/upload_content.go | 28 ++++++++++--- weed/server/common.go | 4 +- weed/server/filer_server_handlers_write.go | 9 ++-- weed/server/volume_server_handlers_read.go | 13 ++++++ weed/storage/needle.go | 48 +++++++++++++++------- weed/storage/needle_read_write.go | 33 ++++++++++++++- weed/storage/store.go | 1 + weed/topology/store_replicate.go | 2 +- 9 files changed, 113 insertions(+), 31 deletions(-) diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 1de6b544a..75d5afbde 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -155,7 +155,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret cm.DeleteChunks(master) } } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) + ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -180,7 +180,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "application/octet-stream", jwt) + "application/octet-stream", nil, jwt) if uploadError != nil { return fid, 0, uploadError } @@ -198,6 +198,6 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt) + _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index a87784cad..b5784322a 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -36,13 +36,13 @@ func init() { var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { +func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairs []byte, jwt security.EncodedJwt) (*UploadResult, error) { return upload_content(uploadUrl, func(w io.Writer) (err error) { _, err = io.Copy(w, reader) return - }, filename, isGzipped, mtype, jwt) + }, filename, isGzipped, mtype, pairs, jwt) } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) { +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairs []byte, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) h := make(textproto.MIMEHeader) @@ -59,6 +59,14 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error if jwt != "" { h.Set("Authorization", "BEARER "+string(jwt)) } + pairMap := make(map[string]string) + if len(pairs) != 0 { + err := json.Unmarshal(pairs, &pairMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) + } + } + file_writer, cp_err := body_writer.CreatePart(h) if cp_err != nil { glog.V(0).Infoln("error creating form file", cp_err.Error()) @@ -73,7 +81,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error glog.V(0).Infoln("error closing body", err) return nil, err } - resp, post_err := client.Post(uploadUrl, content_type, body_buf) + + req, postErr := http.NewRequest("POST", uploadUrl, body_buf) + if postErr != nil { + glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error()) + return nil, postErr + } + req.Header.Set("Content-Type", content_type) + for k, v := range pairMap { + req.Header.Set(k, v) + } + resp, post_err := client.Do(req) if post_err != nil { glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) return nil, post_err @@ -86,7 +104,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error var ret UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body)) + glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) return nil, unmarshal_err } if ret.Error != "" { diff --git a/weed/server/common.go b/weed/server/common.go index dcd31f823..3c9e3014f 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r) + fname, data, mimeType, pairs, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return @@ -112,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairs, jwt) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 464cb81ef..aed393dcd 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -15,13 +15,14 @@ import ( "net/url" "strings" + "path" + "strconv" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "path" - "strconv" ) type FilerPostResult struct { @@ -112,7 +113,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re if r.Method == "PUT" { buf, _ := ioutil.ReadAll(r.Body) r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r) if pe != nil { glog.V(0).Infoln("failing to parse post body", pe.Error()) writeJsonError(w, r, http.StatusInternalServerError, pe) @@ -521,7 +522,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht err = nil ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) - uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId)) if uploadResult != nil { glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 6944e79e0..2a273c595 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "encoding/json" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" @@ -94,6 +96,17 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } w.Header().Set("Etag", etag) + if n.HasPairs() { + pairMap := make(map[string]string) + err = json.Unmarshal(n.Pairs, &pairMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) + } + for k, v := range pairMap { + w.Header().Set(k, v) + } + } + if vs.tryHandleChunkedFile(n, filename, w, r) { return } diff --git a/weed/storage/needle.go b/weed/storage/needle.go index 1d306395e..ea013e290 100644 --- a/weed/storage/needle.go +++ b/weed/storage/needle.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "encoding/json" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" @@ -22,6 +24,7 @@ const ( NeedleChecksumSize = 4 MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 TombstoneFileSize = math.MaxUint32 + PairNamePrefix = "Seaweed-" ) /* @@ -40,6 +43,8 @@ type Needle struct { Name []byte `comment:"maximum 256 characters"` //version2 MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 + PairsSize uint16 //version2 + Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"` LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk Ttl *TTL @@ -55,8 +60,17 @@ func (n *Needle) String() (str string) { } func ParseUpload(r *http.Request) ( - fileName string, data []byte, mimeType string, isGzipped bool, + fileName string, data []byte, mimeType string, pairs []byte, isGzipped bool, modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { + pairMap := make(map[string]string) + for k, v := range r.Header { + if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { + pairMap[k] = v[0] + } + } + if len(pairMap) != 0 { + pairs, _ = json.Marshal(pairMap) + } form, fe := r.MultipartReader() if fe != nil { glog.V(0).Infoln("MultipartReader [ERROR]", fe) @@ -109,19 +123,19 @@ func ParseUpload(r *http.Request) ( } isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) - isGzipped = false + dotIndex := strings.LastIndex(fileName, ".") + ext, mtype := "", "" + if dotIndex > 0 { + ext = strings.ToLower(fileName[dotIndex:]) + mtype = mime.TypeByExtension(ext) + } + contentType := part.Header.Get("Content-Type") + if contentType != "" && mtype != contentType { + mimeType = contentType //only return mime type if not deductable + mtype = contentType + } + if !isChunkedFile { - dotIndex := strings.LastIndex(fileName, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = strings.ToLower(fileName[dotIndex:]) - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && mtype != contentType { - mimeType = contentType //only return mime type if not deductable - mtype = contentType - } if part.Header.Get("Content-Encoding") == "gzip" { isGzipped = true } else if operation.IsGzippable(ext, mtype) { @@ -144,9 +158,10 @@ func ParseUpload(r *http.Request) ( return } func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { + var pair []byte fname, mimeType, isGzipped, isChunkedFile := "", "", false, false n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) + fname, n.Data, mimeType, pair, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) if e != nil { return } @@ -158,6 +173,11 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { n.Mime = []byte(mimeType) n.SetHasMime() } + if len(pair) < 65536 { + n.Pairs = pair + n.PairsSize = uint16(len(pair)) + n.SetHasPairs() + } if isGzipped { n.SetGzipped() } diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index 8baa325df..ff43effb3 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -16,6 +16,7 @@ const ( FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 FlagHasTtl = 0x10 + FlagHasPairs = 0x20 FlagIsChunkManifest = 0x80 LastModifiedBytesLength = 5 TtlBytesLength = 2 @@ -78,6 +79,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i if n.HasTtl() { n.Size = n.Size + TtlBytesLength } + if n.HasPairs() { + n.Size += 2 + uint32(n.PairsSize) + } } else { n.Size = 0 } @@ -128,6 +132,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i return } } + if n.HasPairs() { + util.Uint16toBytes(header[0:2], n.PairsSize) + if _, err = w.Write(header[0:2]); err != nil { + return + } + if _, err = w.Write(n.Pairs); err != nil { + return + } + } } padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) @@ -141,8 +154,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i } func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { - padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) - readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding + NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize + padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize) + readSize := NeedleWithoutPaddingSize + padding return getBytesForFileBlock(r, offset, int(readSize)) } @@ -213,6 +227,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) index = index + TtlBytesLength } + if index < lenBytes && n.HasPairs() { + n.PairsSize = util.BytesToUint16(bytes[index : index+2]) + index += 2 + end := index + int(n.PairsSize) + n.Pairs = bytes[index:end] + index = end + } } func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { @@ -296,3 +317,11 @@ func (n *Needle) IsChunkedManifest() bool { func (n *Needle) SetIsChunkManifest() { n.Flags = n.Flags | FlagIsChunkManifest } + +func (n *Needle) HasPairs() bool { + return n.Flags&FlagHasPairs != 0 +} + +func (n *Needle) SetHasPairs() { + n.Flags = n.Flags | FlagHasPairs +} diff --git a/weed/storage/store.go b/weed/storage/store.go index 614c87ace..37a3904bd 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -303,6 +303,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { err = fmt.Errorf("Volume %d is read only", i) return } + // TODO: count needle size ahead if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { size, err = v.writeNeedle(n) } else { diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index be5777167..e76771140 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -57,7 +57,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, u.RawQuery = q.Encode() _, err := operation.Upload(u.String(), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), - jwt) + needle.Pairs, jwt) return err }); err != nil { ret = 0