Browse Source

Merge remote-tracking branch 'origin/master'

pull/7312/head
Konstantin Lebedev 2 months ago
parent
commit
7f456a4c97
  1. 15
      go.mod
  2. 45
      go.sum
  3. 5
      k8s/charts/seaweedfs/templates/s3/s3-deployment.yaml
  4. 7
      k8s/charts/seaweedfs/values.yaml
  5. 1
      test/s3/multipart/aws_upload.go
  6. 2
      unmaintained/diff_volume_servers/diff_volume_servers.go
  7. 5
      unmaintained/s3/presigned_put/presigned_put.go
  8. 2
      unmaintained/stream_read_volume/stream_read_volume.go
  9. 2
      unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
  10. 2
      unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
  11. 4
      unmaintained/volume_tailer/volume_tailer.go
  12. 1
      weed/mq/broker/broker_grpc_pub.go
  13. 120
      weed/remote_storage/azure/azure_highlevel.go
  14. 283
      weed/remote_storage/azure/azure_storage_client.go
  15. 377
      weed/remote_storage/azure/azure_storage_client_test.go
  16. 92
      weed/replication/sink/azuresink/azure_sink.go
  17. 355
      weed/replication/sink/azuresink/azure_sink_test.go
  18. 2
      weed/s3api/policy_engine/types.go
  19. 6
      weed/s3api/s3api_object_handlers_put.go
  20. 56
      weed/s3api/s3api_object_handlers_put_test.go
  21. 4
      weed/s3api/s3err/s3-error.go
  22. 8
      weed/s3api/s3err/s3api_errors.go
  23. 5
      weed/server/filer_server_handlers_write.go
  24. 19
      weed/server/filer_server_handlers_write_autochunk.go
  25. 28
      weed/shell/command_volume_check_disk.go
  26. 7
      weed/util/constants/filer.go

15
go.mod

