Browse Source

revert fasthttp changes

related to https://github.com/chrislusf/seaweedfs/issues/1907
pull/1910/head
Chris Lu 4 years ago
parent
commit
4b1ed227d1
  1. 2
      weed/command/benchmark.go
  2. 2
      weed/filer/filechunk_manifest.go
  3. 2
      weed/filer/read_write.go
  4. 2
      weed/filer/stream.go
  5. 2
      weed/replication/repl_util/replication_util.go
  6. 116
      weed/util/fasthttp_util.go
  7. 2
      weed/util/http_util.go

2
weed/command/benchmark.go

@ -305,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
} }
var bytes []byte var bytes []byte
for _, url := range urls { for _, url := range urls {
bytes, _, err = util.FastGet(url)
bytes, _, err = util.Get(url)
if err == nil { if err == nil {
break break
} }

2
weed/filer/filechunk_manifest.go

@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry { if !shouldRetry {

2
weed/filer/read_write.go

@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name) target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
data, _, err := util.FastGet(target)
data, _, err := util.Get(target)
return data, err return data, err
} }

2
weed/filer/stream.go

@ -185,7 +185,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer var buffer bytes.Buffer
var shouldRetry bool var shouldRetry bool
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry { if !shouldRetry {

2
weed/replication/repl_util/replication_util.go

@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool var shouldRetry bool
for _, fileUrl := range fileUrls { for _, fileUrl := range fileUrls {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data) writeErr = writeFunc(data)
}) })
if err != nil { if err != nil {

116
weed/util/fasthttp_util.go

@ -1,116 +0,0 @@
package util
import (
"bytes"
"fmt"
"github.com/valyala/fasthttp"
"sync"
"time"
)
var (
fastClient = &fasthttp.Client{
NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
MaxConnsPerHost: 1024,
ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once.
WriteBufferSize: 64 * 1024, // Same but for your response.
ReadTimeout: time.Second,
WriteTimeout: time.Second,
MaxIdleConnDuration: time.Minute,
DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
DialDualStack: true,
}
// Put everything in pools to prevent garbage.
bytesPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 0)
return &b
},
}
responsePool = sync.Pool{
New: func() interface{} {
return make(chan *fasthttp.Response)
},
}
)
func FastGet(url string) ([]byte, bool, error) {
req := fasthttp.AcquireRequest()
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
req.SetRequestURIBytes([]byte(url))
req.Header.Add("Accept-Encoding", "gzip")
err := fastClient.Do(req, res)
if err != nil {
return nil, true, err
}
var data []byte
contentEncoding := res.Header.Peek("Content-Encoding")
if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
data, err = res.BodyGunzip()
} else {
data = res.Body()
}
out := make([]byte, len(data))
copy(out, data)
if res.StatusCode() >= 400 {
retryable := res.StatusCode() >= 500
return nil, retryable, fmt.Errorf("%s: %d", url, res.StatusCode())
}
if err != nil {
return nil, false, err
}
return out, false, nil
}
func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
req := fasthttp.AcquireRequest()
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(res)
req.SetRequestURIBytes([]byte(fileUrl))
if isFullChunk {
req.Header.Add("Accept-Encoding", "gzip")
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
if err = fastClient.Do(req, res); err != nil {
return true, err
}
if res.StatusCode() >= 400 {
retryable = res.StatusCode() >= 500
return retryable, fmt.Errorf("%s: %d", fileUrl, res.StatusCode())
}
contentEncoding := res.Header.Peek("Content-Encoding")
if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
bodyData, err := res.BodyGunzip()
if err != nil {
return false, err
}
fn(bodyData)
} else {
fn(res.Body())
}
return false, nil
}

2
weed/util/http_util.go

@ -313,7 +313,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
} }
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := FastGet(fileUrl)
encryptedData, retryable, err := Get(fileUrl)
if err != nil { if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
} }

Loading…
Cancel
Save