Browse Source

support additional header name-value pairs

pull/432/head
sparklxb 8 years ago
parent
commit
86a7c56275
  1. 6
      weed/operation/submit.go
  2. 28
      weed/operation/upload_content.go
  3. 4
      weed/server/common.go
  4. 9
      weed/server/filer_server_handlers_write.go
  5. 13
      weed/server/volume_server_handlers_read.go
  6. 48
      weed/storage/needle.go
  7. 33
      weed/storage/needle_read_write.go
  8. 1
      weed/storage/store.go
  9. 2
      weed/topology/store_replicate.go

6
weed/operation/submit.go

@ -155,7 +155,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
cm.DeleteChunks(master) cm.DeleteChunks(master)
} }
} else { } else {
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
if e != nil { if e != nil {
return 0, e return 0, e
} }
@ -180,7 +180,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
"application/octet-stream", jwt)
"application/octet-stream", nil, jwt)
if uploadError != nil { if uploadError != nil {
return fid, 0, uploadError return fid, 0, uploadError
} }
@ -198,6 +198,6 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
q := u.Query() q := u.Query()
q.Set("cm", "true") q.Set("cm", "true")
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
return e return e
} }

28
weed/operation/upload_content.go

@ -36,13 +36,13 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairs []byte, jwt security.EncodedJwt) (*UploadResult, error) {
return upload_content(uploadUrl, func(w io.Writer) (err error) { return upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = io.Copy(w, reader) _, err = io.Copy(w, reader)
return return
}, filename, isGzipped, mtype, jwt)
}, filename, isGzipped, mtype, pairs, jwt)
} }
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairs []byte, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("") body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf) body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader) h := make(textproto.MIMEHeader)
@ -59,6 +59,14 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" { if jwt != "" {
h.Set("Authorization", "BEARER "+string(jwt)) h.Set("Authorization", "BEARER "+string(jwt))
} }
pairMap := make(map[string]string)
if len(pairs) != 0 {
err := json.Unmarshal(pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
}
file_writer, cp_err := body_writer.CreatePart(h) file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil { if cp_err != nil {
glog.V(0).Infoln("error creating form file", cp_err.Error()) glog.V(0).Infoln("error creating form file", cp_err.Error())
@ -73,7 +81,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
glog.V(0).Infoln("error closing body", err) glog.V(0).Infoln("error closing body", err)
return nil, err return nil, err
} }
resp, post_err := client.Post(uploadUrl, content_type, body_buf)
req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
if postErr != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
return nil, postErr
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
req.Header.Set(k, v)
}
resp, post_err := client.Do(req)
if post_err != nil { if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err return nil, post_err
@ -86,7 +104,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
var ret UploadResult var ret UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret) unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil { if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
return nil, unmarshal_err return nil, unmarshal_err
} }
if ret.Error != "" { if ret.Error != "" {

4
weed/server/common.go

@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
} }
debug("parsing upload file...") debug("parsing upload file...")
fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
fname, data, mimeType, pairs, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
if pe != nil { if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe) writeJsonError(w, r, http.StatusBadRequest, pe)
return return
@ -112,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
} }
debug("upload file to store", url) debug("upload file to store", url)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairs, jwt)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return

9
weed/server/filer_server_handlers_write.go

