From 0e1434d1c75b4e8e2536c64d169e89f392405653 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 10 Feb 2020 13:43:41 -0800 Subject: [PATCH] add http client for streaming upload a bit lowver perofrmance. --- weed/operation/assign_file_id.go | 2 +- weed/operation/upload_content.go | 145 +++++++++++++++++-------------- weed/util/http_util.go | 31 +++++-- weed/util/http_util_test.go | 2 +- 4 files changed, 107 insertions(+), 73 deletions(-) diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 3f3bb13e0..8db9e0367 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -95,7 +95,7 @@ func LookupJwt(master string, fileId string) security.EncodedJwt { tokenStr := "" lookupUrl := fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId) - err := util.Head(lookupUrl, func(header fasthttp.ResponseHeader) { + err := util.Head(lookupUrl, func(header *fasthttp.ResponseHeader) { bearer := header.Peek("Authorization") if len(bearer) > 7 && string(bytes.ToUpper(bearer[0:6])) == "BEARER" { tokenStr = string(bearer[7:]) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index c387d0230..117da1c18 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -1,14 +1,14 @@ package operation import ( - "bytes" + "bufio" "compress/flate" "compress/gzip" + "crypto/rand" "encoding/json" "errors" "fmt" "io" - "io/ioutil" "mime" "mime/multipart" "net/http" @@ -16,6 +16,8 @@ import ( "path/filepath" "strings" + "github.com/valyala/fasthttp" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -77,76 +79,89 @@ func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped boo }, filename, contentIsGzipped, mtype, pairMap, jwt) } +func randomBoundary() string { + var buf [30]byte + _, err := io.ReadFull(rand.Reader, buf[:]) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x", buf[:]) +} + func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + if mtype == "" { mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) } - if mtype != "" { - h.Set("Content-Type", mtype) - } - if isGzipped { - h.Set("Content-Encoding", "gzip") - } - - file_writer, cp_err := body_writer.CreatePart(h) - if cp_err != nil { - glog.V(0).Infoln("error creating form file", cp_err.Error()) - return nil, cp_err - } - if err := fillBufferFunction(file_writer); err != nil { - glog.V(0).Infoln("error copying data", err) - return nil, err - } - content_type := body_writer.FormDataContentType() - if err := body_writer.Close(); err != nil { - glog.V(0).Infoln("error closing body", err) - return nil, err - } + boundary := randomBoundary() + contentType := "multipart/form-data; boundary=" + boundary - req, postErr := http.NewRequest("POST", uploadUrl, body_buf) - if postErr != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error()) - return nil, postErr - } - req.Header.Set("Content-Type", content_type) - for k, v := range pairMap { - req.Header.Set(k, v) - } - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } - resp, post_err := client.Do(req) - if post_err != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) - return nil, post_err - } - defer resp.Body.Close() - etag := getEtag(resp) - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - return nil, ra_err - } var ret UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) - return nil, unmarshal_err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) + var etag string + var writeErr error + err := util.PostContent(uploadUrl, func(w *bufio.Writer) { + h := make(textproto.MIMEHeader) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + if mtype != "" { + h.Set("Content-Type", mtype) + } + if isGzipped { + h.Set("Content-Encoding", "gzip") + } + + body_writer := multipart.NewWriter(w) + body_writer.SetBoundary(boundary) + file_writer, cp_err := body_writer.CreatePart(h) + if cp_err != nil { + glog.V(0).Infoln("error creating form file", cp_err.Error()) + writeErr = cp_err + return + } + if err := fillBufferFunction(file_writer); err != nil { + glog.V(0).Infoln("error copying data", err) + writeErr = err + return + } + if err := body_writer.Close(); err != nil { + glog.V(0).Infoln("error closing body", err) + writeErr = err + return + } + w.Flush() + + }, func(header *fasthttp.RequestHeader) { + header.Set("Content-Type", contentType) + for k, v := range pairMap { + header.Set(k, v) + } + if jwt != "" { + header.Set("Authorization", "BEARER "+string(jwt)) + } + }, func(resp *fasthttp.Response) error { + etagBytes := resp.Header.Peek("ETag") + lenEtagBytes := len(etagBytes) + if lenEtagBytes > 2 && etagBytes[0] == '"' && etagBytes[lenEtagBytes-1] == '"' { + etag = string(etagBytes[1 : len(etagBytes)-1]) + } + + unmarshal_err := json.Unmarshal(resp.Body(), &ret) + if unmarshal_err != nil { + glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp.Body())) + return unmarshal_err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil + }) + + if writeErr != nil { + return nil, writeErr } - ret.ETag = etag - return &ret, nil -} -func getEtag(r *http.Response) (etag string) { - etag = r.Header.Get("ETag") - if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") { - etag = etag[1 : len(etag)-1] + ret.ETag = etag + if err != nil { + return nil, err } - return + return &ret, nil } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index b74e30ad7..a1f58d482 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -1,6 +1,7 @@ package util import ( + "bufio" "bytes" "compress/gzip" "encoding/json" @@ -31,6 +32,25 @@ func init() { } } +func PostContent(url string, fnWriteBody func(w *bufio.Writer), fnReqHeader func(*fasthttp.RequestHeader), fnResp func(resp *fasthttp.Response) error) error { + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(url) + req.Header.SetMethod(fasthttp.MethodPost) + fnReqHeader(&req.Header) + req.SetBodyStreamWriter(fnWriteBody) + if err := fasthttp.Do(req, resp); err != nil { + return err + } + if resp.StatusCode() >= 400 { + return fmt.Errorf("%s: %d", url, resp.StatusCode()) + } + return fnResp(resp) +} + func PostBytes(url string, body []byte) ([]byte, error) { r, err := client.Post(url, "", bytes.NewReader(body)) if err != nil { @@ -85,22 +105,21 @@ func Get(url string) ([]byte, error) { return b, nil } -func Head(url string, fn func(header fasthttp.ResponseHeader)) error { +func Head(url string, fn func(*fasthttp.ResponseHeader)) error { req := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseRequest(req) // <- do not forget to release - defer fasthttp.ReleaseResponse(resp) // <- do not forget to release + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) - c := fasthttp.Client{} req.SetRequestURI(url) req.Header.SetMethod(fasthttp.MethodHead) - if err := c.Do(req, resp); err != nil { + if err := fasthttp.Do(req, resp); err != nil { return err } if resp.StatusCode() >= 400 { return fmt.Errorf("%s: %d", url, resp.StatusCode()) } - fn(resp.Header) + fn(&resp.Header) return nil } diff --git a/weed/util/http_util_test.go b/weed/util/http_util_test.go index a8a1172d2..bf211bc4b 100644 --- a/weed/util/http_util_test.go +++ b/weed/util/http_util_test.go @@ -7,7 +7,7 @@ import ( ) func TestFasthttpClientHead(t *testing.T) { - err := Head("https://www.google.com", func(header fasthttp.ResponseHeader) { + err := Head("https://www.google.com", func(header *fasthttp.ResponseHeader) { header.VisitAll(func(key, value []byte) { println(string(key) + ": " + string(value)) })