@ -8,8 +8,6 @@ require (
cloud.google.com/go v0.121.6 // indirect
cloud.google.com/go/pubsub v1.50.1
cloud.google.com/go/storage v1.56.2
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Shopify/sarama v1.38.1
github.com/aws/aws-sdk-go v1.55.8
github.com/beorn7/perks v1.0.1 // indirect
@ -57,7 +55,6 @@ require (
github.com/kurin/blazer v0.5.3
github.com/linxGnu/grocksdb v1.10.2
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@ -93,7 +90,7 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.etcd.io/etcd/client/v3 v3.6.4
go.etcd.io/etcd/client/v3 v3.6.5
go.mongodb.org/mongo-driver v1.17.4
go.opencensus.io v0.24.0 // indirect
gocloud.dev v0.43.0
@ -116,7 +113,7 @@ require (
modernc.org/b v1.0.0 // indirect
modernc.org/mathutil v1.7.1
modernc.org/memory v1.11.0 // indirect
modernc.org/sqlite v1.38.2
modernc.org/sqlite v1.39.0
modernc.org/strutil v1.2.1
)
@ -150,7 +147,7 @@ require (
github.com/parquet-go/parquet-go v0.25.1
github.com/pkg/sftp v1.13.9
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rclone/rclone v1.71.0
github.com/rclone/rclone v1.71.1
github.com/rdleal/intervalst v1.5.0
github.com/redis/go-redis/v9 v9.12.1
github.com/schollz/progressbar/v3 v3.18.0
@ -159,7 +156,7 @@ require (
github.com/tikv/client-go/v2 v2.0.7
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.0
github.com/ydb-platform/ydb-go-sdk/v3 v3.113.5
go.etcd.io/etcd/client/pkg/v3 v3.6.4
go.etcd.io/etcd/client/pkg/v3 v3.6.5
go.uber.org/atomic v1.11.0
golang.org/x/sync v0.17.0
golang.org/x/tools/godoc v0.1.0-deprecated
@ -232,7 +229,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0 // indirect
@ -430,7 +427,7 @@ require (
github.com/zeebo/blake3 v0.2.4 // indirect
github.com/zeebo/errs v1.4.0 // indirect
go.etcd.io/bbolt v1.4.2 // indirect
go.etcd.io/etcd/api/v3 v3.6.4 // indirect
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.37.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0 // indirect

45
go.sum

@ -541,8 +541,6 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 h1:5YTBM8QDVIBN3sxBil89WfdAAqDZbyJTgh688DSxX5w=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1/go.mod h1:YD5h/ldMsG0XiIw7PdyNhLxaM317eFh5yNLccNfGdyw=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 h1:wL5IEG5zb7BVv1Kv0Xm92orq+5hB5Nipn3B5tn4Rqfk=
@ -561,20 +559,7 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 h1:FwladfywkNirM+FZY
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2/go.mod h1:vv5Ad0RrIoT1lJFdWBZwt4mB1+j+V8DUroixmKDTCdk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 h1:l3SabZmNuXCMCbQUIeR4W6/N4j8SeH/lwX+a6leZhHo=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2/go.mod h1:k+mEZ4f1pVqZTRqtSDW2AhZ/3wT5qLpsUA75C/k7dtE=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM=
@ -929,8 +914,6 @@ github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
@ -1398,9 +1381,6 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
github.com/mattn/go-ieproxy v0.0.11 h1:MQ/5BuGSgDAHZOJe6YY80IF2UVCfGkwfo6AeD7HtHYo=
github.com/mattn/go-ieproxy v0.0.11/go.mod h1:/NsJd+kxZBmjMc5hrJCKMbP57B84rvq9BiDRbtO9AS0=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
@ -1591,8 +1571,8 @@ github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQB
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rclone/rclone v1.71.0 h1:PK1+IUs3EL3pCdqaeHBPCiDcBpw3MWaMH1eWJsfC2ww=
github.com/rclone/rclone v1.71.0/go.mod h1:NLyX57FrnZ9nVLTY5TRdMmGelrGKbIRYGcgRkNdqqlA=
github.com/rclone/rclone v1.71.1 h1:cpODfWTRz5i/WAzXsyW85tzfIKNsd1aq8CE8lUB+0zg=
github.com/rclone/rclone v1.71.1/go.mod h1:NLyX57FrnZ9nVLTY5TRdMmGelrGKbIRYGcgRkNdqqlA=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rdleal/intervalst v1.5.0 h1:SEB9bCFz5IqD1yhfH1Wv8IBnY/JQxDplwkxHjT6hamU=
@ -1833,12 +1813,12 @@ go.einride.tech/aip v0.73.0 h1:bPo4oqBo2ZQeBKo4ZzLb1kxYXTY1ysJhpvQyfuGzvps=
go.einride.tech/aip v0.73.0/go.mod h1:Mj7rFbmXEgw0dq1dqJ7JGMvYCZZVxmGOR3S4ZcV5LvQ=
go.etcd.io/bbolt v1.4.2 h1:IrUHp260R8c+zYx/Tm8QZr04CX+qWS5PGfPdevhdm1I=
go.etcd.io/bbolt v1.4.2/go.mod h1:Is8rSHO/b4f3XigBC0lL0+4FwAQv3HXEEIgFMuKHceM=
go.etcd.io/etcd/api/v3 v3.6.4 h1:7F6N7toCKcV72QmoUKa23yYLiiljMrT4xCeBL9BmXdo=
go.etcd.io/etcd/api/v3 v3.6.4/go.mod h1:eFhhvfR8Px1P6SEuLT600v+vrhdDTdcfMzmnxVXXSbk=
go.etcd.io/etcd/client/pkg/v3 v3.6.4 h1:9HBYrjppeOfFjBjaMTRxT3R7xT0GLK8EJMVC4xg6ok0=
go.etcd.io/etcd/client/pkg/v3 v3.6.4/go.mod h1:sbdzr2cl3HzVmxNw//PH7aLGVtY4QySjQFuaCgcRFAI=
go.etcd.io/etcd/client/v3 v3.6.4 h1:YOMrCfMhRzY8NgtzUsHl8hC2EBSnuqbR3dh84Uryl7A=
go.etcd.io/etcd/client/v3 v3.6.4/go.mod h1:jaNNHCyg2FdALyKWnd7hxZXZxZANb0+KGY+YQaEMISo=
go.etcd.io/etcd/api/v3 v3.6.5 h1:pMMc42276sgR1j1raO/Qv3QI9Af/AuyQUW6CBAWuntA=
go.etcd.io/etcd/api/v3 v3.6.5/go.mod h1:ob0/oWA/UQQlT1BmaEkWQzI0sJ1M0Et0mMpaABxguOQ=
go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3RrBP8=
go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk=
go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U=
go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo=
go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw=
go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@ -1920,8 +1900,6 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@ -2023,7 +2001,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -2048,7 +2025,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@ -2152,7 +2128,6 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -2745,8 +2720,8 @@ modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
modernc.org/sqlite v1.38.2 h1:Aclu7+tgjgcQVShZqim41Bbw9Cho0y/7WzYptXqkEek=
modernc.org/sqlite v1.38.2/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
modernc.org/sqlite v1.39.0 h1:6bwu9Ooim0yVYA7IZn9demiQk/Ejp0BtTjBWFLymSeY=
modernc.org/sqlite v1.39.0/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=

5
k8s/charts/seaweedfs/templates/s3/s3-deployment.yaml

@ -152,7 +152,10 @@ spec:
{{- if .Values.s3.auditLogConfig }}
-auditLogConfig=/etc/sw/s3_auditLogConfig.json \
{{- end }}
-filer={{ template "seaweedfs.name" . }}-filer-client.{{ .Release.Namespace }}:{{ .Values.filer.port }}
-filer={{ template "seaweedfs.name" . }}-filer-client.{{ .Release.Namespace }}:{{ .Values.filer.port }} \
{{- range .Values.s3.extraArgs }}
{{ . }} \
{{- end }}
volumeMounts:
{{- if or (eq .Values.s3.logs.type "hostPath") (eq .Values.s3.logs.type "emptyDir") }}
- name: logs

7
k8s/charts/seaweedfs/values.yaml

@ -869,7 +869,7 @@ filer:
# anonymousRead: false
s3:
enabled: false
enabled: true
imageOverride: null
restartPolicy: null
replicas: 1
@ -972,6 +972,11 @@ s3:
extraEnvironmentVars:
# Custom command line arguments to add to the s3 command
# Example to fix connection idle seconds:
extraArgs: ["-idleTimeout=30"]
#extraArgs: []
# used to configure livenessProbe on s3 containers
#
livenessProbe:

1
test/s3/multipart/aws_upload.go

@ -108,7 +108,6 @@ func main() {
fmt.Printf("part %d: %v\n", i, part)
}
completeResponse, err := completeMultipartUpload(svc, resp, completedParts)
if err != nil {
fmt.Println(err.Error())

2
unmaintained/diff_volume_servers/diff_volume_servers.go

@ -19,8 +19,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
)
var (

5
unmaintained/s3/presigned_put/presigned_put.go

@ -7,18 +7,21 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"net/http"
"strings"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// Downloads an item from an S3 Bucket in the region configured in the shared config
// or AWS_REGION environment variable.
//
// Usage:
//
// go run presigned_put.go
//
// For this exampl to work, the domainName is needd
//
// weed s3 -domainName=localhost
func main() {
util_http.InitGlobalHttpClient()

2
unmaintained/stream_read_volume/stream_read_volume.go

@ -12,8 +12,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
)
var (

2
unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go

@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"io"
"log"
"math/rand"
@ -13,7 +14,6 @@ import (
"strings"
"sync"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (

2
unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go

@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"io"
"log"
"math/rand"
@ -14,7 +15,6 @@ import (
"strings"
"sync"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (

4
unmaintained/volume_tailer/volume_tailer.go

@ -1,18 +1,18 @@
package main
import (
"context"
"flag"
"github.com/seaweedfs/seaweedfs/weed/pb"
"log"
"time"
"context"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
util2 "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/tools/godoc/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"golang.org/x/tools/godoc/util"
)
var (

1
weed/mq/broker/broker_grpc_pub.go

@ -183,4 +183,3 @@ func findClientAddress(ctx context.Context) string {
}
return pr.Addr.String()
}

120
weed/remote_storage/azure/azure_highlevel.go

@ -1,120 +0,0 @@
package azure
import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"github.com/Azure/azure-pipeline-go/pipeline"
. "github.com/Azure/azure-storage-blob-go/azblob"
"io"
"sync"
)
// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
// uploadReaderAtToBlockBlob was not public
// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
if o.BlockSize == 0 {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
return nil, errors.New("buffer is too large to upload to a block blob")
}
// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
if readerSize <= BlockBlobMaxUploadBlobBytes {
o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
} else {
o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
o.BlockSize = BlobDefaultDownloadBlockSize
}
// StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
}
}
if readerSize <= BlockBlobMaxUploadBlobBytes {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
if o.Progress != nil {
body = pipeline.NewRequestBodyProgress(body, o.Progress)
}
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
}
var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
progress := int64(0)
progressLock := &sync.Mutex{}
err := DoBatchTransfer(ctx, BatchTransferOptions{
OperationName: "uploadReaderAtToBlockBlob",
TransferSize: readerSize,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(offset int64, count int64, ctx context.Context) error {
// This function is called once per block.
// It is passed this block's offset within the buffer and its count of bytes
// Prepare to read the proper block/section of the buffer
var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
blockNum := offset / o.BlockSize
if o.Progress != nil {
blockProgress := int64(0)
body = pipeline.NewRequestBodyProgress(body,
func(bytesTransferred int64) {
diff := bytesTransferred - blockProgress
blockProgress = bytesTransferred
progressLock.Lock() // 1 goroutine at a time gets a progress report
progress += diff
o.Progress(progress)
progressLock.Unlock()
})
}
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
// at the same time causing PutBlockList to get a mix of blocks from all the clients.
blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
return err
},
})
if err != nil {
return nil, err
}
// All put blocks were successful, call Put Block List to finalize the blob
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
}
// The UUID reserved variants.
const (
reservedNCS byte = 0x80
reservedRFC4122 byte = 0x40
reservedMicrosoft byte = 0x20
reservedFuture byte = 0x00
)
type uuid [16]byte
// NewUUID returns a new uuid using RFC 4122 algorithm.
func newUUID() (u uuid) {
u = uuid{}
// Set all bits to randomly (or pseudo-randomly) chosen values.
rand.Read(u[:])
u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
var version byte = 4
u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
return
}
// String returns an unparsed version of the generated UUID sequence.
func (u uuid) String() string {
return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
}
func (u uuid) bytes() []byte {
return u[:]
}

283
weed/remote_storage/azure/azure_storage_client.go

@ -3,21 +3,58 @@ package azure
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"io"
"net/url"
"os"
"reflect"
"regexp"
"strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/seaweedfs/seaweedfs/weed/filer"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
defaultBlockSize = 4 * 1024 * 1024
defaultConcurrency = 16
)
// invalidMetadataChars matches any character that is not valid in Azure metadata keys.
// Azure metadata keys must be valid C# identifiers: letters, digits, and underscores only.
var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)
// sanitizeMetadataKey converts an S3 metadata key to a valid Azure metadata key.
// Azure metadata keys must be valid C# identifiers (letters, digits, underscores only, cannot start with digit).
// To prevent collisions, invalid characters are replaced with their hex representation (_XX_).
// Examples:
// - "my-key" -> "my_2d_key"
// - "my.key" -> "my_2e_key"
// - "key@value" -> "key_40_value"
func sanitizeMetadataKey(key string) string {
// Replace each invalid character with _XX_ where XX is the hex code
result := invalidMetadataChars.ReplaceAllStringFunc(key, func(s string) string {
return fmt.Sprintf("_%02x_", s[0])
})
// Azure metadata keys cannot start with a digit
if len(result) > 0 && result[0] >= '0' && result[0] <= '9' {
result = "_" + result
}
return result
}
func init() {
remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
}
@ -42,25 +79,35 @@ func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storag
}
}
// Use your Storage account's name and key to create a credential object.
// Create credential and client
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
return nil, fmt.Errorf("invalid Azure credential with account name:%s: %w", accountName, err)
}
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
MaxRetries: 10, // Increased from default 3 to maintain resiliency similar to old SDK's 20
TryTimeout: time.Minute,
RetryDelay: 2 * time.Second,
MaxRetryDelay: time.Minute,
},
},
})
if err != nil {
return nil, fmt.Errorf("failed to create Azure client: %w", err)
}
// Create a request pipeline that is used to process HTTP(S) requests and responses.
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
// Create an ServiceURL object that wraps the service URL and a request pipeline.
u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
client.serviceURL = azblob.NewServiceURL(*u, p)
client.client = azClient
return client, nil
}
type azureRemoteStorageClient struct {
conf *remote_pb.RemoteConf
serviceURL azblob.ServiceURL
client *azblob.Client
}
var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
@ -68,59 +115,74 @@ var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
containerClient := az.client.ServiceClient().NewContainerClient(loc.Bucket)
// List the container that we have created above
for marker := (azblob.Marker{}); marker.NotDone(); {
// Get a result segment starting with the blob indicated by the current Marker.
listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
Prefix: pathKey,
// List blobs with pager
pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &pathKey,
})
for pager.More() {
resp, err := pager.NextPage(context.Background())
if err != nil {
return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
return fmt.Errorf("azure traverse %s%s: %w", loc.Bucket, loc.Path, err)
}
// ListBlobs returns the start of the next segment; you MUST use this to get
// the next segment (after processing the current result segment).
marker = listBlob.NextMarker
// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
for _, blobInfo := range listBlob.Segment.BlobItems {
key := blobInfo.Name
key = "/" + key
for _, blobItem := range resp.Segment.BlobItems {
if blobItem.Name == nil {
continue
}
key := "/" + *blobItem.Name
dir, name := util.FullPath(key).DirAndName()
err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
RemoteMtime: blobInfo.Properties.LastModified.Unix(),
RemoteSize: *blobInfo.Properties.ContentLength,
RemoteETag: string(blobInfo.Properties.Etag),
remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
})
}
if blobItem.Properties != nil {
if blobItem.Properties.LastModified != nil {
remoteEntry.RemoteMtime = blobItem.Properties.LastModified.Unix()
}
if blobItem.Properties.ContentLength != nil {
remoteEntry.RemoteSize = *blobItem.Properties.ContentLength
}
if blobItem.Properties.ETag != nil {
remoteEntry.RemoteETag = string(*blobItem.Properties.ETag)
}
}
err = visitFn(dir, name, false, remoteEntry)
if err != nil {
return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
return fmt.Errorf("azure processing %s%s: %w", loc.Bucket, loc.Path, err)
}
}
}
return
}
func (az *azureRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
key := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
blobURL := containerURL.NewBlockBlobURL(key)
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
if readErr != nil {
return nil, readErr
count := size
if count == 0 {
count = blob.CountToEnd
}
// NOTE: automatically retries are performed if the connection fails
bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
defer bodyStream.Close()
data, err = io.ReadAll(bodyStream)
downloadResp, err := blobClient.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: offset,
Count: count,
},
})
if err != nil {
return nil, fmt.Errorf("failed to download file %s%s: %w", loc.Bucket, loc.Path, err)
}
defer downloadResp.Body.Close()
data, err = io.ReadAll(downloadResp.Body)
if err != nil {
return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
return nil, fmt.Errorf("failed to read download stream %s%s: %w", loc.Bucket, loc.Path, err)
}
return
@ -137,23 +199,23 @@ func (az *azureRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorage
func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
key := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
blobURL := containerURL.NewBlockBlobURL(key)
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
readerAt, ok := reader.(io.ReaderAt)
if !ok {
return nil, fmt.Errorf("unexpected reader: readerAt expected")
// Upload from reader
metadata := toMetadata(entry.Extended)
httpHeaders := &blob.HTTPHeaders{}
if entry.Attributes != nil && entry.Attributes.Mime != "" {
httpHeaders.BlobContentType = &entry.Attributes.Mime
}
fileSize := int64(filer.FileSize(entry))
_, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: entry.Attributes.Mime},
Metadata: toMetadata(entry.Extended),
Parallelism: 16,
_, err = blobClient.UploadStream(context.Background(), reader, &blockblob.UploadStreamOptions{
BlockSize: defaultBlockSize,
Concurrency: defaultConcurrency,
HTTPHeaders: httpHeaders,
Metadata: metadata,
})
if err != nil {
return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
return nil, fmt.Errorf("azure upload to %s%s: %w", loc.Bucket, loc.Path, err)
}
// read back the remote entry
@ -162,36 +224,45 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati
func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
key := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
blobURL := containerURL.NewBlockBlobURL(key)
attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
props, err := blobClient.GetProperties(context.Background(), nil)
if err != nil {
return nil, err
}
return &filer_pb.RemoteEntry{
RemoteMtime: attr.LastModified().Unix(),
RemoteSize: attr.ContentLength(),
RemoteETag: string(attr.ETag()),
remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
}, nil
}
if props.LastModified != nil {
remoteEntry.RemoteMtime = props.LastModified.Unix()
}
if props.ContentLength != nil {
remoteEntry.RemoteSize = *props.ContentLength
}
if props.ETag != nil {
remoteEntry.RemoteETag = string(*props.ETag)
}
return remoteEntry, nil
}
func toMetadata(attributes map[string][]byte) map[string]string {
metadata := make(map[string]string)
func toMetadata(attributes map[string][]byte) map[string]*string {
metadata := make(map[string]*string)
for k, v := range attributes {
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
metadata[k[len(s3_constants.AmzUserMetaPrefix):]] = string(v)
}
// S3 stores metadata keys in lowercase; normalize for consistency.
key := strings.ToLower(k[len(s3_constants.AmzUserMetaPrefix):])
// Sanitize key to prevent collisions and ensure Azure compliance
key = sanitizeMetadataKey(key)
val := string(v)
metadata[key] = &val
}
parsed_metadata := make(map[string]string)
for k, v := range metadata {
parsed_metadata[strings.Replace(k, "-", "_", -1)] = v
}
return parsed_metadata
return metadata
}
func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
@ -201,54 +272,68 @@ func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStor
metadata := toMetadata(newEntry.Extended)
key := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key)
_, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
_, err = blobClient.SetMetadata(context.Background(), metadata, nil)
return
}
func (az *azureRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
key := loc.Path[1:]
containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key)
_, err = blobClient.Delete(context.Background(), &blob.DeleteOptions{
DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
})
if err != nil {
// Make delete idempotent - don't return error if blob doesn't exist
if bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil
}
return fmt.Errorf("azure delete %s%s: %w", loc.Bucket, loc.Path, err)
}
return
}
func (az *azureRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
ctx := context.Background()
for containerMarker := (azblob.Marker{}); containerMarker.NotDone(); {
listContainer, err := az.serviceURL.ListContainersSegment(ctx, containerMarker, azblob.ListContainersSegmentOptions{})
if err == nil {
for _, v := range listContainer.ContainerItems {
buckets = append(buckets, &remote_storage.Bucket{
Name: v.Name,
CreatedAt: v.Properties.LastModified,
})
}
} else {
pager := az.client.NewListContainersPager(nil)
for pager.More() {
resp, err := pager.NextPage(context.Background())
if err != nil {
return buckets, err
}
containerMarker = listContainer.NextMarker
for _, containerItem := range resp.ContainerItems {
if containerItem.Name != nil {
bucket := &remote_storage.Bucket{
Name: *containerItem.Name,
}
if containerItem.Properties != nil && containerItem.Properties.LastModified != nil {
bucket.CreatedAt = *containerItem.Properties.LastModified
}
buckets = append(buckets, bucket)
}
}
}
return
}
func (az *azureRemoteStorageClient) CreateBucket(name string) (err error) {
containerURL := az.serviceURL.NewContainerURL(name)
if _, err = containerURL.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessNone); err != nil {
return fmt.Errorf("create bucket %s: %v", name, err)
containerClient := az.client.ServiceClient().NewContainerClient(name)
_, err = containerClient.Create(context.Background(), nil)
if err != nil {
return fmt.Errorf("create bucket %s: %w", name, err)
}
return
}
func (az *azureRemoteStorageClient) DeleteBucket(name string) (err error) {
containerURL := az.serviceURL.NewContainerURL(name)
if _, err = containerURL.Delete(context.Background(), azblob.ContainerAccessConditions{}); err != nil {
return fmt.Errorf("delete bucket %s: %v", name, err)
containerClient := az.client.ServiceClient().NewContainerClient(name)
_, err = containerClient.Delete(context.Background(), nil)
if err != nil {
return fmt.Errorf("delete bucket %s: %w", name, err)
}
return
}

377
weed/remote_storage/azure/azure_storage_client_test.go

@ -0,0 +1,377 @@
package azure
import (
"bytes"
"fmt"
"os"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// TestAzureStorageClientBasic tests basic Azure storage client operations
func TestAzureStorageClientBasic(t *testing.T) {
// Skip if credentials not available
accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
testContainer := os.Getenv("AZURE_TEST_CONTAINER")
if accountName == "" || accountKey == "" {
t.Skip("Skipping Azure storage test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
}
if testContainer == "" {
testContainer = "seaweedfs-test"
}
// Create client
maker := azureRemoteStorageMaker{}
conf := &remote_pb.RemoteConf{
Name: "test-azure",
AzureAccountName: accountName,
AzureAccountKey: accountKey,
}
client, err := maker.Make(conf)
if err != nil {
t.Fatalf("Failed to create Azure client: %v", err)
}
azClient := client.(*azureRemoteStorageClient)
// Test 1: Create bucket/container
t.Run("CreateBucket", func(t *testing.T) {
err := azClient.CreateBucket(testContainer)
// Ignore error if bucket already exists
if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
t.Fatalf("Failed to create bucket: %v", err)
}
})
// Test 2: List buckets
t.Run("ListBuckets", func(t *testing.T) {
buckets, err := azClient.ListBuckets()
if err != nil {
t.Fatalf("Failed to list buckets: %v", err)
}
if len(buckets) == 0 {
t.Log("No buckets found (might be expected)")
} else {
t.Logf("Found %d buckets", len(buckets))
}
})
// Test 3: Write file
testContent := []byte("Hello from SeaweedFS Azure SDK migration test!")
testKey := fmt.Sprintf("/test-file-%d.txt", time.Now().Unix())
loc := &remote_pb.RemoteStorageLocation{
Name: "test-azure",
Bucket: testContainer,
Path: testKey,
}
t.Run("WriteFile", func(t *testing.T) {
entry := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Mime: "text/plain",
},
Extended: map[string][]byte{
"x-amz-meta-test-key": []byte("test-value"),
},
}
reader := bytes.NewReader(testContent)
remoteEntry, err := azClient.WriteFile(loc, entry, reader)
if err != nil {
t.Fatalf("Failed to write file: %v", err)
}
if remoteEntry == nil {
t.Fatal("Remote entry is nil")
}
if remoteEntry.RemoteSize != int64(len(testContent)) {
t.Errorf("Expected size %d, got %d", len(testContent), remoteEntry.RemoteSize)
}
})
// Test 4: Read file
t.Run("ReadFile", func(t *testing.T) {
data, err := azClient.ReadFile(loc, 0, int64(len(testContent)))
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if !bytes.Equal(data, testContent) {
t.Errorf("Content mismatch. Expected: %s, Got: %s", testContent, data)
}
})
// Test 5: Read partial file
t.Run("ReadPartialFile", func(t *testing.T) {
data, err := azClient.ReadFile(loc, 0, 5)
if err != nil {
t.Fatalf("Failed to read partial file: %v", err)
}
expected := testContent[:5]
if !bytes.Equal(data, expected) {
t.Errorf("Content mismatch. Expected: %s, Got: %s", expected, data)
}
})
// Test 6: Update metadata
t.Run("UpdateMetadata", func(t *testing.T) {
oldEntry := &filer_pb.Entry{
Extended: map[string][]byte{
"x-amz-meta-test-key": []byte("test-value"),
},
}
newEntry := &filer_pb.Entry{
Extended: map[string][]byte{
"x-amz-meta-test-key": []byte("test-value"),
"x-amz-meta-new-key": []byte("new-value"),
},
}
err := azClient.UpdateFileMetadata(loc, oldEntry, newEntry)
if err != nil {
t.Fatalf("Failed to update metadata: %v", err)
}
})
// Test 7: Traverse (list objects)
t.Run("Traverse", func(t *testing.T) {
foundFile := false
err := azClient.Traverse(loc, func(dir string, name string, isDir bool, remoteEntry *filer_pb.RemoteEntry) error {
if !isDir && name == testKey[1:] { // Remove leading slash
foundFile = true
}
return nil
})
if err != nil {
t.Fatalf("Failed to traverse: %v", err)
}
if !foundFile {
t.Log("Test file not found in traverse (might be expected due to path matching)")
}
})
// Test 8: Delete file
t.Run("DeleteFile", func(t *testing.T) {
err := azClient.DeleteFile(loc)
if err != nil {
t.Fatalf("Failed to delete file: %v", err)
}
})
// Test 9: Verify file deleted (should fail)
t.Run("VerifyDeleted", func(t *testing.T) {
_, err := azClient.ReadFile(loc, 0, 10)
if !bloberror.HasCode(err, bloberror.BlobNotFound) {
t.Errorf("Expected BlobNotFound error, but got: %v", err)
}
})
// Clean up: Try to delete the test container
// Comment out if you want to keep the container
/*
t.Run("DeleteBucket", func(t *testing.T) {
err := azClient.DeleteBucket(testContainer)
if err != nil {
t.Logf("Warning: Failed to delete bucket: %v", err)
}
})
*/
}
// TestToMetadata tests the metadata conversion function
func TestToMetadata(t *testing.T) {
tests := []struct {
name string
input map[string][]byte
expected map[string]*string
}{
{
name: "basic metadata",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "key1": []byte("value1"),
s3_constants.AmzUserMetaPrefix + "key2": []byte("value2"),
},
expected: map[string]*string{
"key1": stringPtr("value1"),
"key2": stringPtr("value2"),
},
},
{
name: "metadata with dashes",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "content-type": []byte("text/plain"),
},
expected: map[string]*string{
"content_2d_type": stringPtr("text/plain"), // dash (0x2d) -> _2d_
},
},
{
name: "non-metadata keys ignored",
input: map[string][]byte{
"some-other-key": []byte("ignored"),
s3_constants.AmzUserMetaPrefix + "included": []byte("included"),
},
expected: map[string]*string{
"included": stringPtr("included"),
},
},
{
name: "keys starting with digits",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "123key": []byte("value1"),
s3_constants.AmzUserMetaPrefix + "456-test": []byte("value2"),
s3_constants.AmzUserMetaPrefix + "789": []byte("value3"),
},
expected: map[string]*string{
"_123key": stringPtr("value1"), // starts with digit -> prefix _
"_456_2d_test": stringPtr("value2"), // starts with digit AND has dash
"_789": stringPtr("value3"),
},
},
{
name: "uppercase and mixed case keys",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "My-Key": []byte("value1"),
s3_constants.AmzUserMetaPrefix + "UPPERCASE": []byte("value2"),
s3_constants.AmzUserMetaPrefix + "MiXeD-CaSe": []byte("value3"),
},
expected: map[string]*string{
"my_2d_key": stringPtr("value1"), // lowercase + dash -> _2d_
"uppercase": stringPtr("value2"),
"mixed_2d_case": stringPtr("value3"),
},
},
{
name: "keys with invalid characters",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "my.key": []byte("value1"),
s3_constants.AmzUserMetaPrefix + "key+plus": []byte("value2"),
s3_constants.AmzUserMetaPrefix + "key@symbol": []byte("value3"),
s3_constants.AmzUserMetaPrefix + "key-with.": []byte("value4"),
s3_constants.AmzUserMetaPrefix + "key/slash": []byte("value5"),
},
expected: map[string]*string{
"my_2e_key": stringPtr("value1"), // dot (0x2e) -> _2e_
"key_2b_plus": stringPtr("value2"), // plus (0x2b) -> _2b_
"key_40_symbol": stringPtr("value3"), // @ (0x40) -> _40_
"key_2d_with_2e_": stringPtr("value4"), // dash and dot
"key_2f_slash": stringPtr("value5"), // slash (0x2f) -> _2f_
},
},
{
name: "collision prevention",
input: map[string][]byte{
s3_constants.AmzUserMetaPrefix + "my-key": []byte("value1"),
s3_constants.AmzUserMetaPrefix + "my.key": []byte("value2"),
s3_constants.AmzUserMetaPrefix + "my_key": []byte("value3"),
},
expected: map[string]*string{
"my_2d_key": stringPtr("value1"), // dash (0x2d)
"my_2e_key": stringPtr("value2"), // dot (0x2e)
"my_key": stringPtr("value3"), // underscore is valid, no encoding
},
},
{
name: "empty input",
input: map[string][]byte{},
expected: map[string]*string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := toMetadata(tt.input)
if len(result) != len(tt.expected) {
t.Errorf("Expected %d keys, got %d", len(tt.expected), len(result))
}
for key, expectedVal := range tt.expected {
if resultVal, ok := result[key]; !ok {
t.Errorf("Expected key %s not found", key)
} else if resultVal == nil || expectedVal == nil {
if resultVal != expectedVal {
t.Errorf("For key %s: expected %v, got %v", key, expectedVal, resultVal)
}
} else if *resultVal != *expectedVal {
t.Errorf("For key %s: expected %s, got %s", key, *expectedVal, *resultVal)
}
}
})
}
}
func contains(s, substr string) bool {
return bytes.Contains([]byte(s), []byte(substr))
}
func stringPtr(s string) *string {
return &s
}
// Benchmark tests
func BenchmarkToMetadata(b *testing.B) {
input := map[string][]byte{
"x-amz-meta-key1": []byte("value1"),
"x-amz-meta-key2": []byte("value2"),
"x-amz-meta-content-type": []byte("text/plain"),
"other-key": []byte("ignored"),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
toMetadata(input)
}
}
// Test that the maker implements the interface
func TestAzureRemoteStorageMaker(t *testing.T) {
maker := azureRemoteStorageMaker{}
if !maker.HasBucket() {
t.Error("Expected HasBucket() to return true")
}
// Test with missing credentials
conf := &remote_pb.RemoteConf{
Name: "test",
}
_, err := maker.Make(conf)
if err == nil {
t.Error("Expected error with missing credentials")
}
}
// Test error cases
func TestAzureStorageClientErrors(t *testing.T) {
// Test with invalid credentials
maker := azureRemoteStorageMaker{}
conf := &remote_pb.RemoteConf{
Name: "test",
AzureAccountName: "invalid",
AzureAccountKey: "aW52YWxpZGtleQ==", // base64 encoded "invalidkey"
}
client, err := maker.Make(conf)
if err != nil {
t.Skip("Invalid credentials correctly rejected at client creation")
}
// If client creation succeeded, operations should fail
azClient := client.(*azureRemoteStorageClient)
loc := &remote_pb.RemoteStorageLocation{
Name: "test",
Bucket: "nonexistent",
Path: "/test.txt",
}
// These operations should fail with invalid credentials
_, err = azClient.ReadFile(loc, 0, 10)
if err == nil {
t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)")
}
}

