Browse Source

Only wait on retryable requests

pull/1537/head
Chris Lu 4 years ago
parent
commit
3f7d1d1bf1
  1. 2
      unmaintained/repeated_vacuum/repeated_vacuum.go
  2. 2
      weed/command/benchmark.go
  3. 8
      weed/filer/filechunk_manifest.go
  4. 6
      weed/filer/stream.go
  5. 2
      weed/replication/repl_util/replication_utli.go
  6. 38
      weed/util/http_util.go

2
unmaintained/repeated_vacuum/repeated_vacuum.go

@@ -32,7 +32,7 @@ func main() {
go func() { go func() {
for { for {
println("vacuum threshold", *garbageThreshold) println("vacuum threshold", *garbageThreshold)
_, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
_, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
if err != nil { if err != nil {
log.Fatalf("vacuum: %v", err) log.Fatalf("vacuum: %v", err)
} }

2
weed/command/benchmark.go

@@ -290,7 +290,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
} }
var bytes []byte var bytes []byte
for _, url := range urls { for _, url := range urls {
bytes, err = util.Get(url)
bytes, _, err = util.Get(url)
if err == nil { if err == nil {
break break
} }

8
weed/filer/filechunk_manifest.go

@@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var err error var err error
var buffer bytes.Buffer var buffer bytes.Buffer
var shouldRetry bool
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 { for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry {
break
}
if err != nil { if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err) glog.V(0).Infof("read %s failed, err: %v", urlString, err)
buffer.Reset() buffer.Reset()
@@ -110,7 +114,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
break break
} }
} }
if err != nil {
if err != nil && shouldRetry{
glog.V(0).Infof("sleep for %v before retrying reading", waitTime) glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
time.Sleep(waitTime) time.Sleep(waitTime)
} else { } else {

6
weed/filer/stream.go

@@ -174,10 +174,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err return err
} }
var buffer bytes.Buffer var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry {
break
}
if err != nil { if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
buffer.Reset() buffer.Reset()

2
weed/replication/repl_util/replication_utli.go

@@ -19,7 +19,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var writeErr error var writeErr error
for _, fileUrl := range fileUrls { for _, fileUrl := range fileUrls {
err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data) writeErr = writeFunc(data)
}) })
if err != nil { if err != nil {

38
weed/util/http_util.go

@@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout // may need increasing http.Client.Timeout
func Get(url string) ([]byte, error) {
func Get(url string) ([]byte, bool, error) {
request, err := http.NewRequest("GET", url, nil) request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip") request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request) response, err := client.Do(request)
if err != nil { if err != nil {
return nil, err
return nil, true, err
} }
defer response.Body.Close() defer response.Body.Close()
@@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) {
b, err := ioutil.ReadAll(reader) b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 { if response.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, response.Status)
retryable := response.StatusCode >= 500
return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
} }
if err != nil { if err != nil {
return nil, err
return nil, false, err
} }
return b, nil
return b, false, nil
} }
func Head(url string) (http.Header, error) { func Head(url string) (http.Header, error) {
@@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if cipherKey != nil { if cipherKey != nil {
var n int var n int
err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
_, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data) n = copy(buf, data)
}) })
return int64(n), err return int64(n), err
@@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err return n, err
} }
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil { if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
req, err := http.NewRequest("GET", fileUrl, nil) req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil { if err != nil {
return err
return false, err
} }
if isFullChunk { if isFullChunk {
@@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
r, err := client.Do(req) r, err := client.Do(req)
if err != nil { if err != nil {
return err
return true, err
} }
defer CloseResponse(r) defer CloseResponse(r)
if r.StatusCode >= 400 { if r.StatusCode >= 400 {
return fmt.Errorf("%s: %s", fileUrl, r.Status)
retryable = r.StatusCode >= 500
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
} }
var reader io.ReadCloser var reader io.ReadCloser
@@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
m, err = reader.Read(buf) m, err = reader.Read(buf)
fn(buf[:m]) fn(buf[:m])
if err == io.EOF { if err == io.EOF {
return nil
return false, nil
} }
if err != nil { if err != nil {
return err
return false, err
} }
} }
} }
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
encryptedData, err := Get(fileUrl)
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := Get(fileUrl)
if err != nil { if err != nil {
return fmt.Errorf("fetch %s: %v", fileUrl, err)
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
} }
decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
if err != nil { if err != nil {
return fmt.Errorf("decrypt %s: %v", fileUrl, err)
return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
} }
if isContentCompressed { if isContentCompressed {
decryptedData, err = DecompressData(decryptedData) decryptedData, err = DecompressData(decryptedData)
@@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
} }
} }
if len(decryptedData) < int(offset)+size { if len(decryptedData) < int(offset)+size {
return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
} }
if isFullChunk { if isFullChunk {
fn(decryptedData) fn(decryptedData)
} else { } else {
fn(decryptedData[int(offset) : int(offset)+size]) fn(decryptedData[int(offset) : int(offset)+size])
} }
return nil
return false, nil
} }
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {

Loading…
Cancel
Save