From 81f403d3a21a5a5549294a73fd13c57a46c28f04 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 06:12:23 -0800 Subject: [PATCH] implement S3 remote storage backend and wire into FetchAndWriteNeedle - Add remote_storage module with RemoteStorageClient trait - Implement S3RemoteStorageClient using aws-sdk-s3 (covers all S3-compatible providers: AWS, Wasabi, Backblaze, Aliyun, etc.) - FetchAndWriteNeedle now fetches data from S3, writes locally as needle, and replicates to peers - Add 3 integration tests using weed mini as S3 backend: - Full round-trip fetch from S3 - Byte-range (partial) read from S3 - Error handling for non-existent S3 objects --- seaweed-volume/Cargo.lock | 1002 +++++++++++++++-- seaweed-volume/Cargo.toml | 8 +- seaweed-volume/src/lib.rs | 1 + seaweed-volume/src/remote_storage/mod.rs | 155 +++ seaweed-volume/src/remote_storage/s3.rs | 178 +++ seaweed-volume/src/server/grpc_server.rs | 83 +- .../grpc/fetch_remote_s3_test.go | 288 +++++ 7 files changed, 1620 insertions(+), 95 deletions(-) create mode 100644 seaweed-volume/src/remote_storage/mod.rs create mode 100644 seaweed-volume/src/remote_storage/s3.rs create mode 100644 test/volume_server/grpc/fetch_remote_s3_test.go diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 2878c97b3..3b997f743 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -28,6 +28,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -138,6 +144,48 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-config" +version = "1.8.15" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.5", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.4.0", + "sha1", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + [[package]] name = "aws-lc-rs" version = "1.16.1" @@ -160,6 +208,412 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-runtime" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http 0.63.6", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "bytes-utils", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.119.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http 0.62.6", + "aws-smithy-json 0.61.9", + 
"aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "lru 0.12.5", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.96.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64a6eded248c6b453966e915d32aeddb48ea63ad17932682774eb026fbef5b1" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.5", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.98.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db96d720d3c622fcbe08bae1c4b04a72ce6257d8b0584cb5418da00ae20a344f" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.5", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.100.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fafbdda43b93f57f699c5dfe8328db590b967b8a820a13ccdd6687355dfcc7ca" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.5", + "aws-smithy-observability", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + +[[package]] 
+name = "aws-sigv4" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http 0.63.6", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.4.0", + "p256 0.11.1", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.63.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87294a084b43d649d967efe58aa1f9e0adc260e13a6938eb904c0ae9b45824ae" +dependencies = [ + "aws-smithy-http 0.62.6", + "aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf09d74e5e32f76b8762da505a3cd59303e367a664ca67295387baa8c1d7548" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", 
+ "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http" +version = "0.63.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1ab2dc1c2c3749ead27180d333c42f11be8b0e934058fb4b2258ee8dbe5231" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a2f165a7feee6f263028b899d0a181987f4fa7179a6411a32a439fba7c5f769" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.27", + "h2 0.4.13", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.8.1", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.37", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tower 0.5.3", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-json" +version = "0.62.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06c2315d173edbf1920da8ba3a7189695827002e4c0fc961973ab1c54abca9c" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.15" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a56d79744fb3edb5d722ef79d86081e121d3b9422cb209eb03aea6aa4f21ebd" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.4.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b1117b3b2bbe166d11199b540ceed0d0f7676e36e7b962b5a437a9971eac75" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.14" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.9" @@ -170,10 +624,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "itoa", "matchit", @@ -204,8 +658,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -216,6 +670,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base16ct" version = "0.2.0" @@ -228,6 +688,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.8.3" @@ -279,6 +749,16 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cc" version = "1.2.56" @@ -433,6 +913,19 @@ 
version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc-fast" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" +dependencies = [ + "crc", + "digest", + "rand 0.9.2", + "regex", + "rustversion", +] + [[package]] name = "crc32c" version = "0.6.8" @@ -476,6 +969,18 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -539,6 +1044,16 @@ dependencies = [ "parking_lot_core 0.9.12", ] +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "der" version = "0.7.10" @@ -588,18 +1103,30 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve 0.12.3", + "rfc6979 0.3.1", + "signature 1.6.4", +] + [[package]] name = "ecdsa" version = "0.16.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" dependencies = [ - "der", + "der 0.7.10", "digest", - "elliptic-curve", - "rfc6979", - "signature", - "spki", + "elliptic-curve 0.13.8", + "rfc6979 0.4.0", + "signature 2.2.0", + "spki 0.7.3", ] [[package]] @@ -608,8 +1135,8 @@ version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ - "pkcs8", - "signature", + "pkcs8 0.10.2", + "signature 2.2.0", ] [[package]] @@ -632,23 +1159,43 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct 0.1.1", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff 0.12.1", + "generic-array", + "group 0.12.1", + "pkcs8 0.9.0", + "rand_core 0.6.4", + "sec1 0.3.0", + "subtle", + "zeroize", +] + [[package]] name = "elliptic-curve" version = "0.13.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" dependencies = [ - "base16ct", - "crypto-bigint", + "base16ct 0.2.0", + "crypto-bigint 0.5.5", "digest", - "ff", + "ff 0.13.1", "generic-array", - "group", + "group 0.13.0", "hkdf", "pem-rfc7468", - "pkcs8", + "pkcs8 0.10.2", "rand_core 0.6.4", - "sec1", + "sec1 0.7.3", "subtle", "zeroize", ] @@ -714,6 +1261,16 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "ff" version = 
"0.13.1" @@ -953,17 +1510,47 @@ dependencies = [ "weezl", ] +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff 0.12.1", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "group" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" dependencies = [ - "ff", + "ff 0.13.1", "rand_core 0.6.4", "subtle", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.13.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.13" @@ -975,7 +1562,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.4.0", "indexmap 2.13.0", "slab", "tokio", @@ -1004,6 +1591,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -1043,6 +1632,17 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -1053,6 +1653,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = 
[ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1060,7 +1671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -1071,8 +1682,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1088,6 +1699,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.8.1" @@ -1098,9 +1733,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -1111,19 +1746,35 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-util", - "rustls", + "rustls 0.23.37", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", "webpki-roots", ] @@ -1134,7 +1785,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", @@ -1149,7 +1800,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "native-tls", "tokio", @@ -1167,9 +1818,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", "ipnet", "libc", "percent-encoding", @@ -1436,7 +2087,7 @@ dependencies = [ "getrandom 0.2.17", "hmac", "js-sys", - "p256", + "p256 0.13.2", "p384", "pem", "rand 0.8.5", @@ -1444,7 +2095,7 @@ dependencies = [ "serde", "serde_json", "sha2", - "signature", + "signature 2.2.0", "simple_asn1", ] @@ -1517,6 +2168,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -1569,6 +2229,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" 
+dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1609,7 +2279,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 1.4.0", "httparse", "memchr", "mime", @@ -1776,14 +2446,31 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa 0.14.8", + "elliptic-curve 0.12.3", + "sha2", +] + [[package]] name = "p256" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" dependencies = [ - "ecdsa", - "elliptic-curve", + "ecdsa 0.16.9", + "elliptic-curve 0.13.8", "primeorder", "sha2", ] @@ -1794,8 +2481,8 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe42f1670a52a47d448f14b6a5c61dd78fce51856e68edaa38f7ae3a46b8d6b6" dependencies = [ - "ecdsa", - "elliptic-curve", + "ecdsa 0.16.9", + "elliptic-curve 0.13.8", "primeorder", "sha2", ] @@ -1921,9 +2608,19 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der", - "pkcs8", - "spki", + "der 0.7.10", + "pkcs8 0.10.2", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -1932,8 +2629,8 @@ version = "0.10.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.10", + "spki 0.7.3", ] [[package]] @@ -1995,7 +2692,7 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" dependencies = [ - "elliptic-curve", + "elliptic-curve 0.13.8", ] [[package]] @@ -2116,7 +2813,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.37", "socket2 0.6.3", "thiserror 2.0.18", "tokio", @@ -2136,7 +2833,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash", - "rustls", + "rustls 0.23.37", "rustls-pki-types", "slab", "thiserror 2.0.18", @@ -2284,7 +2981,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" dependencies = [ "libm", - "lru", + "lru 0.7.8", "parking_lot 0.11.2", "smallvec", "spin", @@ -2313,6 +3010,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" + [[package]] name = "regex-syntax" version = "0.8.10" @@ -2330,22 +3033,23 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.8.1", + "hyper-rustls 0.27.7", "hyper-tls", "hyper-util", "js-sys", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.37", "rustls-pki-types", "serde", "serde_json", @@ -2353,7 +3057,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower 0.5.3", "tower-http 
0.6.8", @@ -2366,6 +3070,17 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -2402,10 +3117,10 @@ dependencies = [ "num-integer", "num-traits", "pkcs1", - "pkcs8", + "pkcs8 0.10.2", "rand_core 0.6.4", - "signature", - "spki", + "signature 2.2.0", + "spki 0.7.3", "subtle", "zeroize", ] @@ -2451,6 +3166,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.37" @@ -2462,11 +3189,23 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.9", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -2486,6 +3225,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.9" @@ -2539,12 +3288,26 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "seaweed-volume" version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "aws-config", + "aws-credential-types", + "aws-sdk-s3", + "aws-types", "axum", "base64", "bytes", @@ -2556,8 +3319,8 @@ dependencies = [ "flate2", "futures", "hex", - "http-body", - "hyper", + "http-body 1.0.1", + "hyper 1.8.1", "hyper-util", "image", "jsonwebtoken", @@ -2571,7 +3334,7 @@ dependencies = [ "rand 0.8.5", "reed-solomon-erasure", "reqwest", - "rustls", + "rustls 0.23.37", "rustls-pemfile", "rusty-leveldb", "serde", @@ -2581,7 +3344,7 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-stream", "toml", "tonic", @@ -2593,16 +3356,30 @@ dependencies = [ "uuid", ] +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct 0.1.1", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + [[package]] name = "sec1" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" dependencies = [ - "base16ct", - "der", + "base16ct 0.2.0", + "der 0.7.10", "generic-array", - "pkcs8", + "pkcs8 0.10.2", "subtle", "zeroize", ] @@ -2711,6 +3488,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + 
[[package]] name = "sha2" version = "0.10.9" @@ -2747,6 +3535,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "signature" version = "2.2.0" @@ -2819,6 +3617,16 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -2826,7 +3634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.10", ] [[package]] @@ -3069,13 +3877,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.37", "tokio", ] @@ -3155,11 +3973,11 @@ dependencies = [ "axum", "base64", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-timeout", "hyper-util", "percent-encoding", @@ -3168,7 +3986,7 @@ dependencies = [ "rustls-pemfile", "socket2 0.5.10", "tokio", 
- "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -3234,8 +4052,8 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.11.0", "bytes", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", "pin-project-lite", "tower-layer", @@ -3252,8 +4070,8 @@ dependencies = [ "bitflags 2.11.0", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower 0.5.3", @@ -3347,6 +4165,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -3377,6 +4201,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -3418,6 +4248,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -4005,6 +4841,12 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xmlparser" +version = "0.13.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.8.1" diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 56773ca0e..5b860308f 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -65,7 +65,7 @@ memmap2 = "0.9" uuid = { version = "1", features = ["v4"] } # HTTP client (for proxying, remote fetch) -reqwest = { version = "0.12", features = ["rustls-tls", "stream"] } +reqwest = { version = "0.12", features = ["rustls-tls", "stream", "multipart"] } # Content hashing md-5 = "0.10" @@ -92,6 +92,12 @@ futures = "0.3" # Disk space checking sysinfo = "0.31" +# AWS S3 SDK (for remote storage backends) +aws-config = { version = "1", features = ["behavior-version-latest"] } +aws-sdk-s3 = "1" +aws-credential-types = "1" +aws-types = "1" + [dev-dependencies] tempfile = "3" diff --git a/seaweed-volume/src/lib.rs b/seaweed-volume/src/lib.rs index 462c01336..002c16e9c 100644 --- a/seaweed-volume/src/lib.rs +++ b/seaweed-volume/src/lib.rs @@ -3,6 +3,7 @@ pub mod storage; pub mod security; pub mod server; pub mod metrics; +pub mod remote_storage; /// Generated protobuf modules. pub mod pb { diff --git a/seaweed-volume/src/remote_storage/mod.rs b/seaweed-volume/src/remote_storage/mod.rs new file mode 100644 index 000000000..98056efd8 --- /dev/null +++ b/seaweed-volume/src/remote_storage/mod.rs @@ -0,0 +1,155 @@ +//! Remote storage backends for tiered storage support. +//! +//! Provides a trait-based abstraction over cloud storage providers (S3, GCS, Azure, etc.) +//! and a registry to create clients from protobuf RemoteConf messages. + +pub mod s3; + +use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation}; + +/// Error type for remote storage operations. 
+#[derive(Debug, thiserror::Error)] +pub enum RemoteStorageError { + #[error("remote storage type {0} not found")] + TypeNotFound(String), + #[error("remote object not found: {0}")] + ObjectNotFound(String), + #[error("remote storage error: {0}")] + Other(String), + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} + +/// Metadata about a remote file entry. +#[derive(Debug, Clone)] +pub struct RemoteEntry { + pub size: i64, + pub last_modified_at: i64, // Unix seconds + pub e_tag: String, + pub storage_name: String, +} + +/// Trait for remote storage clients. Matches Go's RemoteStorageClient interface. +#[async_trait::async_trait] +pub trait RemoteStorageClient: Send + Sync { + /// Read (part of) a file from remote storage. + async fn read_file( + &self, + loc: &RemoteStorageLocation, + offset: i64, + size: i64, + ) -> Result<Vec<u8>, RemoteStorageError>; + + /// Write a file to remote storage. + async fn write_file( + &self, + loc: &RemoteStorageLocation, + data: &[u8], + ) -> Result<RemoteEntry, RemoteStorageError>; + + /// Get metadata for a file in remote storage. + async fn stat_file( + &self, + loc: &RemoteStorageLocation, + ) -> Result<RemoteEntry, RemoteStorageError>; + + /// Delete a file from remote storage. + async fn delete_file( + &self, + loc: &RemoteStorageLocation, + ) -> Result<(), RemoteStorageError>; + + /// List all buckets. + async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError>; + + /// The RemoteConf used to create this client. + fn remote_conf(&self) -> &RemoteConf; +} + +/// Create a new remote storage client from a RemoteConf.
+pub fn make_remote_storage_client( + conf: &RemoteConf, +) -> Result<Box<dyn RemoteStorageClient>, RemoteStorageError> { + match conf.r#type.as_str() { + // All S3-compatible backends use the same client with different credentials + "s3" | "wasabi" | "backblaze" | "aliyun" | "tencent" | "baidu" + | "filebase" | "storj" | "contabo" => { + let (access_key, secret_key, endpoint, region) = extract_s3_credentials(conf); + Ok(Box::new(s3::S3RemoteStorageClient::new( + conf.clone(), + &access_key, + &secret_key, + &region, + &endpoint, + conf.s3_force_path_style, + ))) + } + other => Err(RemoteStorageError::TypeNotFound(other.to_string())), + } +} + +/// Extract S3-compatible credentials from a RemoteConf based on its type. +fn extract_s3_credentials(conf: &RemoteConf) -> (String, String, String, String) { + match conf.r#type.as_str() { + "s3" => ( + conf.s3_access_key.clone(), + conf.s3_secret_key.clone(), + conf.s3_endpoint.clone(), + if conf.s3_region.is_empty() { "us-east-1".to_string() } else { conf.s3_region.clone() }, + ), + "wasabi" => ( + conf.wasabi_access_key.clone(), + conf.wasabi_secret_key.clone(), + conf.wasabi_endpoint.clone(), + conf.wasabi_region.clone(), + ), + "backblaze" => ( + conf.backblaze_key_id.clone(), + conf.backblaze_application_key.clone(), + conf.backblaze_endpoint.clone(), + conf.backblaze_region.clone(), + ), + "aliyun" => ( + conf.aliyun_access_key.clone(), + conf.aliyun_secret_key.clone(), + conf.aliyun_endpoint.clone(), + conf.aliyun_region.clone(), + ), + "tencent" => ( + conf.tencent_secret_id.clone(), + conf.tencent_secret_key.clone(), + conf.tencent_endpoint.clone(), + String::new(), + ), + "baidu" => ( + conf.baidu_access_key.clone(), + conf.baidu_secret_key.clone(), + conf.baidu_endpoint.clone(), + conf.baidu_region.clone(), + ), + "filebase" => ( + conf.filebase_access_key.clone(), + conf.filebase_secret_key.clone(), + conf.filebase_endpoint.clone(), + String::new(), + ), + "storj" => ( + conf.storj_access_key.clone(), + conf.storj_secret_key.clone(), +
conf.storj_endpoint.clone(), + String::new(), + ), + "contabo" => ( + conf.contabo_access_key.clone(), + conf.contabo_secret_key.clone(), + conf.contabo_endpoint.clone(), + conf.contabo_region.clone(), + ), + _ => ( + conf.s3_access_key.clone(), + conf.s3_secret_key.clone(), + conf.s3_endpoint.clone(), + conf.s3_region.clone(), + ), + } +} diff --git a/seaweed-volume/src/remote_storage/s3.rs b/seaweed-volume/src/remote_storage/s3.rs new file mode 100644 index 000000000..70ec72416 --- /dev/null +++ b/seaweed-volume/src/remote_storage/s3.rs @@ -0,0 +1,178 @@ +//! S3-compatible remote storage client. +//! +//! Works with AWS S3, MinIO, SeaweedFS S3, and all S3-compatible providers. + +use aws_sdk_s3::Client; +use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region}; +use aws_sdk_s3::primitives::ByteStream; + +use crate::pb::remote_pb::{RemoteConf, RemoteStorageLocation}; +use super::{RemoteEntry, RemoteStorageClient, RemoteStorageError}; + +/// S3-compatible remote storage client. +pub struct S3RemoteStorageClient { + client: Client, + conf: RemoteConf, +} + +impl S3RemoteStorageClient { + /// Create a new S3 client from credentials and endpoint configuration. 
+ pub fn new( + conf: RemoteConf, + access_key: &str, + secret_key: &str, + region: &str, + endpoint: &str, + force_path_style: bool, + ) -> Self { + let region = if region.is_empty() { "us-east-1" } else { region }; + + let credentials = Credentials::new( + access_key, + secret_key, + None, // session token + None, // expiry + "seaweedfs-volume", + ); + + let mut s3_config = aws_sdk_s3::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .region(Region::new(region.to_string())) + .credentials_provider(credentials) + .force_path_style(force_path_style); + + if !endpoint.is_empty() { + s3_config = s3_config.endpoint_url(endpoint); + } + + let client = Client::from_conf(s3_config.build()); + + S3RemoteStorageClient { client, conf } + } +} + +#[async_trait::async_trait] +impl RemoteStorageClient for S3RemoteStorageClient { + async fn read_file( + &self, + loc: &RemoteStorageLocation, + offset: i64, + size: i64, + ) -> Result<Vec<u8>, RemoteStorageError> { + let key = loc.path.trim_start_matches('/'); + + let mut req = self.client.get_object() + .bucket(&loc.bucket) + .key(key); + + // Set byte range if specified + if size > 0 { + let end = offset + size - 1; + req = req.range(format!("bytes={}-{}", offset, end)); + } else if offset > 0 { + req = req.range(format!("bytes={}-", offset)); + } + + let resp = req.send().await.map_err(|e| { + let msg = format!("{}", e); + if msg.contains("NoSuchKey") || msg.contains("404") { + RemoteStorageError::ObjectNotFound(format!("{}/{}", loc.bucket, key)) + } else { + RemoteStorageError::Other(format!("s3 get object: {}", e)) + } + })?; + + let data = resp.body.collect().await + .map_err(|e| RemoteStorageError::Other(format!("s3 read body: {}", e)))?; + + Ok(data.into_bytes().to_vec()) + } + + async fn write_file( + &self, + loc: &RemoteStorageLocation, + data: &[u8], + ) -> Result<RemoteEntry, RemoteStorageError> { + let key = loc.path.trim_start_matches('/'); + + let resp = self.client.put_object() + .bucket(&loc.bucket) + .key(key) +
.body(ByteStream::from(data.to_vec())) + .send() + .await + .map_err(|e| RemoteStorageError::Other(format!("s3 put object: {}", e)))?; + + Ok(RemoteEntry { + size: data.len() as i64, + last_modified_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64, + e_tag: resp.e_tag().unwrap_or_default().to_string(), + storage_name: loc.name.clone(), + }) + } + + async fn stat_file( + &self, + loc: &RemoteStorageLocation, + ) -> Result<RemoteEntry, RemoteStorageError> { + let key = loc.path.trim_start_matches('/'); + + let resp = self.client.head_object() + .bucket(&loc.bucket) + .key(key) + .send() + .await + .map_err(|e| { + let msg = format!("{}", e); + if msg.contains("404") || msg.contains("NotFound") { + RemoteStorageError::ObjectNotFound(format!("{}/{}", loc.bucket, key)) + } else { + RemoteStorageError::Other(format!("s3 head object: {}", e)) + } + })?; + + Ok(RemoteEntry { + size: resp.content_length().unwrap_or(0), + last_modified_at: resp.last_modified() + .map(|t| t.secs()) + .unwrap_or(0), + e_tag: resp.e_tag().unwrap_or_default().to_string(), + storage_name: loc.name.clone(), + }) + } + + async fn delete_file( + &self, + loc: &RemoteStorageLocation, + ) -> Result<(), RemoteStorageError> { + let key = loc.path.trim_start_matches('/'); + + self.client.delete_object() + .bucket(&loc.bucket) + .key(key) + .send() + .await + .map_err(|e| RemoteStorageError::Other(format!("s3 delete object: {}", e)))?; + + Ok(()) + } + + async fn list_buckets(&self) -> Result<Vec<String>, RemoteStorageError> { + let resp = self.client.list_buckets() + .send() + .await + .map_err(|e| RemoteStorageError::Other(format!("s3 list buckets: {}", e)))?; + + Ok(resp.buckets() + .iter() + .filter_map(|b| b.name().map(String::from)) + .collect()) + } + + fn remote_conf(&self) -> &RemoteConf { + &self.conf + } +} diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 2269f2d60..5162cc993 100644 ---
a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1806,20 +1806,75 @@ impl VolumeServer for VolumeGrpcService { Status::invalid_argument("remote storage configuration is required") })?; - // No remote storage backends are compiled in — fail with the same error - // chain as Go: GetRemoteStorage → makeRemoteStorageClient → type not found - let remote_type = &remote_conf.r#type; - return Err(Status::internal(format!( - "get remote client: make remote storage client {}: remote storage type {} not found", - remote_conf.name, remote_type, - ))); - - // If remote storage were available, the flow would be: - // 1. client.read_file(remote_location, offset, size) → data - // 2. Build needle: id, cookie, data, checksum, last_modified - // 3. store.write_volume_needle(vid, &mut needle) - // 4. For each replica: HTTP POST data to replica URL with ?type=replicate - // 5. Return FetchAndWriteNeedleResponse { e_tag: needle.etag() } + // Create remote storage client + let client = crate::remote_storage::make_remote_storage_client(remote_conf) + .map_err(|e| Status::internal(format!( + "get remote client: make remote storage client {}: {}", + remote_conf.name, e, + )))?; + + let remote_location = req.remote_location.as_ref().ok_or_else(|| { + Status::invalid_argument("remote storage location is required") + })?; + + // Read data from remote storage + let data = client.read_file(remote_location, req.offset, req.size).await + .map_err(|e| Status::internal(format!( + "read from remote {:?}: {}", remote_location, e + )))?; + + // Build needle and write locally + let mut n = Needle { + id: NeedleId(req.needle_id), + cookie: Cookie(req.cookie), + data_size: data.len() as u32, + data: data.clone(), + ..Needle::default() + }; + n.checksum = crate::storage::needle::crc::CRC::new(&n.data); + n.size = crate::storage::types::Size(4 + n.data_size as i32 + 1); + n.last_modified = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + 
.unwrap_or_default() + .as_secs(); + n.set_has_last_modified_date(); + + let e_tag; + { + let mut store = self.state.store.write().unwrap(); + match store.write_volume_needle(vid, &mut n) { + Ok(_) => { e_tag = n.etag(); } + Err(e) => { + return Err(Status::internal(format!( + "local write needle {} size {}: {}", req.needle_id, req.size, e + ))); + } + } + } + + // Replicate to peers (best-effort, matches Go's concurrent replication) + if !req.replicas.is_empty() { + let file_id = format!("{},{:x}{:08x}", vid, req.needle_id, req.cookie); + let http_client = reqwest::Client::new(); + for replica in &req.replicas { + let url = format!("http://{}/{}?type=replicate", replica.url, file_id); + let form = reqwest::multipart::Form::new() + .part("file", reqwest::multipart::Part::bytes(data.clone())); + if let Err(e) = http_client.post(&url) + .multipart(form) + .send() + .await + { + return Err(Status::internal(format!( + "remote write needle {} size {}: {}", req.needle_id, req.size, e + ))); + } + } + } + + Ok(Response::new(volume_server_pb::FetchAndWriteNeedleResponse { + e_tag, + })) } async fn scrub_volume( diff --git a/test/volume_server/grpc/fetch_remote_s3_test.go b/test/volume_server/grpc/fetch_remote_s3_test.go new file mode 100644 index 000000000..bd1c94cbc --- /dev/null +++ b/test/volume_server/grpc/fetch_remote_s3_test.go @@ -0,0 +1,288 @@ +package volume_server_grpc_test + +import ( + "bytes" + "context" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +// findAvailablePort finds a free TCP port on localhost. 
+func findAvailablePort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil +} + +// waitForPort waits until a TCP port is listening, up to timeout. +func waitForPort(port int, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond) + if err == nil { + conn.Close() + return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("port %d not listening after %v", port, timeout) +} + +// startWeedMini starts a weed mini subprocess and returns the S3 endpoint and cleanup func. +func startWeedMini(t *testing.T) (s3Endpoint string, cleanup func()) { + t.Helper() + + weedBin, err := exec.LookPath("weed") + if err != nil { + weedBin = filepath.Join("..", "..", "..", "weed", "weed_binary") + if _, err := os.Stat(weedBin); os.IsNotExist(err) { + t.Skip("weed binary not found, skipping S3 remote storage test") + } + } + + miniMasterPort, _ := findAvailablePort() + miniVolumePort, _ := findAvailablePort() + miniFilerPort, _ := findAvailablePort() + miniS3Port, _ := findAvailablePort() + miniDir := t.TempDir() + os.WriteFile(filepath.Join(miniDir, "security.toml"), []byte("# empty\n"), 0644) + + ctx, cancel := context.WithCancel(context.Background()) + + miniCmd := exec.CommandContext(ctx, weedBin, "mini", + fmt.Sprintf("-dir=%s", miniDir), + fmt.Sprintf("-master.port=%d", miniMasterPort), + fmt.Sprintf("-volume.port=%d", miniVolumePort), + fmt.Sprintf("-filer.port=%d", miniFilerPort), + fmt.Sprintf("-s3.port=%d", miniS3Port), + ) + miniCmd.Env = append(os.Environ(), "AWS_ACCESS_KEY_ID=admin", "AWS_SECRET_ACCESS_KEY=admin") + miniCmd.Dir = miniDir + logFile, _ := os.CreateTemp("", "weed-mini-*.log") + miniCmd.Stdout = logFile + miniCmd.Stderr = logFile + t.Logf("weed mini logs at %s", logFile.Name()) + + if 
err := miniCmd.Start(); err != nil { + cancel() + logFile.Close() + t.Fatalf("start weed mini: %v", err) + } + + if err := waitForPort(miniS3Port, 30*time.Second); err != nil { + cancel() + miniCmd.Wait() + logFile.Close() + t.Fatalf("weed mini S3 not ready: %v", err) + } + t.Logf("weed mini S3 ready on port %d", miniS3Port) + + return fmt.Sprintf("http://127.0.0.1:%d", miniS3Port), func() { + cancel() + miniCmd.Wait() + logFile.Close() + } +} + +func newS3Client(endpoint string) *s3.S3 { + sess, _ := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String(endpoint), + Credentials: credentials.NewStaticCredentials("admin", "admin", ""), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + }) + return s3.New(sess) +} + +// TestFetchAndWriteNeedleFromS3 tests the full FetchAndWriteNeedle flow: +// 1. Start a weed mini instance as S3 backend +// 2. Upload a test object to it via S3 API +// 3. Call FetchAndWriteNeedle on the volume server to fetch from S3 +// 4. Verify the response contains a valid e_tag +func TestFetchAndWriteNeedleFromS3(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + s3Endpoint, cleanupMini := startWeedMini(t) + defer cleanupMini() + + s3Client := newS3Client(s3Endpoint) + + // Create bucket and upload test data + bucket := "test-remote-fetch" + s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) + + testData := []byte("Hello from S3 remote storage! 
This is test data for FetchAndWriteNeedle.") + testKey := "test-object.dat" + _, err := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(testKey), + Body: bytes.NewReader(testData), + }) + if err != nil { + t.Fatalf("put object: %v", err) + } + t.Logf("uploaded %d bytes to s3://%s/%s", len(testData), bucket, testKey) + + // Start volume server + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(99) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer grpcCancel() + + // FetchAndWriteNeedle from S3 + resp, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ + VolumeId: volumeID, + NeedleId: 42, + Cookie: 12345, + Offset: 0, + Size: int64(len(testData)), + RemoteConf: &remote_pb.RemoteConf{ + Name: "test-s3", + Type: "s3", + S3AccessKey: "admin", + S3SecretKey: "admin", + S3Region: "us-east-1", + S3Endpoint: s3Endpoint, + S3ForcePathStyle: true, + }, + RemoteLocation: &remote_pb.RemoteStorageLocation{ + Name: "test-s3", + Bucket: bucket, + Path: "/" + testKey, + }, + }) + if err != nil { + t.Fatalf("FetchAndWriteNeedle failed: %v", err) + } + if resp.GetETag() == "" { + t.Fatal("FetchAndWriteNeedle returned empty e_tag") + } + t.Logf("FetchAndWriteNeedle success: e_tag=%s", resp.GetETag()) +} + +// TestFetchAndWriteNeedleFromS3WithPartialRead tests reading a byte range from S3. 
+func TestFetchAndWriteNeedleFromS3WithPartialRead(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + s3Endpoint, cleanupMini := startWeedMini(t) + defer cleanupMini() + + s3Client := newS3Client(s3Endpoint) + + bucket := "partial-read-test" + s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) + + // Upload 1KB of data + fullData := make([]byte, 1024) + for i := range fullData { + fullData[i] = byte(i % 256) + } + s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), Key: aws.String("big.dat"), + Body: bytes.NewReader(fullData), + }) + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + framework.AllocateVolume(t, grpcClient, 98, "") + + grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer grpcCancel() + + // Fetch only bytes 100-199 (100 bytes) from the 1KB object + resp, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ + VolumeId: 98, NeedleId: 7, Cookie: 999, + Offset: 100, Size: 100, + RemoteConf: &remote_pb.RemoteConf{ + Name: "test-s3-partial", Type: "s3", + S3AccessKey: "admin", S3SecretKey: "admin", + S3Region: "us-east-1", S3Endpoint: s3Endpoint, S3ForcePathStyle: true, + }, + RemoteLocation: &remote_pb.RemoteStorageLocation{ + Name: "test-s3-partial", Bucket: bucket, Path: "/big.dat", + }, + }) + if err != nil { + t.Fatalf("FetchAndWriteNeedle partial read failed: %v", err) + } + if resp.GetETag() == "" { + t.Fatal("empty e_tag for partial read") + } + t.Logf("FetchAndWriteNeedle partial read success: e_tag=%s", resp.GetETag()) +} + +// TestFetchAndWriteNeedleS3NotFound tests that fetching a non-existent S3 object returns an error. 
+func TestFetchAndWriteNeedleS3NotFound(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + s3Endpoint, cleanupMini := startWeedMini(t) + defer cleanupMini() + + s3Client := newS3Client(s3Endpoint) + + bucket := "notfound-test" + s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucket)}) + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + framework.AllocateVolume(t, grpcClient, 97, "") + + grpcCtx, grpcCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer grpcCancel() + + _, err := grpcClient.FetchAndWriteNeedle(grpcCtx, &volume_server_pb.FetchAndWriteNeedleRequest{ + VolumeId: 97, NeedleId: 1, Cookie: 1, + Offset: 0, Size: 100, + RemoteConf: &remote_pb.RemoteConf{ + Name: "test-s3-nf", Type: "s3", + S3AccessKey: "admin", S3SecretKey: "admin", + S3Region: "us-east-1", S3Endpoint: s3Endpoint, S3ForcePathStyle: true, + }, + RemoteLocation: &remote_pb.RemoteStorageLocation{ + Name: "test-s3-nf", Bucket: bucket, Path: "/does-not-exist.dat", + }, + }) + if err == nil { + t.Fatal("FetchAndWriteNeedle should fail for non-existent object") + } + if !strings.Contains(err.Error(), "read from remote") { + t.Fatalf("expected 'read from remote' error, got: %v", err) + } + t.Logf("correctly got error for non-existent object: %v", err) +}