@ -15,13 +15,14 @@ import (
"net/url" "net/url"
"strings" "strings"
"path"
"strconv"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"path"
"strconv"
) )
type FilerPostResult struct { type FilerPostResult struct {
@ -112,7 +113,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re
if r.Method == "PUT" { if r.Method == "PUT" {
buf, _ := ioutil.ReadAll(r.Body) buf, _ := ioutil.ReadAll(r.Body)
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
if pe != nil { if pe != nil {
glog.V(0).Infoln("failing to parse post body", pe.Error()) glog.V(0).Infoln("failing to parse post body", pe.Error())
writeJsonError(w, r, http.StatusInternalServerError, pe) writeJsonError(w, r, http.StatusInternalServerError, pe)
@ -521,7 +522,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
err = nil err = nil
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
if uploadResult != nil { if uploadResult != nil {
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
} }

13
weed/server/volume_server_handlers_read.go

@ -12,6 +12,8 @@ import (
"strings" "strings"
"time" "time"
"encoding/json"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
@ -94,6 +96,17 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} }
w.Header().Set("Etag", etag) w.Header().Set("Etag", etag)
if n.HasPairs() {
pairMap := make(map[string]string)
err = json.Unmarshal(n.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range pairMap {
w.Header().Set(k, v)
}
}
if vs.tryHandleChunkedFile(n, filename, w, r) { if vs.tryHandleChunkedFile(n, filename, w, r) {
return return
} }

48
weed/storage/needle.go

@ -11,6 +11,8 @@ import (
"strings" "strings"
"time" "time"
"encoding/json"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
@ -22,6 +24,7 @@ const (
NeedleChecksumSize = 4 NeedleChecksumSize = 4
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
TombstoneFileSize = math.MaxUint32 TombstoneFileSize = math.MaxUint32
PairNamePrefix = "Seaweed-"
) )
/* /*
@ -40,6 +43,8 @@ type Needle struct {
Name []byte `comment:"maximum 256 characters"` //version2 Name []byte `comment:"maximum 256 characters"` //version2
MimeSize uint8 //version2 MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2 Mime []byte `comment:"maximum 256 characters"` //version2
PairsSize uint16 //version2
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL Ttl *TTL
@ -55,8 +60,17 @@ func (n *Needle) String() (str string) {
} }
func ParseUpload(r *http.Request) ( func ParseUpload(r *http.Request) (
fileName string, data []byte, mimeType string, isGzipped bool,
fileName string, data []byte, mimeType string, pairs []byte, isGzipped bool,
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
pairMap := make(map[string]string)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
pairMap[k] = v[0]
}
}
if len(pairMap) != 0 {
pairs, _ = json.Marshal(pairMap)
}
form, fe := r.MultipartReader() form, fe := r.MultipartReader()
if fe != nil { if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe) glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@ -109,19 +123,19 @@ func ParseUpload(r *http.Request) (
} }
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
isGzipped = false
dotIndex := strings.LastIndex(fileName, ".")
ext, mtype := "", ""
if dotIndex > 0 {
ext = strings.ToLower(fileName[dotIndex:])
mtype = mime.TypeByExtension(ext)
}
contentType := part.Header.Get("Content-Type")
if contentType != "" && mtype != contentType {
mimeType = contentType //only return mime type if not deductable
mtype = contentType
}
if !isChunkedFile { if !isChunkedFile {
dotIndex := strings.LastIndex(fileName, ".")
ext, mtype := "", ""
if dotIndex > 0 {
ext = strings.ToLower(fileName[dotIndex:])
mtype = mime.TypeByExtension(ext)
}
contentType := part.Header.Get("Content-Type")
if contentType != "" && mtype != contentType {
mimeType = contentType //only return mime type if not deductable
mtype = contentType
}
if part.Header.Get("Content-Encoding") == "gzip" { if part.Header.Get("Content-Encoding") == "gzip" {
isGzipped = true isGzipped = true
} else if operation.IsGzippable(ext, mtype) { } else if operation.IsGzippable(ext, mtype) {
@ -144,9 +158,10 @@ func ParseUpload(r *http.Request) (
return return
} }
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
var pair []byte
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
n = new(Needle) n = new(Needle)
fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
fname, n.Data, mimeType, pair, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
if e != nil { if e != nil {
return return
} }
@ -158,6 +173,11 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.Mime = []byte(mimeType) n.Mime = []byte(mimeType)
n.SetHasMime() n.SetHasMime()
} }
if len(pair) < 65536 {
n.Pairs = pair
n.PairsSize = uint16(len(pair))
n.SetHasPairs()
}
if isGzipped { if isGzipped {
n.SetGzipped() n.SetGzipped()
} }

33
weed/storage/needle_read_write.go

@ -16,6 +16,7 @@ const (
FlagHasMime = 0x04 FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08 FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10 FlagHasTtl = 0x10
FlagHasPairs = 0x20
FlagIsChunkManifest = 0x80 FlagIsChunkManifest = 0x80
LastModifiedBytesLength = 5 LastModifiedBytesLength = 5
TtlBytesLength = 2 TtlBytesLength = 2
@ -78,6 +79,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
if n.HasTtl() { if n.HasTtl() {
n.Size = n.Size + TtlBytesLength n.Size = n.Size + TtlBytesLength
} }
if n.HasPairs() {
n.Size += 2 + uint32(n.PairsSize)
}
} else { } else {
n.Size = 0 n.Size = 0
} }
@ -128,6 +132,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
return return
} }
} }
if n.HasPairs() {
util.Uint16toBytes(header[0:2], n.PairsSize)
if _, err = w.Write(header[0:2]); err != nil {
return
}
if _, err = w.Write(n.Pairs); err != nil {
return
}
}
} }
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@ -141,8 +154,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
} }
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize
padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize)
readSize := NeedleWithoutPaddingSize + padding
return getBytesForFileBlock(r, offset, int(readSize)) return getBytesForFileBlock(r, offset, int(readSize))
} }
@ -213,6 +227,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
index = index + TtlBytesLength index = index + TtlBytesLength
} }
if index < lenBytes && n.HasPairs() {
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
index += 2
end := index + int(n.PairsSize)
n.Pairs = bytes[index:end]
index = end
}
} }
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@ -296,3 +317,11 @@ func (n *Needle) IsChunkedManifest() bool {
func (n *Needle) SetIsChunkManifest() { func (n *Needle) SetIsChunkManifest() {
n.Flags = n.Flags | FlagIsChunkManifest n.Flags = n.Flags | FlagIsChunkManifest
} }
func (n *Needle) HasPairs() bool {
return n.Flags&FlagHasPairs != 0
}
func (n *Needle) SetHasPairs() {
n.Flags = n.Flags | FlagHasPairs
}

1
weed/storage/store.go

@ -303,6 +303,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
err = fmt.Errorf("Volume %d is read only", i) err = fmt.Errorf("Volume %d is read only", i)
return return
} }
// TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
size, err = v.writeNeedle(n) size, err = v.writeNeedle(n)
} else { } else {

2
weed/topology/store_replicate.go

@ -57,7 +57,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
_, err := operation.Upload(u.String(), _, err := operation.Upload(u.String(),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
needle.Pairs, jwt)
return err return err
}); err != nil { }); err != nil {
ret = 0 ret = 0

Loading…
Cancel
Save