92
weed/replication/sink/azuresink/azure_sink.go

@ -3,24 +3,31 @@ package azuresink
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"net/http"
"net/url"
"strings"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/util"
)
type AzureSink struct {
containerURL azblob.ContainerURL
client *azblob.Client
container string
dir string
filerSource *source.FilerSource
@ -61,20 +68,28 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
g.container = container
g.dir = dir
// Use your Storage account's name and key to create a credential object.
// Create credential and client
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
glog.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, err)
}
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
MaxRetries: 10, // Increased from default 3 for replication sink resiliency
TryTimeout: time.Minute,
RetryDelay: 2 * time.Second,
MaxRetryDelay: time.Minute,
},
},
})
if err != nil {
return fmt.Errorf("failed to create Azure client: %w", err)
}
// Create a request pipeline that is used to process HTTP(S) requests and responses.
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
// Create an ServiceURL object that wraps the service URL and a request pipeline.
u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
serviceURL := azblob.NewServiceURL(*u, p)
g.containerURL = serviceURL.NewContainerURL(g.container)
g.client = client
return nil
}
@ -87,13 +102,19 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
key = key + "/"
}
if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(),
azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
_, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
})
if err != nil {
// Make delete idempotent - don't return error if blob doesn't exist
if bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil
}
return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err)
}
return nil
}
func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
@ -107,26 +128,38 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
// Create a URL that references a to-be-created blob in your
// Azure Storage account's container.
appendBlobURL := g.containerURL.NewAppendBlobURL(key)
// Create append blob client
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
accessCondition := azblob.BlobAccessConditions{}
// Create blob with access conditions
accessConditions := &blob.AccessConditions{}
if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
accessCondition.ModifiedAccessConditions.IfUnmodifiedSince = time.Unix(entry.Attributes.Mtime, 0)
modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
IfUnmodifiedSince: &modifiedTime,
}
}
res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
if res != nil && res.StatusCode() == http.StatusPreconditionFailed {
glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err)
_, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
AccessConditions: accessConditions,
})
if err != nil {
if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
// Blob already exists, which is fine for an append blob - we can append to it
} else {
// Check if this is a precondition failed error (HTTP 412)
var respErr *azcore.ResponseError
if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
return nil
}
if err != nil {
return err
return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
}
}
writeFunc := func(data []byte) error {
_, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{})
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
return writeErr
}
@ -139,7 +172,6 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
}
return nil
}
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {

355
weed/replication/sink/azuresink/azure_sink_test.go

@ -0,0 +1,355 @@
package azuresink
import (
"os"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// MockConfiguration for testing
type mockConfiguration struct {
values map[string]interface{}
}
func newMockConfiguration() *mockConfiguration {
return &mockConfiguration{
values: make(map[string]interface{}),
}
}
func (m *mockConfiguration) GetString(key string) string {
if v, ok := m.values[key]; ok {
return v.(string)
}
return ""
}
func (m *mockConfiguration) GetBool(key string) bool {
if v, ok := m.values[key]; ok {
return v.(bool)
}
return false
}
func (m *mockConfiguration) GetInt(key string) int {
if v, ok := m.values[key]; ok {
return v.(int)
}
return 0
}
func (m *mockConfiguration) GetInt64(key string) int64 {
if v, ok := m.values[key]; ok {
return v.(int64)
}
return 0
}
func (m *mockConfiguration) GetFloat64(key string) float64 {
if v, ok := m.values[key]; ok {
return v.(float64)
}
return 0.0
}
func (m *mockConfiguration) GetStringSlice(key string) []string {
if v, ok := m.values[key]; ok {
return v.([]string)
}
return nil
}
func (m *mockConfiguration) SetDefault(key string, value interface{}) {
if _, exists := m.values[key]; !exists {
m.values[key] = value
}
}
// Test the AzureSink interface implementation
func TestAzureSinkInterface(t *testing.T) {
sink := &AzureSink{}
if sink.GetName() != "azure" {
t.Errorf("Expected name 'azure', got '%s'", sink.GetName())
}
// Test directory setting
sink.dir = "/test/dir"
if sink.GetSinkToDirectory() != "/test/dir" {
t.Errorf("Expected directory '/test/dir', got '%s'", sink.GetSinkToDirectory())
}
// Test incremental setting
sink.isIncremental = true
if !sink.IsIncremental() {
t.Error("Expected isIncremental to be true")
}
}
// Test Azure sink initialization
func TestAzureSinkInitialization(t *testing.T) {
accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
testContainer := os.Getenv("AZURE_TEST_CONTAINER")
if accountName == "" || accountKey == "" {
t.Skip("Skipping Azure sink test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
}
if testContainer == "" {
testContainer = "seaweedfs-test"
}
sink := &AzureSink{}
err := sink.initialize(accountName, accountKey, testContainer, "/test")
if err != nil {
t.Fatalf("Failed to initialize Azure sink: %v", err)
}
if sink.container != testContainer {
t.Errorf("Expected container '%s', got '%s'", testContainer, sink.container)
}
if sink.dir != "/test" {
t.Errorf("Expected dir '/test', got '%s'", sink.dir)
}
if sink.client == nil {
t.Error("Expected client to be initialized")
}
}
// Test configuration-based initialization
func TestAzureSinkInitializeFromConfig(t *testing.T) {
accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
testContainer := os.Getenv("AZURE_TEST_CONTAINER")
if accountName == "" || accountKey == "" {
t.Skip("Skipping Azure sink config test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
}
if testContainer == "" {
testContainer = "seaweedfs-test"
}
config := newMockConfiguration()
config.values["azure.account_name"] = accountName
config.values["azure.account_key"] = accountKey
config.values["azure.container"] = testContainer
config.values["azure.directory"] = "/test"
config.values["azure.is_incremental"] = true
sink := &AzureSink{}
err := sink.Initialize(config, "azure.")
if err != nil {
t.Fatalf("Failed to initialize from config: %v", err)
}
if !sink.IsIncremental() {
t.Error("Expected incremental to be true")
}
}
// Test cleanKey function
func TestCleanKey(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"/test/file.txt", "test/file.txt"},
{"test/file.txt", "test/file.txt"},
{"/", ""},
{"", ""},
{"/a/b/c", "a/b/c"},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
result := cleanKey(tt.input)
if result != tt.expected {
t.Errorf("cleanKey(%q) = %q, want %q", tt.input, result, tt.expected)
}
})
}
}
// Test entry operations (requires valid credentials)
func TestAzureSinkEntryOperations(t *testing.T) {
accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
testContainer := os.Getenv("AZURE_TEST_CONTAINER")
if accountName == "" || accountKey == "" {
t.Skip("Skipping Azure sink entry test: credentials not set")
}
if testContainer == "" {
testContainer = "seaweedfs-test"
}
sink := &AzureSink{}
err := sink.initialize(accountName, accountKey, testContainer, "/test")
if err != nil {
t.Fatalf("Failed to initialize: %v", err)
}
// Test CreateEntry with directory (should be no-op)
t.Run("CreateDirectory", func(t *testing.T) {
entry := &filer_pb.Entry{
IsDirectory: true,
}
err := sink.CreateEntry("/test/dir", entry, nil)
if err != nil {
t.Errorf("CreateEntry for directory should not error: %v", err)
}
})
// Test CreateEntry with file
testKey := "/test-sink-file-" + time.Now().Format("20060102-150405") + ".txt"
t.Run("CreateFile", func(t *testing.T) {
entry := &filer_pb.Entry{
IsDirectory: false,
Content: []byte("Test content for Azure sink"),
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
},
}
err := sink.CreateEntry(testKey, entry, nil)
if err != nil {
t.Fatalf("Failed to create entry: %v", err)
}
})
// Test UpdateEntry
t.Run("UpdateEntry", func(t *testing.T) {
oldEntry := &filer_pb.Entry{
Content: []byte("Old content"),
}
newEntry := &filer_pb.Entry{
Content: []byte("New content for update test"),
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
},
}
found, err := sink.UpdateEntry(testKey, oldEntry, "/test", newEntry, false, nil)
if err != nil {
t.Fatalf("Failed to update entry: %v", err)
}
if !found {
t.Error("Expected found to be true")
}
})
// Test DeleteEntry
t.Run("DeleteFile", func(t *testing.T) {
err := sink.DeleteEntry(testKey, false, false, nil)
if err != nil {
t.Fatalf("Failed to delete entry: %v", err)
}
})
// Test DeleteEntry with directory marker
testDirKey := "/test-dir-" + time.Now().Format("20060102-150405")
t.Run("DeleteDirectory", func(t *testing.T) {
// First create a directory marker
entry := &filer_pb.Entry{
IsDirectory: false,
Content: []byte(""),
}
err := sink.CreateEntry(testDirKey+"/", entry, nil)
if err != nil {
t.Logf("Warning: Failed to create directory marker: %v", err)
}
// Then delete it
err = sink.DeleteEntry(testDirKey, true, false, nil)
if err != nil {
t.Logf("Warning: Failed to delete directory: %v", err)
}
})
}
// Test CreateEntry with precondition (IfUnmodifiedSince)
func TestAzureSinkPrecondition(t *testing.T) {
accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
testContainer := os.Getenv("AZURE_TEST_CONTAINER")
if accountName == "" || accountKey == "" {
t.Skip("Skipping Azure sink precondition test: credentials not set")
}
if testContainer == "" {
testContainer = "seaweedfs-test"
}
sink := &AzureSink{}
err := sink.initialize(accountName, accountKey, testContainer, "/test")
if err != nil {
t.Fatalf("Failed to initialize: %v", err)
}
testKey := "/test-precondition-" + time.Now().Format("20060102-150405") + ".txt"
// Create initial entry
entry := &filer_pb.Entry{
Content: []byte("Initial content"),
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
},
}
err = sink.CreateEntry(testKey, entry, nil)
if err != nil {
t.Fatalf("Failed to create initial entry: %v", err)
}
// Try to create again with old mtime (should be skipped due to precondition)
oldEntry := &filer_pb.Entry{
Content: []byte("Should not overwrite"),
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Old timestamp
},
}
err = sink.CreateEntry(testKey, oldEntry, nil)
// Should either succeed (skip) or fail with precondition error
if err != nil {
t.Logf("Create with old mtime: %v (expected)", err)
}
// Clean up
sink.DeleteEntry(testKey, false, false, nil)
}
// Benchmark tests
func BenchmarkCleanKey(b *testing.B) {
keys := []string{
"/simple/path.txt",
"no/leading/slash.txt",
"/",
"/complex/path/with/many/segments/file.txt",
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cleanKey(keys[i%len(keys)])
}
}
// Test error handling with invalid credentials
func TestAzureSinkInvalidCredentials(t *testing.T) {
sink := &AzureSink{}
err := sink.initialize("invalid-account", "aW52YWxpZGtleQ==", "test-container", "/test")
if err != nil {
t.Skip("Invalid credentials correctly rejected at initialization")
}
// If initialization succeeded, operations should fail
entry := &filer_pb.Entry{
Content: []byte("test"),
}
err = sink.CreateEntry("/test.txt", entry, nil)
if err == nil {
t.Log("Expected error with invalid credentials, but got none (might be cached)")
}
}

