diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 962bbf30a..05db3f4dd 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "path/filepath" "strings" @@ -241,17 +242,44 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, } eofBackoff := time.Duration(0) + var partialData []byte + var savedFilename string + var savedHeader http.Header + var savedSourceUrl string retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString()) err = util.RetryUntil(retryName, func() error { - filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) + filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString(), int64(len(partialData))) if readErr != nil { return fmt.Errorf("read part %s: %w", sourceChunk.GetFileIdString(), readErr) } defer util_http.CloseResponse(resp) - sourceUrl := "" - if resp.Request != nil && resp.Request.URL != nil { - sourceUrl = resp.Request.URL.String() + // Save metadata from first successful response + if len(partialData) == 0 { + savedFilename = filename + savedHeader = header + if resp.Request != nil && resp.Request.URL != nil { + savedSourceUrl = resp.Request.URL.String() + } + } + + // Read the response body + data, readBodyErr := io.ReadAll(resp.Body) + if readBodyErr != nil { + // Keep whatever bytes we received before the error + partialData = append(partialData, data...) + return fmt.Errorf("read body: %w", readBodyErr) + } + + // Combine with previously accumulated partial data + var fullData []byte + if len(partialData) > 0 { + fullData = append(partialData, data...) + glog.V(0).Infof("resumed reading %s, got %d + %d = %d bytes", + sourceChunk.GetFileIdString(), len(partialData), len(data), len(fullData)) + partialData = nil + } else { + fullData = data } currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( @@ -266,20 +294,20 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, Path: path, }, &operation.UploadOption{ - Filename: filename, + Filename: savedFilename, Cipher: false, - IsInputCompressed: "gzip" == header.Get("Content-Encoding"), - MimeType: header.Get("Content-Type"), + IsInputCompressed: "gzip" == savedHeader.Get("Content-Encoding"), + MimeType: savedHeader.Get("Content-Type"), PairMap: nil, RetryForever: false, - SourceUrl: sourceUrl, + SourceUrl: savedSourceUrl, }, func(host, fileId string) string { fileUrl := fs.buildUploadUrl(host, fileId) - glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) + glog.V(4).Infof("replicating %s to %s header:%+v", savedFilename, fileUrl, savedHeader) return fileUrl }, - resp.Body, + util.NewBytesReader(fullData), ) if uploadErr != nil { return fmt.Errorf("upload data: %w", uploadErr) @@ -291,17 +319,18 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, eofBackoff = 0 fileId = currentFileId return nil - }, func(uploadErr error) (shouldContinue bool) { + }, func(retryErr error) (shouldContinue bool) { if fs.hasSourceNewerVersion(path, sourceMtime) { - glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) + glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } - if isEofError(uploadErr) { + if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) - glog.V(0).Infof("source connection interrupted while replicating %s for %s, backing off %v: %v", sourceChunk.GetFileIdString(), path, eofBackoff, uploadErr) + glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", + sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) } else { - glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr) + glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } return true }) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index bb110d1cf..0dd3ea5b9 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -3,7 +3,6 @@ package source import ( "context" "fmt" - "io" "net/http" "strings" @@ -18,10 +17,6 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -type ReplicationSource interface { - ReadPart(part string) io.ReadCloser -} - type FilerSource struct { grpcAddress string grpcDialOption grpc.DialOption @@ -104,14 +99,14 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls return } -func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { +func (fs *FilerSource) ReadPart(fileId string, offset int64) (filename string, header http.Header, resp *http.Response, err error) { if fs.proxyByFiler { - filename, header, resp, err = util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + filename, header, resp, err = util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "", offset) if err != nil { - glog.V(0).Infof("read part %s via filer proxy %s: %v", fileId, fs.address, err) + glog.V(0).Infof("read part %s via filer proxy %s offset %d: %v", fileId, fs.address, offset, err) } else { - glog.V(4).Infof("read part %s via filer proxy %s content-length:%s", fileId, fs.address, header.Get("Content-Length")) + glog.V(4).Infof("read part %s via filer proxy %s offset %d content-length:%s", fileId, fs.address, offset, header.Get("Content-Length")) } return } @@ -122,11 +117,11 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea } for _, fileUrl := range fileUrls { - filename, header, resp, err = util_http.DownloadFile(fileUrl, "") + filename, header, resp, err = util_http.DownloadFile(fileUrl, "", offset) if err != nil { - glog.V(0).Infof("fail to read part %s from %s: %v", fileId, fileUrl, err) + glog.V(0).Infof("fail to read part %s from %s offset %d: %v", fileId, fileUrl, offset, err) } else { - glog.V(4).Infof("read part %s from %s content-length:%s", fileId, fileUrl, header.Get("Content-Length")) + glog.V(4).Infof("read part %s from %s offset %d content-length:%s", fileId, fileUrl, offset, header.Get("Content-Length")) break } } diff --git a/weed/replication/source/filer_source_test.go b/weed/replication/source/filer_source_test.go new file mode 100644 index 000000000..72ac122aa --- /dev/null +++ b/weed/replication/source/filer_source_test.go @@ -0,0 +1,284 @@ +package source + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "sync/atomic" + "testing" + + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" +) + +func TestMain(m *testing.M) { + util_http.InitGlobalHttpClient() + os.Exit(m.Run()) +} + +func TestDownloadFile_NoOffset(t *testing.T) { + testData := []byte("0123456789abcdefghij") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Range") != "" { + t.Error("Range header should not be set when offset is 0") + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(testData) + })) + defer server.Close() + + _, _, resp, err := util_http.DownloadFile(server.URL, "") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, testData) { + t.Fatalf("expected %q, got %q", testData, data) + } +} + +func TestDownloadFile_WithOffset(t *testing.T) { + testData := []byte("0123456789abcdefghij") + + var receivedRange string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedRange = r.Header.Get("Range") + var offset int + fmt.Sscanf(receivedRange, "bytes=%d-", &offset) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, len(testData)-1, len(testData))) + w.WriteHeader(http.StatusPartialContent) + w.Write(testData[offset:]) + })) + defer server.Close() + + _, _, resp, err := util_http.DownloadFile(server.URL, "", 10) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if receivedRange != "bytes=10-" { + t.Fatalf("expected Range header %q, got %q", "bytes=10-", receivedRange) + } + if resp.StatusCode != http.StatusPartialContent { + t.Fatalf("expected status 206, got %d", resp.StatusCode) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, testData[10:]) { + t.Fatalf("expected %q, got %q", testData[10:], data) + } +} + +func TestDownloadFile_RejectsIgnoredRange(t *testing.T) { + // Server ignores Range header and returns 200 OK with full body + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("full body")) + })) + defer server.Close() + + _, _, _, err := util_http.DownloadFile(server.URL, "", 100) + if err == nil { + t.Fatal("expected error when server ignores Range and returns 200") + } +} + +func TestDownloadFile_ContentDisposition(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Disposition", `attachment; filename="test.txt"`) + w.Write([]byte("data")) + })) + defer server.Close() + + filename, _, resp, err := util_http.DownloadFile(server.URL, "") + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + + if filename != "test.txt" { + t.Fatalf("expected filename %q, got %q", "test.txt", filename) + } +} + +// TestDownloadFile_PartialReadThenResume simulates a connection drop +// after partial data, then resumes from the offset. Verifies the combined +// data matches the original. +func TestDownloadFile_PartialReadThenResume(t *testing.T) { + testData := bytes.Repeat([]byte("abcdefghij"), 100) // 1000 bytes + dropAfter := 500 + + var requestNum atomic.Int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := requestNum.Add(1) + rangeHeader := r.Header.Get("Range") + + if n == 1 && rangeHeader == "" { + // First request: write partial data then kill the connection + hj, ok := w.(http.Hijacker) + if !ok { + t.Error("server doesn't support hijacking") + return + } + conn, buf, _ := hj.Hijack() + fmt.Fprintf(buf, "HTTP/1.1 200 OK\r\n") + fmt.Fprintf(buf, "Content-Length: %d\r\n", len(testData)) + fmt.Fprintf(buf, "Content-Type: application/octet-stream\r\n") + fmt.Fprintf(buf, "\r\n") + buf.Write(testData[:dropAfter]) + buf.Flush() + conn.Close() + return + } + + // Resume request with Range + var offset int + fmt.Sscanf(rangeHeader, "bytes=%d-", &offset) + w.Header().Set("Content-Range", + fmt.Sprintf("bytes %d-%d/%d", offset, len(testData)-1, len(testData))) + w.WriteHeader(http.StatusPartialContent) + w.Write(testData[offset:]) + })) + defer server.Close() + + // First read — should get partial data + error + _, _, resp1, err := util_http.DownloadFile(server.URL, "") + if err != nil { + t.Fatal(err) + } + partialData, readErr := io.ReadAll(resp1.Body) + resp1.Body.Close() + if readErr == nil { + t.Fatal("expected error from truncated response") + } + if len(partialData) == 0 { + t.Fatal("expected some partial data") + } + if !bytes.Equal(partialData, testData[:len(partialData)]) { + t.Fatal("partial data doesn't match beginning of original") + } + + // Resume from where we left off + _, _, resp2, err := util_http.DownloadFile(server.URL, "", int64(len(partialData))) + if err != nil { + t.Fatal(err) + } + defer resp2.Body.Close() + remainingData, readErr := io.ReadAll(resp2.Body) + if readErr != nil { + t.Fatalf("unexpected error on resume: %v", readErr) + } + + // Combined data must match original + fullData := append(partialData, remainingData...) + if !bytes.Equal(fullData, testData) { + t.Fatalf("combined data mismatch: got %d bytes, want %d", len(fullData), len(testData)) + } +} + +// TestDownloadFile_GzipPartialReadThenResume verifies the tricky case +// where the first response is gzip-encoded (and Go's HTTP client auto- +// decompresses it), but the resume request gets uncompressed data (because +// Go doesn't add Accept-Encoding when Range is set). The combined +// decompressed bytes must still match the original. +func TestDownloadFile_GzipPartialReadThenResume(t *testing.T) { + testData := bytes.Repeat([]byte("hello world gzip test "), 100) // ~2200 bytes + + // Pre-compress + var compressed bytes.Buffer + gz := gzip.NewWriter(&compressed) + gz.Write(testData) + gz.Close() + compressedData := compressed.Bytes() + dropAfter := len(compressedData) / 2 + + var requestNum atomic.Int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := requestNum.Add(1) + rangeHeader := r.Header.Get("Range") + acceptsGzip := r.Header.Get("Accept-Encoding") != "" + + if n == 1 && acceptsGzip && rangeHeader == "" { + // First request: serve gzip-encoded response, drop mid-stream. + // Go's HTTP client auto-added Accept-Encoding: gzip (no Range), + // so it will auto-decompress and strip the Content-Encoding header. + hj, ok := w.(http.Hijacker) + if !ok { + t.Error("server doesn't support hijacking") + return + } + conn, buf, _ := hj.Hijack() + fmt.Fprintf(buf, "HTTP/1.1 200 OK\r\n") + fmt.Fprintf(buf, "Content-Encoding: gzip\r\n") + fmt.Fprintf(buf, "Content-Length: %d\r\n", len(compressedData)) + fmt.Fprintf(buf, "Content-Type: application/octet-stream\r\n") + fmt.Fprintf(buf, "\r\n") + buf.Write(compressedData[:dropAfter]) + buf.Flush() + conn.Close() + return + } + + // Resume request: Range is set, Go did NOT add Accept-Encoding, + // so we serve decompressed data from the requested offset — + // mimicking what the volume server does when the client doesn't + // advertise gzip support. + var offset int + fmt.Sscanf(rangeHeader, "bytes=%d-", &offset) + remaining := testData[offset:] + w.Header().Set("Content-Range", + fmt.Sprintf("bytes %d-%d/%d", offset, len(testData)-1, len(testData))) + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(http.StatusPartialContent) + w.Write(remaining) + })) + defer server.Close() + + // First read: Go auto-decompresses; truncated stream → error + partial data + _, _, resp1, err := util_http.DownloadFile(server.URL, "") + if err != nil { + t.Fatal(err) + } + partialData, readErr := io.ReadAll(resp1.Body) + resp1.Body.Close() + if readErr == nil { + t.Fatal("expected error from truncated gzip response") + } + if len(partialData) == 0 { + t.Fatal("expected some decompressed partial data") + } + // Partial decompressed data must match the beginning of the original + if !bytes.Equal(partialData, testData[:len(partialData)]) { + t.Fatalf("partial decompressed data doesn't match original (got %d bytes)", len(partialData)) + } + + // Resume: offset is in the *decompressed* domain. + // The server (like the volume server) decompresses and serves from that offset. + _, _, resp2, err := util_http.DownloadFile(server.URL, "", int64(len(partialData))) + if err != nil { + t.Fatal(err) + } + defer resp2.Body.Close() + remainingData, readErr := io.ReadAll(resp2.Body) + if readErr != nil { + t.Fatalf("unexpected error on resume: %v", readErr) + } + + fullData := append(partialData, remainingData...) + if !bytes.Equal(fullData, testData) { + t.Fatalf("combined data mismatch: got %d bytes, want %d", len(fullData), len(testData)) + } +} diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 612d538ec..62e723d38 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -201,7 +201,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e return readFn(r.Body) } -func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) { +func DownloadFile(fileUrl string, jwt string, offset ...int64) (filename string, header http.Header, resp *http.Response, e error) { req, err := http.NewRequest(http.MethodGet, fileUrl, nil) if err != nil { return "", nil, nil, err @@ -209,10 +209,29 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head maybeAddAuth(req, jwt) + var rangeOffset int64 + if len(offset) > 0 { + rangeOffset = offset[0] + } + if rangeOffset > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", rangeOffset)) + } + response, err := GetGlobalHttpClient().Do(req) if err != nil { return "", nil, nil, err } + + if rangeOffset > 0 { + expected := fmt.Sprintf("bytes %d-", rangeOffset) + if response.StatusCode != http.StatusPartialContent || + !strings.HasPrefix(response.Header.Get("Content-Range"), expected) { + CloseResponse(response) + return "", nil, nil, fmt.Errorf("range request %q to %s returned %s with Content-Range %q", + req.Header.Get("Range"), fileUrl, response.Status, response.Header.Get("Content-Range")) + } + } + header = response.Header contentDisposition := response.Header["Content-Disposition"] if len(contentDisposition) > 0 {