
Merge branch 'master' into add-ec-vacuum

chrislu, 4 months ago
parent commit f9e7ca4308
33 changed files (number of changed lines in parentheses):

  1. .github/workflows/binaries_dev.yml (4)
  2. .github/workflows/binaries_release0.yml (2)
  3. .github/workflows/binaries_release1.yml (2)
  4. .github/workflows/binaries_release2.yml (2)
  5. .github/workflows/binaries_release3.yml (2)
  6. .github/workflows/binaries_release4.yml (2)
  7. .github/workflows/binaries_release5.yml (2)
  8. .github/workflows/codeql.yml (2)
  9. .github/workflows/container_dev.yml (2)
  10. .github/workflows/container_latest.yml (2)
  11. .github/workflows/container_release1.yml (2)
  12. .github/workflows/container_release2.yml (2)
  13. .github/workflows/container_release3.yml (2)
  14. .github/workflows/container_release4.yml (2)
  15. .github/workflows/container_release5.yml (2)
  16. .github/workflows/deploy_telemetry.yml (2)
  17. .github/workflows/depsreview.yml (2)
  18. .github/workflows/e2e.yml (2)
  19. .github/workflows/fuse-integration.yml (2)
  20. .github/workflows/go.yml (2)
  21. .github/workflows/helm_chart_release.yml (2)
  22. .github/workflows/helm_ci.yml (2)
  23. .github/workflows/s3-go-tests.yml (14)
  24. .github/workflows/s3tests.yml (10)
  25. .github/workflows/test-s3-over-https-using-awscli.yml (2)
  26. go.mod (16)
  27. go.sum (32)
  28. weed/command/filer.go (9)
  29. weed/filer/tikv/tikv_store.go (17)
  30. weed/s3api/chunked_bug_reproduction_test.go (55)
  31. weed/s3api/chunked_reader_v4.go (44)
  32. weed/server/filer_server_handlers_copy.go (547)
  33. weed/server/filer_server_handlers_write.go (2)

.github/workflows/binaries_dev.yml (4 lines changed)

@@ -38,7 +38,7 @@ jobs:
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Set BUILD_TIME env
run: echo BUILD_TIME=$(date -u +%Y%m%d-%H%M) >> ${GITHUB_ENV}
@@ -87,7 +87,7 @@ jobs:
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Set BUILD_TIME env
run: echo BUILD_TIME=$(date -u +%Y%m%d-%H%M) >> ${GITHUB_ENV}

.github/workflows/binaries_release0.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/binaries_release1.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/binaries_release2.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/binaries_release3.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/binaries_release4.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/binaries_release5.yml (2 lines changed)

@@ -28,7 +28,7 @@ jobs:
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Go Release Binaries Normal Volume Size
uses: wangyoucao577/go-release-action@481a2c1a0f1be199722e3e9b74d7199acafc30a8 # v1.22
with:

.github/workflows/codeql.yml (2 lines changed)

@@ -18,7 +18,7 @@ jobs:
steps:
- name: Checkout repository
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL

.github/workflows/container_dev.yml (2 lines changed)

@@ -16,7 +16,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_latest.yml (2 lines changed)

@@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_release1.yml (2 lines changed)

@@ -16,7 +16,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_release2.yml (2 lines changed)

@@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_release3.yml (2 lines changed)

@@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_release4.yml (2 lines changed)

@@ -16,7 +16,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/container_release5.yml (2 lines changed)

@@ -16,7 +16,7 @@ jobs:
steps:
-
name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
-
name: Docker meta
id: docker_meta

.github/workflows/deploy_telemetry.yml (2 lines changed)

@@ -21,7 +21,7 @@ jobs:
deploy:
runs-on: ubuntu-latest
steps:
-- uses: actions/checkout@v4
+- uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5

.github/workflows/depsreview.yml (2 lines changed)

@@ -9,6 +9,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: 'Dependency Review'
uses: actions/dependency-review-action@da24556b548a50705dd671f47852072ea4c105d9

.github/workflows/e2e.yml (2 lines changed)

@@ -30,7 +30,7 @@ jobs:
id: go
- name: Check out code into the Go module directory
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Install dependencies
run: |

.github/workflows/fuse-integration.yml (2 lines changed)

@@ -33,7 +33,7 @@ jobs:
steps:
- name: Checkout code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go ${{ env.GO_VERSION }}
uses: actions/setup-go@v5

.github/workflows/go.yml (2 lines changed)

@@ -27,7 +27,7 @@ jobs:
id: go
- name: Check out code into the Go module directory
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v2
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v2
- name: Get dependencies
run: |

.github/workflows/helm_chart_release.yml (2 lines changed)

@@ -12,7 +12,7 @@ jobs:
release:
runs-on: ubuntu-latest
steps:
-- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633
+- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
- name: Publish Helm charts
uses: stefanprodan/helm-gh-pages@v1.7.0
with:

.github/workflows/helm_ci.yml (2 lines changed)

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
-uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633
+uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8
with:
fetch-depth: 0

.github/workflows/s3-go-tests.yml (14 lines changed)

@@ -25,7 +25,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -89,7 +89,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -137,7 +137,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -188,7 +188,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -255,7 +255,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -319,7 +319,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
@@ -370,7 +370,7 @@ jobs:
steps:
- name: Check out code
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5

.github/workflows/s3tests.yml (10 lines changed)

@@ -20,7 +20,7 @@ jobs:
timeout-minutes: 15
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go 1.x
uses: actions/setup-go@v5
@@ -313,7 +313,7 @@ jobs:
timeout-minutes: 15
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go 1.x
uses: actions/setup-go@v5
@@ -439,7 +439,7 @@ jobs:
timeout-minutes: 10
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go 1.x
uses: actions/setup-go@v5
@@ -562,7 +562,7 @@ jobs:
timeout-minutes: 10
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go 1.x
uses: actions/setup-go@v5
@@ -662,7 +662,7 @@ jobs:
timeout-minutes: 15
steps:
- name: Check out code into the Go module directory
-uses: actions/checkout@v4
+uses: actions/checkout@v5
- name: Set up Go 1.x
uses: actions/setup-go@v5

.github/workflows/test-s3-over-https-using-awscli.yml (2 lines changed)

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
-- uses: actions/checkout@v4
+- uses: actions/checkout@v5
- uses: actions/setup-go@v5
with:

go.mod (16 lines changed)

@@ -99,16 +99,16 @@ require (
gocloud.dev v0.43.0
gocloud.dev/pubsub/natspubsub v0.43.0
gocloud.dev/pubsub/rabbitpubsub v0.43.0
-golang.org/x/crypto v0.40.0
+golang.org/x/crypto v0.41.0
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b
golang.org/x/image v0.29.0
-golang.org/x/net v0.42.0
+golang.org/x/net v0.43.0
golang.org/x/oauth2 v0.30.0 // indirect
-golang.org/x/sys v0.34.0
-golang.org/x/text v0.27.0 // indirect
+golang.org/x/sys v0.35.0
+golang.org/x/text v0.28.0 // indirect
golang.org/x/tools v0.35.0
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
-google.golang.org/api v0.244.0
+google.golang.org/api v0.246.0
google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/grpc v1.74.2
google.golang.org/protobuf v1.36.6
@@ -147,13 +147,13 @@ require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rclone/rclone v1.70.3
github.com/rdleal/intervalst v1.5.0
-github.com/redis/go-redis/v9 v9.11.0
+github.com/redis/go-redis/v9 v9.12.0
github.com/schollz/progressbar/v3 v3.18.0
github.com/shirou/gopsutil/v3 v3.24.5
github.com/tarantool/go-tarantool/v2 v2.4.0
github.com/tikv/client-go/v2 v2.0.7
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.0
-github.com/ydb-platform/ydb-go-sdk/v3 v3.113.4
+github.com/ydb-platform/ydb-go-sdk/v3 v3.113.5
go.etcd.io/etcd/client/pkg/v3 v3.6.4
go.uber.org/atomic v1.11.0
golang.org/x/sync v0.16.0
@@ -393,7 +393,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.16.0 // indirect
-golang.org/x/term v0.33.0 // indirect
+golang.org/x/term v0.34.0 // indirect
golang.org/x/time v0.12.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0 // indirect

go.sum (32 lines changed; checksum pairs updated to match the go.mod version bumps above)

@ -1494,8 +1494,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rdleal/intervalst v1.5.0 h1:SEB9bCFz5IqD1yhfH1Wv8IBnY/JQxDplwkxHjT6hamU=
github.com/rdleal/intervalst v1.5.0/go.mod h1:xO89Z6BC+LQDH+IPQQw/OESt5UADgFD41tYMUINGpxQ=
github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs=
github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.12.0 h1:XlVPGlflh4nxfhsNXPA8Qp6EmEfTo0rp8oaBzPipXnU=
github.com/redis/go-redis/v9 v9.12.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/rekby/fixenv v0.3.2/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
@ -1676,8 +1676,8 @@ github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.0 h1:/NyPd9KnCJgzrEXCArqk1T
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.0/go.mod h1:9YzkhlIymWaJGX6KMU3vh5sOf3UKbCXkG/ZdjaI3zNM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.44.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGgzPJusoI3zWMNb8I+GnM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.47.3/go.mod h1:bWnOIcUHd7+Sl7DN+yhyY1H/I61z53GczvwJgXMgvj0=
github.com/ydb-platform/ydb-go-sdk/v3 v3.113.4 h1:4Ivg/MqjZxAgkbMTDeqAHsfzVWLGdVznanlFLoY8RzQ=
github.com/ydb-platform/ydb-go-sdk/v3 v3.113.4/go.mod h1:Pp1w2xxUoLQ3NCNAwV7pvDq0TVQOdtAqs+ZiC+i8r14=
github.com/ydb-platform/ydb-go-sdk/v3 v3.113.5 h1:olAAZfpMnFYChJNgZJ16G4jqoelRNx7Kx4tW50XcMv0=
github.com/ydb-platform/ydb-go-sdk/v3 v3.113.5/go.mod h1:Pp1w2xxUoLQ3NCNAwV7pvDq0TVQOdtAqs+ZiC+i8r14=
github.com/ydb-platform/ydb-go-yc v0.12.1 h1:qw3Fa+T81+Kpu5Io2vYHJOwcrYrVjgJlT6t/0dOXJrA=
github.com/ydb-platform/ydb-go-yc v0.12.1/go.mod h1:t/ZA4ECdgPWjAb4jyDe8AzQZB5dhpGbi3iCahFaNwBY=
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 h1:9E5q8Nsy2RiJMZDNVy0A3KUrIMBPakJ2VgloeWbcI84=
@ -1801,8 +1801,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -1941,8 +1941,8 @@ golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -2100,8 +2100,8 @@ golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@ -2118,8 +2118,8 @@ golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -2140,8 +2140,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@ -2295,8 +2295,8 @@ google.golang.org/api v0.106.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
google.golang.org/api v0.107.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
google.golang.org/api v0.244.0 h1:lpkP8wVibSKr++NCD36XzTk/IzeKJ3klj7vbj+XU5pE=
google.golang.org/api v0.244.0/go.mod h1:dMVhVcylamkirHdzEBAIQWUCgqY885ivNeZYd7VAVr8=
google.golang.org/api v0.246.0 h1:H0ODDs5PnMZVZAEtdLMn2Ul2eQi7QNjqM2DIFp8TlTM=
google.golang.org/api v0.246.0/go.mod h1:dMVhVcylamkirHdzEBAIQWUCgqY885ivNeZYd7VAVr8=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=

weed/command/filer.go (9 lines changed)

@@ -157,6 +157,8 @@ func init() {
filerSftpOptions.clientAliveInterval = cmdFiler.Flag.Duration("sftp.clientAliveInterval", 5*time.Second, "interval for sending keep-alive messages")
filerSftpOptions.clientAliveCountMax = cmdFiler.Flag.Int("sftp.clientAliveCountMax", 3, "maximum number of missed keep-alive messages before disconnecting")
filerSftpOptions.userStoreFile = cmdFiler.Flag.String("sftp.userStoreFile", "", "path to JSON file containing user credentials and permissions")
+filerSftpOptions.dataCenter = cmdFiler.Flag.String("sftp.dataCenter", "", "prefer to read and write to volumes in this data center")
filerSftpOptions.bindIp = cmdFiler.Flag.String("sftp.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
filerSftpOptions.localSocket = cmdFiler.Flag.String("sftp.localSocket", "", "default to /tmp/seaweedfs-sftp-<port>.sock")
}
@@ -256,13 +258,16 @@ func runFiler(cmd *Command, args []string) bool {
}
if *filerStartSftp {
-sftpOptions.filer = &filerAddress
+filerSftpOptions.filer = &filerAddress
if *filerSftpOptions.bindIp == "" {
filerSftpOptions.bindIp = f.bindIp
}
+if *f.dataCenter != "" && *filerSftpOptions.dataCenter == "" {
+filerSftpOptions.dataCenter = f.dataCenter
+}
go func(delay time.Duration) {
time.Sleep(delay * time.Second)
-sftpOptions.startSftpServer()
+filerSftpOptions.startSftpServer()
}(startDelay)
}

weed/filer/tikv/tikv_store.go (17 lines changed)

@@ -10,6 +10,7 @@ import (
"fmt"
"io"
"strings"
+"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -265,7 +266,21 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
break
}
-// Only increment counter after successful processing
+// Check TTL expiration before calling eachEntryFunc (similar to Redis stores)
+if entry.TtlSec > 0 {
+if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+// Entry is expired, delete it and continue without counting toward limit
+if deleteErr := store.DeleteEntry(ctx, entry.FullPath); deleteErr != nil {
+glog.V(0).InfofCtx(ctx, "failed to delete expired entry %s: %v", entry.FullPath, deleteErr)
+}
+if err := iter.Next(); err != nil {
+break
+}
+continue
+}
+}
+// Only increment counter for non-expired entries
i++
if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
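Note: the TTL handling added above mirrors the Redis stores. An entry with a positive TtlSec is treated as expired once Crtime plus TtlSec lies in the past; it is then deleted in place and skipped without counting toward the listing limit. Below is a minimal standalone sketch of that expiry predicate; the entry struct and isExpired helper are illustrative stand-ins, not the real filer types.

```go
package main

import (
	"fmt"
	"time"
)

// entry carries only the fields the TTL check relies on
// (creation time and TTL in seconds); it is not filer.Entry.
type entry struct {
	Crtime time.Time
	TtlSec int32
}

// isExpired reproduces the predicate used in the diff: an entry with a
// positive TtlSec is expired once Crtime + TtlSec seconds is before now.
func isExpired(e entry, now time.Time) bool {
	if e.TtlSec <= 0 {
		return false // no TTL set, never expires
	}
	return e.Crtime.Add(time.Duration(e.TtlSec) * time.Second).Before(now)
}

func main() {
	now := time.Now()
	fresh := entry{Crtime: now.Add(-30 * time.Second), TtlSec: 60}
	stale := entry{Crtime: now.Add(-2 * time.Minute), TtlSec: 60}
	fmt.Println(isExpired(fresh, now)) // false: 30s old with a 60s TTL
	fmt.Println(isExpired(stale, now)) // true: 120s old with a 60s TTL
}
```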

weed/s3api/chunked_bug_reproduction_test.go (new file, 55 lines)

@@ -0,0 +1,55 @@
package s3api
import (
"bytes"
"io"
"net/http"
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// TestChunkedEncodingMixedFormat tests the fix for GitHub issue #6847
// where AWS SDKs send mixed format: unsigned streaming headers but signed chunk data
func TestChunkedEncodingMixedFormat(t *testing.T) {
expectedContent := "hello world\n"
// Create the problematic mixed format payload:
// - Unsigned streaming headers (STREAMING-UNSIGNED-PAYLOAD-TRAILER)
// - But chunk data contains chunk-signature headers
mixedFormatPayload := "c;chunk-signature=347f6c62acd95b7c6ae18648776024a9e8cd6151184a5e777ea8e1d9b4e45b3c\r\n" +
"hello world\n\r\n" +
"0;chunk-signature=1a99b7790b8db0f4bfc048c8802056c3179d561e40c073167e79db5f1a6af4b2\r\n" +
"x-amz-checksum-crc32:rwg7LQ==\r\n" +
"\r\n"
// Create HTTP request with unsigned streaming headers
req, _ := http.NewRequest("PUT", "/test-bucket/test-object", bytes.NewReader([]byte(mixedFormatPayload)))
req.Header.Set("x-amz-content-sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
req.Header.Set("x-amz-trailer", "x-amz-checksum-crc32")
// Process through SeaweedFS chunked reader
iam := setupTestIAM()
reader, errCode := iam.newChunkedReader(req)
if errCode != s3err.ErrNone {
t.Fatalf("Failed to create chunked reader: %v", errCode)
}
// Read the content
actualContent, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("Failed to read content: %v", err)
}
// Should correctly extract just the content, ignoring chunk signatures
if string(actualContent) != expectedContent {
t.Errorf("Mixed format handling failed. Expected: %q, Got: %q", expectedContent, string(actualContent))
}
}
// setupTestIAM creates a test IAM instance using the same pattern as existing tests
func setupTestIAM() *IdentityAccessManagement {
iam := &IdentityAccessManagement{}
return iam
}
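For context on the payload used in this test: in aws-chunked encoding each chunk begins with a header line of the form "<hex-size>[;chunk-signature=<sig>]", so the leading "c" is hexadecimal for 12, the length of "hello world\n". The request headers declare unsigned streaming, yet the chunk headers carry signature extensions; that is the mixed format being reproduced. The sketch below parses such a header line; parseChunkHeader is a hypothetical helper for illustration, not the SeaweedFS parser.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseChunkHeader splits one aws-chunked header line such as
// "c;chunk-signature=347f..." into the chunk size (hex before the first
// ';') and the optional chunk-signature extension value.
func parseChunkHeader(line string) (size int64, signature string, err error) {
	line = strings.TrimRight(line, "\r\n")
	sizePart, rest, _ := strings.Cut(line, ";")
	size, err = strconv.ParseInt(sizePart, 16, 64)
	if err != nil {
		return 0, "", fmt.Errorf("bad chunk size %q: %w", sizePart, err)
	}
	if ext, ok := strings.CutPrefix(rest, "chunk-signature="); ok {
		signature = ext
	}
	return size, signature, nil
}

func main() {
	// "c" is hex for 12, matching len("hello world\n") in the test payload.
	size, sig, err := parseChunkHeader("c;chunk-signature=347f6c62acd95b7c6ae18648776024a9e8cd6151184a5e777ea8e1d9b4e45b3c\r\n")
	fmt.Println(size, sig != "", err) // 12 true <nil>
}
```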

weed/s3api/chunked_reader_v4.go (44 lines changed)

@@ -440,18 +440,35 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
continue
}
case verifyChunk:
-// Calculate the hashed chunk.
-hashedChunk := hex.EncodeToString(cr.chunkSHA256Writer.Sum(nil))
-// Calculate the chunk signature.
-newSignature := cr.getChunkSignature(hashedChunk)
-if !compareSignatureV4(cr.chunkSignature, newSignature) {
-// Chunk signature doesn't match we return signature does not match.
-cr.err = errors.New(s3err.ErrMsgChunkSignatureMismatch)
-return 0, cr.err
+// Check if we have credentials for signature verification
+// This handles the case where we have unsigned streaming (no cred) but chunks contain signatures
+//
+// BUG FIX for GitHub issue #6847:
+// Some AWS SDK versions (Java 3.7.412+, .NET 4.0.0-preview.6+) send mixed format:
+// - HTTP headers indicate unsigned streaming (STREAMING-UNSIGNED-PAYLOAD-TRAILER)
+// - But chunk data contains chunk-signature headers (normally only for signed streaming)
+// This causes a nil pointer dereference when trying to verify signatures without credentials
+if cr.cred != nil {
+// Normal signed streaming - verify the chunk signature
+// Calculate the hashed chunk.
+hashedChunk := hex.EncodeToString(cr.chunkSHA256Writer.Sum(nil))
+// Calculate the chunk signature.
+newSignature := cr.getChunkSignature(hashedChunk)
+if !compareSignatureV4(cr.chunkSignature, newSignature) {
+// Chunk signature doesn't match we return signature does not match.
+cr.err = errors.New(s3err.ErrMsgChunkSignatureMismatch)
+return 0, cr.err
+}
+// Newly calculated signature becomes the seed for the next chunk
+// this follows the chaining.
+cr.seedSignature = newSignature
+} else {
+// For unsigned streaming, we should not verify chunk signatures even if they are present
+// This fixes the bug where AWS SDKs send chunk signatures with unsigned streaming headers
+glog.V(3).Infof("Skipping chunk signature verification for unsigned streaming")
+}
-// Newly calculated signature becomes the seed for the next chunk
-// this follows the chaining.
-cr.seedSignature = newSignature
+// Common cleanup and state transition for both signed and unsigned streaming
cr.chunkSHA256Writer.Reset()
if cr.lastChunk {
cr.state = eofChunk
@@ -513,9 +530,10 @@ func readChunkLine(b *bufio.Reader) ([]byte, error) {
if err != nil {
// We always know when EOF is coming.
// If the caller asked for a line, there should be a line.
-if err == io.EOF {
+switch err {
+case io.EOF:
err = io.ErrUnexpectedEOF
-} else if err == bufio.ErrBufferFull {
+case bufio.ErrBufferFull:
err = errLineTooLong
}
return nil, err
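The fix keys off whether credentials are present, which in turn follows from how the request declared its payload in the x-amz-content-sha256 header. The sketch below shows that classification using the standard AWS SigV4 header values; streamingMode is an illustrative helper, not code from this change.

```go
package main

import (
	"fmt"
	"net/http"
)

// streamingMode classifies a request the way the fixed reader must treat it:
// signed streaming verifies per-chunk signatures, while unsigned streaming
// skips verification even if chunk-signature extensions appear in the body.
func streamingMode(r *http.Request) string {
	switch r.Header.Get("X-Amz-Content-Sha256") {
	case "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",
		"STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER":
		return "signed"
	case "STREAMING-UNSIGNED-PAYLOAD-TRAILER":
		return "unsigned"
	default:
		return "non-streaming"
	}
}

func main() {
	req, _ := http.NewRequest("PUT", "/bucket/key", nil)
	req.Header.Set("X-Amz-Content-Sha256", "STREAMING-UNSIGNED-PAYLOAD-TRAILER")
	fmt.Println(streamingMode(req)) // unsigned: skip chunk signature checks
}
```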

weed/server/filer_server_handlers_copy.go (new file, 547 lines)

@@ -0,0 +1,547 @@
package weed_server
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
src := r.URL.Query().Get("cp.from")
dst := r.URL.Path
glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)
var err error
if src, err = clearName(src); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
if dst, err = clearName(dst); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
src = strings.TrimRight(src, "/")
if src == "" {
err = fmt.Errorf("invalid source '/'")
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
srcPath := util.FullPath(src)
dstPath := util.FullPath(dst)
if dstPath.IsLongerFileName(so.MaxFileNameLength) {
err = fmt.Errorf("dst name too long")
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
if err != nil {
err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))
// Check if source is a directory - currently not supported for recursive copying
if srcEntry.IsDirectory() {
err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
_, oldName := srcPath.DirAndName()
finalDstPath := dstPath
// Check if destination is a directory
dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
if findErr != nil && findErr != filer_pb.ErrNotFound {
err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
if findErr == nil && dstPathEntry.IsDirectory() {
finalDstPath = dstPath.Child(oldName)
} else {
newDir, newName := dstPath.DirAndName()
newName = util.Nvl(newName, oldName)
finalDstPath = util.FullPath(newDir).Child(newName)
}
// Check if destination file already exists
// TODO: add an overwrite parameter to allow overwriting
if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if dstEntry != nil {
err = fmt.Errorf("destination file %s already exists", finalDstPath)
writeJsonError(w, r, http.StatusConflict, err)
return
}
// Copy the file content and chunks
newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
if err != nil {
err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
w.WriteHeader(http.StatusNoContent)
}
// copyEntry creates a new entry with copied content and chunks
func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
// Create the base entry structure
// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
// This creates an independent copy rather than another hard link to the same content
newEntry := &filer.Entry{
FullPath: dstPath,
// Deep copy Attr field to ensure slice independence (GroupNames, Md5)
Attr: func(a filer.Attr) filer.Attr {
a.GroupNames = append([]string(nil), a.GroupNames...)
a.Md5 = append([]byte(nil), a.Md5...)
return a
}(srcEntry.Attr),
Quota: srcEntry.Quota,
// Intentionally NOT copying HardLinkId and HardLinkCounter to create independent copy
}
// Deep copy Extended fields to ensure independence
if srcEntry.Extended != nil {
newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
for k, v := range srcEntry.Extended {
newEntry.Extended[k] = append([]byte(nil), v...)
}
}
// Deep copy Remote field to ensure independence
if srcEntry.Remote != nil {
newEntry.Remote = &filer_pb.RemoteEntry{
StorageName: srcEntry.Remote.StorageName,
LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
RemoteETag: srcEntry.Remote.RemoteETag,
RemoteMtime: srcEntry.Remote.RemoteMtime,
RemoteSize: srcEntry.Remote.RemoteSize,
}
}
// Log if we're copying a hard link so we can track this behavior
if len(srcEntry.HardLinkId) > 0 {
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
}
// Handle small files stored in Content field
if len(srcEntry.Content) > 0 {
// For small files, just copy the content directly
newEntry.Content = make([]byte, len(srcEntry.Content))
copy(newEntry.Content, srcEntry.Content)
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
return newEntry, nil
}
// Handle files stored as chunks (including resolved hard link content)
if len(srcEntry.GetChunks()) > 0 {
srcChunks := srcEntry.GetChunks()
// Create HTTP client once for reuse across all chunk operations
client := &http.Client{Timeout: 60 * time.Second}
// Check if any chunks are manifest chunks - these require special handling
if filer.HasChunkManifest(srcChunks) {
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
if err != nil {
return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
}
newEntry.Chunks = newChunks
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
} else {
// Regular chunks without manifest - copy directly
newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
if err != nil {
return nil, fmt.Errorf("failed to copy chunks: %w", err)
}
newEntry.Chunks = newChunks
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
}
return newEntry, nil
}
// Empty file case (or hard link with no content - should not happen if hard link was properly resolved)
if len(srcEntry.HardLinkId) > 0 {
glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
}
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
return newEntry, nil
}
// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
if len(srcChunks) == 0 {
return nil, nil
}
// Optimize: Batch volume lookup for all chunks to reduce RPC calls
volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
if err != nil {
return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
}
// Parallel chunk copying with concurrency control using errgroup
const maxConcurrentChunks = 8 // Match SeaweedFS standard for parallel operations
// Pre-allocate result slice to maintain order
newChunks := make([]*filer_pb.FileChunk, len(srcChunks))
// Use errgroup for cleaner concurrency management
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrentChunks) // Limit concurrent goroutines
// Validate that all chunk locations are available before starting any concurrent work
for _, chunk := range srcChunks {
volumeId := chunk.Fid.VolumeId
locations, ok := volumeLocationsMap[volumeId]
if !ok || len(locations) == 0 {
return nil, fmt.Errorf("no locations found for volume %d", volumeId)
}
}
glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)
// Launch goroutines for each chunk
for i, srcChunk := range srcChunks {
// Capture loop variables for goroutine closure
chunkIndex := i
chunk := srcChunk
chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]
g.Go(func() error {
glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)
// Use streaming copy to avoid loading entire chunk into memory
newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
if err != nil {
return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
}
// Store result at correct index to maintain order
newChunks[chunkIndex] = newChunk
glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
return nil
})
}
// Wait for all chunks to complete and return first error (if any)
if err := g.Wait(); err != nil {
return nil, err
}
// Verify all chunks were copied (shouldn't happen if no errors, but safety check)
for i, chunk := range newChunks {
if chunk == nil {
return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
}
}
glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
return newChunks, nil
}
// copyChunksWithManifest handles copying chunks that include manifest chunks
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
if len(srcChunks) == 0 {
return nil, nil
}
glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))
// Separate manifest chunks from regular data chunks
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)
var newChunks []*filer_pb.FileChunk
// First, copy all non-manifest chunks directly
if len(nonManifestChunks) > 0 {
glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
if err != nil {
return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
}
newChunks = append(newChunks, newNonManifestChunks...)
}
// Process each manifest chunk separately
for i, manifestChunk := range manifestChunks {
glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))
// Resolve the manifest chunk to get the actual data chunks it references
lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
}
resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
if err != nil {
return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
}
glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
manifestChunk.GetFileIdString(), len(resolvedChunks))
// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
if err != nil {
return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
}
// Create a new manifest chunk that references the copied data chunks
newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
if err != nil {
return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
}
newChunks = append(newChunks, newManifestChunk)
glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
newManifestChunk.GetFileIdString(), len(newResolvedChunks))
}
glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
len(newChunks), len(manifestChunks), len(nonManifestChunks))
return newChunks, nil
}
// createManifestChunk creates a new manifest chunk that references the provided data chunks
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
// Create the manifest data structure
filer_pb.BeforeEntrySerialization(dataChunks)
manifestData := &filer_pb.FileChunkManifest{
Chunks: dataChunks,
}
// Serialize the manifest
data, err := proto.Marshal(manifestData)
if err != nil {
return nil, fmt.Errorf("failed to marshal manifest: %w", err)
}
// Save the manifest data as a new chunk
saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
// Assign a new file ID
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
if assignErr != nil {
return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
}
// Upload the manifest data
err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
if err != nil {
return nil, fmt.Errorf("failed to upload manifest data: %w", err)
}
// Create the chunk metadata
chunk = &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(len(data)),
}
return chunk, nil
}
manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
if err != nil {
return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
}
// Set manifest-specific properties
manifestChunk.IsChunkManifest = true
manifestChunk.Size = originalManifest.Size
return manifestChunk, nil
}
// uploadData uploads data to a volume server
func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
if err != nil {
return fmt.Errorf("failed to create upload request: %w", err)
}
if auth != "" {
req.Header.Set("Authorization", "Bearer "+auth)
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to upload data: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
}
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks
func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
// Collect unique volume IDs and their string representations to avoid repeated conversions
volumeIdMap := make(map[uint32]string)
for _, chunk := range chunks {
vid := chunk.Fid.VolumeId
if _, found := volumeIdMap[vid]; !found {
volumeIdMap[vid] = fmt.Sprintf("%d", vid)
}
}
if len(volumeIdMap) == 0 {
return make(map[uint32][]operation.Location), nil
}
// Convert to slice of strings for the lookup call
volumeIdStrs := make([]string, 0, len(volumeIdMap))
for _, vidStr := range volumeIdMap {
volumeIdStrs = append(volumeIdStrs, vidStr)
}
// Perform single batched lookup
lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
if err != nil {
return nil, fmt.Errorf("failed to lookup volumes: %w", err)
}
// Convert result to map of volumeId -> locations
volumeLocationsMap := make(map[uint32][]operation.Location)
for volumeId, volumeIdStr := range volumeIdMap {
if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
volumeLocationsMap[volumeId] = volumeLocations.Locations
}
}
return volumeLocationsMap, nil
}
// streamCopyChunk copies a chunk using streaming to minimize memory usage
func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
// Assign a new file ID for destination
fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
if err != nil {
return nil, fmt.Errorf("failed to assign new file ID: %w", err)
}
// Try all available locations for source chunk until one succeeds
fileIdString := srcChunk.GetFileIdString()
var lastErr error
for i, location := range locations {
srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))
// Perform streaming copy using HTTP client
err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
if err != nil {
lastErr = err
glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
continue
}
// Success - create chunk metadata
newChunk := &filer_pb.FileChunk{
FileId: fileId,
Offset: srcChunk.Offset,
Size: srcChunk.Size,
ETag: srcChunk.ETag,
}
glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
return newChunk, nil
}
// All locations failed
return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
}
// performStreamCopy performs the actual streaming copy from source URL to destination URL
func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
// Create HTTP request to read from source
req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
if err != nil {
return fmt.Errorf("failed to create source request: %v", err)
}
// Perform source request
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to read from source: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("source returned status %d", resp.StatusCode)
}
// Create HTTP request to write to destination
dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
if err != nil {
return fmt.Errorf("failed to create destination request: %v", err)
}
dstReq.ContentLength = int64(expectedSize)
// Set authorization header if provided
if auth != "" {
dstReq.Header.Set("Authorization", "Bearer "+auth)
}
dstReq.Header.Set("Content-Type", "application/octet-stream")
// Perform destination request
dstResp, err := client.Do(dstReq)
if err != nil {
return fmt.Errorf("failed to write to destination: %v", err)
}
defer dstResp.Body.Close()
if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
// Read error response body for more details
body, readErr := io.ReadAll(dstResp.Body)
if readErr != nil {
return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
}
return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
}
glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
return nil
}
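copyChunks above bounds its parallelism with errgroup.SetLimit and writes each result into a pre-allocated slice slot, so ordering is preserved regardless of completion order. Below is a minimal standalone sketch of that fan-out pattern; processAll and its inputs are illustrative, not part of the handler.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processAll fans work out with bounded concurrency, in the style of
// copyChunks: results land at their original index, and the first error
// cancels the remaining work via the group context.
func processAll(ctx context.Context, items []string) ([]string, error) {
	results := make([]string, len(items))
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(8) // mirrors maxConcurrentChunks in the handler above

	for i, item := range items {
		i, item := i, item // capture loop variables for the closure
		g.Go(func() error {
			select {
			case <-gCtx.Done():
				return gCtx.Err() // another goroutine already failed
			default:
			}
			results[i] = "copied:" + item
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := processAll(context.Background(), []string{"chunk-1", "chunk-2", "chunk-3"})
	fmt.Println(out, err)
}
```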

weed/server/filer_server_handlers_write.go (2 lines changed)

@@ -116,6 +116,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
if query.Has("mv.from") {
fs.move(ctx, w, r, so)
+} else if query.Has("cp.from") {
+fs.copy(ctx, w, r, so)
} else {
fs.autoChunk(ctx, w, r, contentLength, so)
}
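Taken together with the new handler, a client can request a server-side copy by POSTing to the destination path with a cp.from query parameter; per the code above, the filer answers 204 No Content on success and 409 Conflict if the destination already exists. A hedged usage sketch follows; the filer address http://localhost:8888 is only the usual default, and copyViaFiler is an illustrative helper.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
)

// copyViaFiler asks the filer to copy srcPath to dstPath server-side by
// issuing POST <filer><dst>?cp.from=<src> with an empty body.
func copyViaFiler(filerBase, srcPath, dstPath string) error {
	u := fmt.Sprintf("%s%s?cp.from=%s", filerBase, dstPath, url.QueryEscape(srcPath))
	resp, err := http.Post(u, "application/octet-stream", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// 204 No Content signals success; 409 Conflict means dst already exists.
	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("copy failed: %s", resp.Status)
	}
	return nil
}

func main() {
	if err := copyViaFiler("http://localhost:8888", "/buckets/b1/src.txt", "/buckets/b1/dst.txt"); err != nil {
		log.Fatal(err)
	}
	fmt.Println("copied")
}
```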