2
weed/s3api/policy_engine/types.go

@ -407,8 +407,6 @@ func (cs *CompiledStatement) EvaluateStatement(args *PolicyEvaluationArgs) bool
return false
}
return true
}

6
weed/s3api/s3api_object_handlers_put.go

@ -21,6 +21,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
// Object lock validation errors
@ -380,6 +381,11 @@ func setEtag(w http.ResponseWriter, etag string) {
func filerErrorToS3Error(errString string) s3err.ErrorCode {
switch {
case errString == constants.ErrMsgBadDigest:
return s3err.ErrBadDigest
case strings.Contains(errString, "context canceled") || strings.Contains(errString, "code = Canceled"):
// Client canceled the request, return client error not server error
return s3err.ErrInvalidRequest
case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"):
return s3err.ErrExistingObjectIsDirectory
case strings.HasSuffix(errString, "is a file"):

56
weed/s3api/s3api_object_handlers_put_test.go

@ -0,0 +1,56 @@
package s3api
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
func TestFilerErrorToS3Error(t *testing.T) {
tests := []struct {
name string
errString string
expectedErr s3err.ErrorCode
}{
{
name: "MD5 mismatch error",
errString: constants.ErrMsgBadDigest,
expectedErr: s3err.ErrBadDigest,
},
{
name: "Context canceled error",
errString: "rpc error: code = Canceled desc = context canceled",
expectedErr: s3err.ErrInvalidRequest,
},
{
name: "Context canceled error (simple)",
errString: "context canceled",
expectedErr: s3err.ErrInvalidRequest,
},
{
name: "Directory exists error",
errString: "existing /path/to/file is a directory",
expectedErr: s3err.ErrExistingObjectIsDirectory,
},
{
name: "File exists error",
errString: "/path/to/file is a file",
expectedErr: s3err.ErrExistingObjectIsFile,
},
{
name: "Unknown error",
errString: "some random error",
expectedErr: s3err.ErrInternalError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := filerErrorToS3Error(tt.errString)
if result != tt.expectedErr {
t.Errorf("filerErrorToS3Error(%q) = %v, want %v", tt.errString, result, tt.expectedErr)
}
})
}
}

4
weed/s3api/s3err/s3-error.go

@ -1,5 +1,7 @@
package s3err
import "github.com/seaweedfs/seaweedfs/weed/util/constants"
/*
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
* Copyright 2015-2017 MinIO, Inc.
@ -21,7 +23,7 @@ package s3err
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
var s3ErrorResponseMap = map[string]string{
"AccessDenied": "Access Denied.",
"BadDigest": "The Content-Md5 you specified did not match what we received.",
"BadDigest": constants.ErrMsgBadDigest,
"EntityTooSmall": "Your proposed upload is smaller than the minimum allowed object size.",
"EntityTooLarge": "Your proposed upload exceeds the maximum allowed object size.",
"IncompleteBody": "You did not provide the number of bytes specified by the Content-Length HTTP header.",

8
weed/s3api/s3err/s3api_errors.go

@ -4,6 +4,8 @@ import (
"encoding/xml"
"fmt"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
// APIError structure
@ -59,6 +61,7 @@ const (
ErrInvalidBucketName
ErrInvalidBucketState
ErrInvalidDigest
ErrBadDigest
ErrInvalidMaxKeys
ErrInvalidMaxUploads
ErrInvalidMaxParts
@ -187,6 +190,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "The Content-Md5 you specified is not valid.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrBadDigest: {
Code: "BadDigest",
Description: constants.ErrMsgBadDigest,
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidMaxUploads: {
Code: "InvalidArgument",
Description: "Argument max-uploads must be an integer between 0 and 2147483647",

5
weed/server/filer_server_handlers_write.go

@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
@ -168,7 +169,7 @@ func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.
return
} else if wormEnforced {
// you cannot move a worm file or directory
err = fmt.Errorf("cannot move write-once entry from '%s' to '%s': operation not permitted", src, dst)
err = fmt.Errorf("cannot move write-once entry from '%s' to '%s': %s", src, dst, constants.ErrMsgOperationNotPermitted)
writeJsonError(w, r, http.StatusForbidden, err)
return
}
@ -228,7 +229,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if wormEnforced {
writeJsonError(w, r, http.StatusForbidden, errors.New("operation not permitted"))
writeJsonError(w, r, http.StatusForbidden, errors.New(constants.ErrMsgOperationNotPermitted))
return
}

19
weed/server/filer_server_handlers_write_autochunk.go

@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
@ -50,13 +51,17 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
if err != nil {
if err.Error() == "operation not permitted" {
errStr := err.Error()
switch {
case errStr == constants.ErrMsgOperationNotPermitted:
writeJsonError(w, r, http.StatusForbidden, err)
} else if strings.HasPrefix(err.Error(), "read input:") || err.Error() == io.ErrUnexpectedEOF.Error() {
case strings.HasPrefix(errStr, "read input:") || errStr == io.ErrUnexpectedEOF.Error():
writeJsonError(w, r, util.HttpStatusCancelled, err)
} else if strings.HasSuffix(err.Error(), "is a file") || strings.HasSuffix(err.Error(), "already exists") {
case strings.HasSuffix(errStr, "is a file") || strings.HasSuffix(errStr, "already exists"):
writeJsonError(w, r, http.StatusConflict, err)
} else {
case errStr == constants.ErrMsgBadDigest:
writeJsonError(w, r, http.StatusBadRequest, err)
default:
writeJsonError(w, r, http.StatusInternalServerError, err)
}
} else if reply != nil {
@ -110,7 +115,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
return nil, nil, errors.New(constants.ErrMsgBadDigest)
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
@ -142,7 +147,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
return nil, nil, errors.New(constants.ErrMsgBadDigest)
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
@ -171,7 +176,7 @@ func (fs *FilerServer) checkPermissions(ctx context.Context, r *http.Request, fi
return err
} else if enforced {
// you cannot change a worm file
return errors.New("operation not permitted")
return errors.New(constants.ErrMsgOperationNotPermitted)
}
return nil

28
weed/shell/command_volume_check_disk.go

@ -183,11 +183,34 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
aHasChanges, bHasChanges := true, true
for aHasChanges || bHasChanges {
const maxIterations = 5
iteration := 0
for (aHasChanges || bHasChanges) && iteration < maxIterations {
iteration++
if verbose {
fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id)
}
prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
return err
}
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration)
return fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
if iteration >= maxIterations && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
return nil
}
@ -307,11 +330,10 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
for _, deleteResult := range deleteResults {
if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
hasChanges = true
return
}
}
}
return
return hasChanges, nil
}
func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {

7
weed/util/constants/filer.go

@ -0,0 +1,7 @@
package constants
// Filer error messages
const (
ErrMsgOperationNotPermitted = "operation not permitted"
ErrMsgBadDigest = "The Content-Md5 you specified did not match what we received."
)
Loading…
Cancel
Save