diff --git a/go.mod b/go.mod index b0dd04ae9..ff1a92f7e 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,6 @@ require ( cloud.google.com/go v0.121.6 // indirect cloud.google.com/go/pubsub v1.50.1 cloud.google.com/go/storage v1.56.2 - github.com/Azure/azure-pipeline-go v0.2.3 - github.com/Azure/azure-storage-blob-go v0.15.0 github.com/Shopify/sarama v1.38.1 github.com/aws/aws-sdk-go v1.55.8 github.com/beorn7/perks v1.0.1 // indirect @@ -57,7 +55,6 @@ require ( github.com/kurin/blazer v0.5.3 github.com/linxGnu/grocksdb v1.10.2 github.com/mailru/easyjson v0.7.7 // indirect - github.com/mattn/go-ieproxy v0.0.11 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -232,7 +229,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 // indirect github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0 // indirect diff --git a/go.sum b/go.sum index ed5920fa8..23c4743d8 100644 --- a/go.sum +++ b/go.sum @@ -541,8 +541,6 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= -github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= -github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 h1:5YTBM8QDVIBN3sxBil89WfdAAqDZbyJTgh688DSxX5w= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1/go.mod h1:YD5h/ldMsG0XiIw7PdyNhLxaM317eFh5yNLccNfGdyw= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 h1:wL5IEG5zb7BVv1Kv0Xm92orq+5hB5Nipn3B5tn4Rqfk= @@ -561,20 +559,7 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 h1:FwladfywkNirM+FZY github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2/go.mod h1:vv5Ad0RrIoT1lJFdWBZwt4mB1+j+V8DUroixmKDTCdk= github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 h1:l3SabZmNuXCMCbQUIeR4W6/N4j8SeH/lwX+a6leZhHo= github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2/go.mod h1:k+mEZ4f1pVqZTRqtSDW2AhZ/3wT5qLpsUA75C/k7dtE= -github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk= -github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= -github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= -github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= -github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q= -github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= -github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= -github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= -github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= -github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= -github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= -github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= -github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= @@ -929,8 +914,6 @@ github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= -github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -1398,9 +1381,6 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= -github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= -github.com/mattn/go-ieproxy v0.0.11 h1:MQ/5BuGSgDAHZOJe6YY80IF2UVCfGkwfo6AeD7HtHYo= -github.com/mattn/go-ieproxy v0.0.11/go.mod h1:/NsJd+kxZBmjMc5hrJCKMbP57B84rvq9BiDRbtO9AS0= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= @@ -1920,8 +1900,6 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -2023,7 +2001,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -2048,7 +2025,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -2152,7 +2128,6 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/test/s3/multipart/aws_upload.go b/test/s3/multipart/aws_upload.go index 0553bd403..fbb1cb879 100644 --- a/test/s3/multipart/aws_upload.go +++ b/test/s3/multipart/aws_upload.go @@ -108,7 +108,6 @@ func main() { fmt.Printf("part %d: %v\n", i, part) } - completeResponse, err := completeMultipartUpload(svc, resp, completedParts) if err != nil { fmt.Println(err.Error()) diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 52368f8cd..a9bb1fe16 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -26,15 +26,15 @@ var ( This is to change replication factor in .dat file header. Need to shut down the volume servers that has those volumes. -1. fix the .dat file in place - // just see the replication setting - go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads - Current Volume Replication: 000 - // fix the replication setting - go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads -replication 001 - Current Volume Replication: 000 - Changing to: 001 - Done. + 1. fix the .dat file in place + // just see the replication setting + go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads + Current Volume Replication: 000 + // fix the replication setting + go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads -replication 001 + Current Volume Replication: 000 + Changing to: 001 + Done. 2. copy the fixed .dat and related .idx files to some remote server 3. restart volume servers or start new volume servers. @@ -42,7 +42,7 @@ that has those volumes. func main() { flag.Parse() util_http.NewGlobalHttpClient() - + fileName := strconv.Itoa(*fixVolumeId) if *fixVolumeCollection != "" { fileName = *fixVolumeCollection + "_" + fileName diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index e289fefe8..b4ceeb58c 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -19,8 +19,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "google.golang.org/grpc" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "google.golang.org/grpc" ) var ( @@ -31,18 +31,18 @@ var ( ) /* - Diff the volume's files across multiple volume servers. - diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 +Diff the volume's files across multiple volume servers. +diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5 - Example Output: - reference 127.0.0.1:8081 - fileId volumeServer message - 5,01617c3f61 127.0.0.1:8080 wrongSize +Example Output: +reference 127.0.0.1:8081 +fileId volumeServer message +5,01617c3f61 127.0.0.1:8080 wrongSize */ func main() { flag.Parse() util_http.InitGlobalHttpClient() - + util.LoadSecurityConfiguration() grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 164b5b238..5f1ea1375 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -28,12 +28,12 @@ This is to resolve an one-time issue that caused inconsistency with .dat and .id In this case, the .dat file contains all data, but some deletion caused incorrect offset. The .idx has all correct offsets. -1. fix the .dat file, a new .dat_fixed file will be generated. - go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads -2. move the original .dat and .idx files to some backup folder, and rename .dat_fixed to .dat file + 1. fix the .dat file, a new .dat_fixed file will be generated. + go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads + 2. move the original .dat and .idx files to some backup folder, and rename .dat_fixed to .dat file mv 9.dat_fixed 9.dat -3. fix the .idx file with the "weed fix" - weed fix -volumeId=9 -dir=/Users/chrislu/Downloads + 3. fix the .idx file with the "weed fix" + weed fix -volumeId=9 -dir=/Users/chrislu/Downloads */ func main() { flag.Parse() diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go index 1e591dff2..46e4cbf06 100644 --- a/unmaintained/s3/presigned_put/presigned_put.go +++ b/unmaintained/s3/presigned_put/presigned_put.go @@ -7,22 +7,25 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "net/http" "strings" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) // Downloads an item from an S3 Bucket in the region configured in the shared config // or AWS_REGION environment variable. // // Usage: -// go run presigned_put.go +// +// go run presigned_put.go +// // For this exampl to work, the domainName is needd -// weed s3 -domainName=localhost +// +// weed s3 -domainName=localhost func main() { util_http.InitGlobalHttpClient() - + h := md5.New() content := strings.NewReader(stringContent) content.WriteTo(h) diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go index cfdb36815..b148e4a4a 100644 --- a/unmaintained/stream_read_volume/stream_read_volume.go +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -12,8 +12,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" - "google.golang.org/grpc" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "google.golang.org/grpc" ) var ( diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go index 6dc703dbc..a98da1d01 100644 --- a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go +++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go @@ -4,6 +4,7 @@ import ( "bytes" "flag" "fmt" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "io" "log" "math/rand" @@ -13,7 +14,6 @@ import ( "strings" "sync" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go index 1cdcad0b3..1c3befe3d 100644 --- a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go +++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go @@ -4,6 +4,7 @@ import ( "bytes" "flag" "fmt" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "io" "log" "math/rand" @@ -14,7 +15,6 @@ import ( "strings" "sync" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index a75a095d4..03f728ad0 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -1,18 +1,18 @@ package main import ( + "context" "flag" "github.com/seaweedfs/seaweedfs/weed/pb" "log" "time" - "context" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/storage/needle" util2 "github.com/seaweedfs/seaweedfs/weed/util" - "golang.org/x/tools/godoc/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "golang.org/x/tools/godoc/util" ) var ( diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go index 5e7bc019e..f9322be42 100644 --- a/weed/filer/redis2/redis_store.go +++ b/weed/filer/redis2/redis_store.go @@ -61,14 +61,14 @@ func (store *Redis2Store) initialize(hostPort string, password string, database tlsConfig := &tls.Config{ Certificates: []tls.Certificate{clientCert}, - RootCAs: caCertPool, - ServerName: redisHost, - MinVersion: tls.VersionTLS12, + RootCAs: caCertPool, + ServerName: redisHost, + MinVersion: tls.VersionTLS12, } store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, + Addr: hostPort, + Password: password, + DB: database, TLSConfig: tlsConfig, }) } else { diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index d3836754f..c20f9eca8 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -89,23 +89,23 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { glog.Fatalf("setting file handle entry to nil") } fh.entry.SetEntry(entry) - + // Invalidate chunk offset cache since chunks may have changed fh.invalidateChunkCache() } func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry { result := fh.entry.UpdateEntry(fn) - + // Invalidate chunk offset cache since entry may have been modified fh.invalidateChunkCache() - + return result } func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { fh.entry.AppendChunks(chunks) - + // Invalidate chunk offset cache since new chunks were added fh.invalidateChunkCache() } diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 3521a0df2..18f6df8a0 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -183,4 +183,3 @@ func findClientAddress(ctx context.Context) string { } return pr.Addr.String() } - diff --git a/weed/query/engine/arithmetic_functions.go b/weed/query/engine/arithmetic_functions.go index fd8ac1684..e2237e31b 100644 --- a/weed/query/engine/arithmetic_functions.go +++ b/weed/query/engine/arithmetic_functions.go @@ -15,11 +15,11 @@ import ( type ArithmeticOperator string const ( - OpAdd ArithmeticOperator = "+" - OpSub ArithmeticOperator = "-" - OpMul ArithmeticOperator = "*" - OpDiv ArithmeticOperator = "/" - OpMod ArithmeticOperator = "%" + OpAdd ArithmeticOperator = "+" + OpSub ArithmeticOperator = "-" + OpMul ArithmeticOperator = "*" + OpDiv ArithmeticOperator = "/" + OpMod ArithmeticOperator = "%" ) // EvaluateArithmeticExpression evaluates basic arithmetic operations between two values @@ -69,7 +69,7 @@ func (e *SQLEngine) EvaluateArithmeticExpression(left, right *schema_pb.Value, o // Convert result back to appropriate schema value type // If both operands were integers and operation doesn't produce decimal, return integer - if e.isIntegerValue(left) && e.isIntegerValue(right) && + if e.isIntegerValue(left) && e.isIntegerValue(right) && (operator == OpAdd || operator == OpSub || operator == OpMul || operator == OpMod) { return &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: int64(result)}, diff --git a/weed/query/engine/arithmetic_functions_test.go b/weed/query/engine/arithmetic_functions_test.go index 8c5e11dec..f07ada54f 100644 --- a/weed/query/engine/arithmetic_functions_test.go +++ b/weed/query/engine/arithmetic_functions_test.go @@ -10,131 +10,131 @@ func TestArithmeticOperations(t *testing.T) { engine := NewTestSQLEngine() tests := []struct { - name string - left *schema_pb.Value - right *schema_pb.Value - operator ArithmeticOperator - expected *schema_pb.Value - expectErr bool + name string + left *schema_pb.Value + right *schema_pb.Value + operator ArithmeticOperator + expected *schema_pb.Value + expectErr bool }{ // Addition tests { - name: "Add two integers", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpAdd, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 15}}, + name: "Add two integers", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpAdd, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 15}}, expectErr: false, }, { - name: "Add integer and float", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.5}}, - operator: OpAdd, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 15.5}}, + name: "Add integer and float", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.5}}, + operator: OpAdd, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 15.5}}, expectErr: false, }, // Subtraction tests { - name: "Subtract two integers", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}, - operator: OpSub, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}, + name: "Subtract two integers", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}, + operator: OpSub, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}, expectErr: false, }, // Multiplication tests { - name: "Multiply two integers", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 6}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}, - operator: OpMul, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 42}}, + name: "Multiply two integers", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 6}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}, + operator: OpMul, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 42}}, expectErr: false, }, { - name: "Multiply with float", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, - operator: OpMul, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 12.5}}, + name: "Multiply with float", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, + operator: OpMul, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 12.5}}, expectErr: false, }, // Division tests { - name: "Divide two integers", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 20}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}}, - operator: OpDiv, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.0}}, + name: "Divide two integers", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 20}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}}, + operator: OpDiv, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.0}}, expectErr: false, }, { - name: "Division by zero", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, - operator: OpDiv, - expected: nil, + name: "Division by zero", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, + operator: OpDiv, + expected: nil, expectErr: true, }, // Modulo tests { - name: "Modulo operation", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 17}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpMod, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}}, + name: "Modulo operation", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 17}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpMod, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}}, expectErr: false, }, { - name: "Modulo by zero", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, - operator: OpMod, - expected: nil, + name: "Modulo by zero", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, + operator: OpMod, + expected: nil, expectErr: true, }, // String conversion tests { - name: "Add string number to integer", - left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "15"}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpAdd, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 20.0}}, + name: "Add string number to integer", + left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "15"}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpAdd, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 20.0}}, expectErr: false, }, { - name: "Invalid string conversion", - left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "not_a_number"}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpAdd, - expected: nil, + name: "Invalid string conversion", + left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "not_a_number"}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpAdd, + expected: nil, expectErr: true, }, // Boolean conversion tests { - name: "Add boolean to integer", - left: &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: true}}, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpAdd, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 6.0}}, + name: "Add boolean to integer", + left: &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: true}}, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpAdd, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 6.0}}, expectErr: false, }, // Null value tests { - name: "Add with null left operand", - left: nil, - right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - operator: OpAdd, - expected: nil, + name: "Add with null left operand", + left: nil, + right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + operator: OpAdd, + expected: nil, expectErr: true, }, { - name: "Add with null right operand", - left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - right: nil, - operator: OpAdd, - expected: nil, + name: "Add with null right operand", + left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + right: nil, + operator: OpAdd, + expected: nil, expectErr: true, }, } @@ -203,7 +203,7 @@ func TestIndividualArithmeticFunctions(t *testing.T) { if err != nil { t.Errorf("Divide function failed: %v", err) } - expected = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 10.0/3.0}} + expected = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 10.0 / 3.0}} if !valuesEqual(result, expected) { t.Errorf("Divide: Expected %v, got %v", expected, result) } @@ -224,45 +224,45 @@ func TestMathematicalFunctions(t *testing.T) { t.Run("ROUND function tests", func(t *testing.T) { tests := []struct { - name string - value *schema_pb.Value - precision *schema_pb.Value - expected *schema_pb.Value - expectErr bool + name string + value *schema_pb.Value + precision *schema_pb.Value + expected *schema_pb.Value + expectErr bool }{ { - name: "Round float to integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.7}}, + name: "Round float to integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.7}}, precision: nil, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 4.0}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 4.0}}, expectErr: false, }, { - name: "Round integer stays integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + name: "Round integer stays integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, precision: nil, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, expectErr: false, }, { - name: "Round with precision 2", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14159}}, + name: "Round with precision 2", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14159}}, precision: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, expectErr: false, }, { - name: "Round negative number", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.7}}, + name: "Round negative number", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.7}}, precision: nil, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -4.0}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -4.0}}, expectErr: false, }, { - name: "Round null value", - value: nil, + name: "Round null value", + value: nil, precision: nil, - expected: nil, + expected: nil, expectErr: true, }, } @@ -299,33 +299,33 @@ func TestMathematicalFunctions(t *testing.T) { t.Run("CEIL function tests", func(t *testing.T) { tests := []struct { - name string - value *schema_pb.Value - expected *schema_pb.Value - expectErr bool + name string + value *schema_pb.Value + expected *schema_pb.Value + expectErr bool }{ { - name: "Ceil positive decimal", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.2}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}}, + name: "Ceil positive decimal", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.2}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}}, expectErr: false, }, { - name: "Ceil negative decimal", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -3}}, + name: "Ceil negative decimal", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -3}}, expectErr: false, }, { - name: "Ceil integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + name: "Ceil integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, expectErr: false, }, { - name: "Ceil null value", - value: nil, - expected: nil, + name: "Ceil null value", + value: nil, + expected: nil, expectErr: true, }, } @@ -355,33 +355,33 @@ func TestMathematicalFunctions(t *testing.T) { t.Run("FLOOR function tests", func(t *testing.T) { tests := []struct { - name string - value *schema_pb.Value - expected *schema_pb.Value - expectErr bool + name string + value *schema_pb.Value + expected *schema_pb.Value + expectErr bool }{ { - name: "Floor positive decimal", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.8}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}, + name: "Floor positive decimal", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.8}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}, expectErr: false, }, { - name: "Floor negative decimal", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -4}}, + name: "Floor negative decimal", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -4}}, expectErr: false, }, { - name: "Floor integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + name: "Floor integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, expectErr: false, }, { - name: "Floor null value", - value: nil, - expected: nil, + name: "Floor null value", + value: nil, + expected: nil, expectErr: true, }, } @@ -411,57 +411,57 @@ func TestMathematicalFunctions(t *testing.T) { t.Run("ABS function tests", func(t *testing.T) { tests := []struct { - name string - value *schema_pb.Value - expected *schema_pb.Value - expectErr bool + name string + value *schema_pb.Value + expected *schema_pb.Value + expectErr bool }{ { - name: "Abs positive integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + name: "Abs positive integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, expectErr: false, }, { - name: "Abs negative integer", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, + name: "Abs negative integer", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}}, expectErr: false, }, { - name: "Abs positive double", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, + name: "Abs positive double", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, expectErr: false, }, { - name: "Abs negative double", - value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.14}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, + name: "Abs negative double", + value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.14}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}}, expectErr: false, }, { - name: "Abs positive float", - value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, + name: "Abs positive float", + value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, expectErr: false, }, { - name: "Abs negative float", - value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: -2.5}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, + name: "Abs negative float", + value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: -2.5}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}}, expectErr: false, }, { - name: "Abs zero", - value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, - expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, + name: "Abs zero", + value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, + expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, expectErr: false, }, { - name: "Abs null value", - value: nil, - expected: nil, + name: "Abs null value", + value: nil, + expected: nil, expectErr: true, }, } diff --git a/weed/query/engine/datetime_functions.go b/weed/query/engine/datetime_functions.go index 2ece58e15..9803145f0 100644 --- a/weed/query/engine/datetime_functions.go +++ b/weed/query/engine/datetime_functions.go @@ -16,7 +16,7 @@ import ( func (e *SQLEngine) CurrentDate() (*schema_pb.Value, error) { now := time.Now() dateStr := now.Format("2006-01-02") - + return &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: dateStr}, }, nil @@ -25,10 +25,10 @@ func (e *SQLEngine) CurrentDate() (*schema_pb.Value, error) { // CurrentTimestamp returns the current timestamp func (e *SQLEngine) CurrentTimestamp() (*schema_pb.Value, error) { now := time.Now() - + // Return as TimestampValue with microseconds timestampMicros := now.UnixMicro() - + return &schema_pb.Value{ Kind: &schema_pb.Value_TimestampValue{ TimestampValue: &schema_pb.TimestampValue{ @@ -42,7 +42,7 @@ func (e *SQLEngine) CurrentTimestamp() (*schema_pb.Value, error) { func (e *SQLEngine) CurrentTime() (*schema_pb.Value, error) { now := time.Now() timeStr := now.Format("15:04:05") - + return &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: timeStr}, }, nil @@ -61,13 +61,13 @@ func (e *SQLEngine) Now() (*schema_pb.Value, error) { type DatePart string const ( - PartYear DatePart = "YEAR" - PartMonth DatePart = "MONTH" - PartDay DatePart = "DAY" - PartHour DatePart = "HOUR" - PartMinute DatePart = "MINUTE" - PartSecond DatePart = "SECOND" - PartWeek DatePart = "WEEK" + PartYear DatePart = "YEAR" + PartMonth DatePart = "MONTH" + PartDay DatePart = "DAY" + PartHour DatePart = "HOUR" + PartMinute DatePart = "MINUTE" + PartSecond DatePart = "SECOND" + PartWeek DatePart = "WEEK" PartDayOfYear DatePart = "DOY" PartDayOfWeek DatePart = "DOW" PartQuarter DatePart = "QUARTER" @@ -172,7 +172,7 @@ func (e *SQLEngine) DateTrunc(precision string, value *schema_pb.Value) (*schema case "year", "years": truncated = time.Date(t.Year(), 1, 1, 0, 0, 0, 0, t.Location()) case "decade", "decades": - year := (t.Year()/10) * 10 + year := (t.Year() / 10) * 10 truncated = time.Date(year, 1, 1, 0, 0, 0, 0, t.Location()) case "century", "centuries": year := ((t.Year()-1)/100)*100 + 1 diff --git a/weed/remote_storage/azure/azure_highlevel.go b/weed/remote_storage/azure/azure_highlevel.go deleted file mode 100644 index a5cd4070b..000000000 --- a/weed/remote_storage/azure/azure_highlevel.go +++ /dev/null @@ -1,120 +0,0 @@ -package azure - -import ( - "context" - "crypto/rand" - "encoding/base64" - "errors" - "fmt" - "github.com/Azure/azure-pipeline-go/pipeline" - . "github.com/Azure/azure-storage-blob-go/azblob" - "io" - "sync" -) - -// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6 -// uploadReaderAtToBlockBlob was not public - -// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob. -func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64, - blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) { - if o.BlockSize == 0 { - // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error - if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks { - return nil, errors.New("buffer is too large to upload to a block blob") - } - // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request - if readerSize <= BlockBlobMaxUploadBlobBytes { - o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified - } else { - o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks - if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB - o.BlockSize = BlobDefaultDownloadBlockSize - } - // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize). - } - } - - if readerSize <= BlockBlobMaxUploadBlobBytes { - // If the size can fit in 1 Upload call, do it this way - var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize) - if o.Progress != nil { - body = pipeline.NewRequestBodyProgress(body, o.Progress) - } - return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions) - } - - var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1) - - blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs - progress := int64(0) - progressLock := &sync.Mutex{} - - err := DoBatchTransfer(ctx, BatchTransferOptions{ - OperationName: "uploadReaderAtToBlockBlob", - TransferSize: readerSize, - ChunkSize: o.BlockSize, - Parallelism: o.Parallelism, - Operation: func(offset int64, count int64, ctx context.Context) error { - // This function is called once per block. - // It is passed this block's offset within the buffer and its count of bytes - // Prepare to read the proper block/section of the buffer - var body io.ReadSeeker = io.NewSectionReader(reader, offset, count) - blockNum := offset / o.BlockSize - if o.Progress != nil { - blockProgress := int64(0) - body = pipeline.NewRequestBodyProgress(body, - func(bytesTransferred int64) { - diff := bytesTransferred - blockProgress - blockProgress = bytesTransferred - progressLock.Lock() // 1 goroutine at a time gets a progress report - progress += diff - o.Progress(progress) - progressLock.Unlock() - }) - } - - // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks - // at the same time causing PutBlockList to get a mix of blocks from all the clients. - blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes()) - _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions) - return err - }, - }) - if err != nil { - return nil, err - } - // All put blocks were successful, call Put Block List to finalize the blob - return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions) -} - -// The UUID reserved variants. -const ( - reservedNCS byte = 0x80 - reservedRFC4122 byte = 0x40 - reservedMicrosoft byte = 0x20 - reservedFuture byte = 0x00 -) - -type uuid [16]byte - -// NewUUID returns a new uuid using RFC 4122 algorithm. -func newUUID() (u uuid) { - u = uuid{} - // Set all bits to randomly (or pseudo-randomly) chosen values. - rand.Read(u[:]) - u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122) - - var version byte = 4 - u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4) - return -} - -// String returns an unparsed version of the generated UUID sequence. -func (u uuid) String() string { - return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:]) -} - -func (u uuid) bytes() []byte { - return u[:] -} diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go index 8183c77a4..bfedd68e2 100644 --- a/weed/remote_storage/azure/azure_storage_client.go +++ b/weed/remote_storage/azure/azure_storage_client.go @@ -3,21 +3,58 @@ package azure import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "io" - "net/url" "os" "reflect" + "regexp" "strings" - - "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/seaweedfs/seaweedfs/weed/filer" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" "github.com/seaweedfs/seaweedfs/weed/remote_storage" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/util" ) +const ( + defaultBlockSize = 4 * 1024 * 1024 + defaultConcurrency = 16 +) + +// invalidMetadataChars matches any character that is not valid in Azure metadata keys. +// Azure metadata keys must be valid C# identifiers: letters, digits, and underscores only. +var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`) + +// sanitizeMetadataKey converts an S3 metadata key to a valid Azure metadata key. +// Azure metadata keys must be valid C# identifiers (letters, digits, underscores only, cannot start with digit). +// To prevent collisions, invalid characters are replaced with their hex representation (_XX_). +// Examples: +// - "my-key" -> "my_2d_key" +// - "my.key" -> "my_2e_key" +// - "key@value" -> "key_40_value" +func sanitizeMetadataKey(key string) string { + // Replace each invalid character with _XX_ where XX is the hex code + result := invalidMetadataChars.ReplaceAllStringFunc(key, func(s string) string { + return fmt.Sprintf("_%02x_", s[0]) + }) + + // Azure metadata keys cannot start with a digit + if len(result) > 0 && result[0] >= '0' && result[0] <= '9' { + result = "_" + result + } + + return result +} + func init() { remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker) } @@ -42,25 +79,35 @@ func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storag } } - // Use your Storage account's name and key to create a credential object. + // Create credential and client credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { - return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err) + return nil, fmt.Errorf("invalid Azure credential with account name:%s: %w", accountName, err) } - // Create a request pipeline that is used to process HTTP(S) requests and responses. - p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName) + azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Retry: policy.RetryOptions{ + MaxRetries: 10, // Increased from default 3 to maintain resiliency similar to old SDK's 20 + TryTimeout: time.Minute, + RetryDelay: 2 * time.Second, + MaxRetryDelay: time.Minute, + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to create Azure client: %w", err) + } - // Create an ServiceURL object that wraps the service URL and a request pipeline. - u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName)) - client.serviceURL = azblob.NewServiceURL(*u, p) + client.client = azClient return client, nil } type azureRemoteStorageClient struct { - conf *remote_pb.RemoteConf - serviceURL azblob.ServiceURL + conf *remote_pb.RemoteConf + client *azblob.Client } var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{}) @@ -68,59 +115,74 @@ var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{}) func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { pathKey := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) - - // List the container that we have created above - for marker := (azblob.Marker{}); marker.NotDone(); { - // Get a result segment starting with the blob indicated by the current Marker. - listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{ - Prefix: pathKey, - }) + containerClient := az.client.ServiceClient().NewContainerClient(loc.Bucket) + + // List blobs with pager + pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Prefix: &pathKey, + }) + + for pager.More() { + resp, err := pager.NextPage(context.Background()) if err != nil { - return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err) + return fmt.Errorf("azure traverse %s%s: %w", loc.Bucket, loc.Path, err) } - // ListBlobs returns the start of the next segment; you MUST use this to get - // the next segment (after processing the current result segment). - marker = listBlob.NextMarker - - // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) - for _, blobInfo := range listBlob.Segment.BlobItems { - key := blobInfo.Name - key = "/" + key + for _, blobItem := range resp.Segment.BlobItems { + if blobItem.Name == nil { + continue + } + key := "/" + *blobItem.Name dir, name := util.FullPath(key).DirAndName() - err = visitFn(dir, name, false, &filer_pb.RemoteEntry{ - RemoteMtime: blobInfo.Properties.LastModified.Unix(), - RemoteSize: *blobInfo.Properties.ContentLength, - RemoteETag: string(blobInfo.Properties.Etag), + + remoteEntry := &filer_pb.RemoteEntry{ StorageName: az.conf.Name, - }) + } + if blobItem.Properties != nil { + if blobItem.Properties.LastModified != nil { + remoteEntry.RemoteMtime = blobItem.Properties.LastModified.Unix() + } + if blobItem.Properties.ContentLength != nil { + remoteEntry.RemoteSize = *blobItem.Properties.ContentLength + } + if blobItem.Properties.ETag != nil { + remoteEntry.RemoteETag = string(*blobItem.Properties.ETag) + } + } + + err = visitFn(dir, name, false, remoteEntry) if err != nil { - return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err) + return fmt.Errorf("azure processing %s%s: %w", loc.Bucket, loc.Path, err) } } } return } + func (az *azureRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { key := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) - blobURL := containerURL.NewBlockBlobURL(key) + blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key) - downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if readErr != nil { - return nil, readErr + count := size + if count == 0 { + count = blob.CountToEnd } - // NOTE: automatically retries are performed if the connection fails - bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20}) - defer bodyStream.Close() - - data, err = io.ReadAll(bodyStream) + downloadResp, err := blobClient.DownloadStream(context.Background(), &blob.DownloadStreamOptions{ + Range: blob.HTTPRange{ + Offset: offset, + Count: count, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to download file %s%s: %w", loc.Bucket, loc.Path, err) + } + defer downloadResp.Body.Close() + data, err = io.ReadAll(downloadResp.Body) if err != nil { - return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err) + return nil, fmt.Errorf("failed to read download stream %s%s: %w", loc.Bucket, loc.Path, err) } return @@ -137,23 +199,23 @@ func (az *azureRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorage func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { key := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) - blobURL := containerURL.NewBlockBlobURL(key) + blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key) - readerAt, ok := reader.(io.ReaderAt) - if !ok { - return nil, fmt.Errorf("unexpected reader: readerAt expected") + // Upload from reader + metadata := toMetadata(entry.Extended) + httpHeaders := &blob.HTTPHeaders{} + if entry.Attributes != nil && entry.Attributes.Mime != "" { + httpHeaders.BlobContentType = &entry.Attributes.Mime } - fileSize := int64(filer.FileSize(entry)) - _, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{ - BlockSize: 4 * 1024 * 1024, - BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: entry.Attributes.Mime}, - Metadata: toMetadata(entry.Extended), - Parallelism: 16, + _, err = blobClient.UploadStream(context.Background(), reader, &blockblob.UploadStreamOptions{ + BlockSize: defaultBlockSize, + Concurrency: defaultConcurrency, + HTTPHeaders: httpHeaders, + Metadata: metadata, }) if err != nil { - return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err) + return nil, fmt.Errorf("azure upload to %s%s: %w", loc.Bucket, loc.Path, err) } // read back the remote entry @@ -162,36 +224,45 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { key := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) - blobURL := containerURL.NewBlockBlobURL(key) - - attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key) + props, err := blobClient.GetProperties(context.Background(), nil) if err != nil { return nil, err } - return &filer_pb.RemoteEntry{ - RemoteMtime: attr.LastModified().Unix(), - RemoteSize: attr.ContentLength(), - RemoteETag: string(attr.ETag()), + remoteEntry := &filer_pb.RemoteEntry{ StorageName: az.conf.Name, - }, nil + } + + if props.LastModified != nil { + remoteEntry.RemoteMtime = props.LastModified.Unix() + } + if props.ContentLength != nil { + remoteEntry.RemoteSize = *props.ContentLength + } + if props.ETag != nil { + remoteEntry.RemoteETag = string(*props.ETag) + } + return remoteEntry, nil } -func toMetadata(attributes map[string][]byte) map[string]string { - metadata := make(map[string]string) +func toMetadata(attributes map[string][]byte) map[string]*string { + metadata := make(map[string]*string) for k, v := range attributes { if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { - metadata[k[len(s3_constants.AmzUserMetaPrefix):]] = string(v) + // S3 stores metadata keys in lowercase; normalize for consistency. + key := strings.ToLower(k[len(s3_constants.AmzUserMetaPrefix):]) + + // Sanitize key to prevent collisions and ensure Azure compliance + key = sanitizeMetadataKey(key) + + val := string(v) + metadata[key] = &val } } - parsed_metadata := make(map[string]string) - for k, v := range metadata { - parsed_metadata[strings.Replace(k, "-", "_", -1)] = v - } - return parsed_metadata + return metadata } func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) { @@ -201,54 +272,68 @@ func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStor metadata := toMetadata(newEntry.Extended) key := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) + blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key) - _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + _, err = blobClient.SetMetadata(context.Background(), metadata, nil) return } func (az *azureRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { key := loc.Path[1:] - containerURL := az.serviceURL.NewContainerURL(loc.Bucket) - if _, err = containerURL.NewBlobURL(key).Delete(context.Background(), - azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil { - return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err) + blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key) + + _, err = blobClient.Delete(context.Background(), &blob.DeleteOptions{ + DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), + }) + if err != nil { + // Make delete idempotent - don't return error if blob doesn't exist + if bloberror.HasCode(err, bloberror.BlobNotFound) { + return nil + } + return fmt.Errorf("azure delete %s%s: %w", loc.Bucket, loc.Path, err) } return } func (az *azureRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) { - ctx := context.Background() - for containerMarker := (azblob.Marker{}); containerMarker.NotDone(); { - listContainer, err := az.serviceURL.ListContainersSegment(ctx, containerMarker, azblob.ListContainersSegmentOptions{}) - if err == nil { - for _, v := range listContainer.ContainerItems { - buckets = append(buckets, &remote_storage.Bucket{ - Name: v.Name, - CreatedAt: v.Properties.LastModified, - }) - } - } else { + pager := az.client.NewListContainersPager(nil) + + for pager.More() { + resp, err := pager.NextPage(context.Background()) + if err != nil { return buckets, err } - containerMarker = listContainer.NextMarker + + for _, containerItem := range resp.ContainerItems { + if containerItem.Name != nil { + bucket := &remote_storage.Bucket{ + Name: *containerItem.Name, + } + if containerItem.Properties != nil && containerItem.Properties.LastModified != nil { + bucket.CreatedAt = *containerItem.Properties.LastModified + } + buckets = append(buckets, bucket) + } + } } return } func (az *azureRemoteStorageClient) CreateBucket(name string) (err error) { - containerURL := az.serviceURL.NewContainerURL(name) - if _, err = containerURL.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessNone); err != nil { - return fmt.Errorf("create bucket %s: %v", name, err) + containerClient := az.client.ServiceClient().NewContainerClient(name) + _, err = containerClient.Create(context.Background(), nil) + if err != nil { + return fmt.Errorf("create bucket %s: %w", name, err) } return } func (az *azureRemoteStorageClient) DeleteBucket(name string) (err error) { - containerURL := az.serviceURL.NewContainerURL(name) - if _, err = containerURL.Delete(context.Background(), azblob.ContainerAccessConditions{}); err != nil { - return fmt.Errorf("delete bucket %s: %v", name, err) + containerClient := az.client.ServiceClient().NewContainerClient(name) + _, err = containerClient.Delete(context.Background(), nil) + if err != nil { + return fmt.Errorf("delete bucket %s: %w", name, err) } return } diff --git a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go new file mode 100644 index 000000000..acb7dbd17 --- /dev/null +++ b/weed/remote_storage/azure/azure_storage_client_test.go @@ -0,0 +1,377 @@ +package azure + +import ( + "bytes" + "fmt" + "os" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// TestAzureStorageClientBasic tests basic Azure storage client operations +func TestAzureStorageClientBasic(t *testing.T) { + // Skip if credentials not available + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure storage test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + // Create client + maker := azureRemoteStorageMaker{} + conf := &remote_pb.RemoteConf{ + Name: "test-azure", + AzureAccountName: accountName, + AzureAccountKey: accountKey, + } + + client, err := maker.Make(conf) + if err != nil { + t.Fatalf("Failed to create Azure client: %v", err) + } + + azClient := client.(*azureRemoteStorageClient) + + // Test 1: Create bucket/container + t.Run("CreateBucket", func(t *testing.T) { + err := azClient.CreateBucket(testContainer) + // Ignore error if bucket already exists + if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) { + t.Fatalf("Failed to create bucket: %v", err) + } + }) + + // Test 2: List buckets + t.Run("ListBuckets", func(t *testing.T) { + buckets, err := azClient.ListBuckets() + if err != nil { + t.Fatalf("Failed to list buckets: %v", err) + } + if len(buckets) == 0 { + t.Log("No buckets found (might be expected)") + } else { + t.Logf("Found %d buckets", len(buckets)) + } + }) + + // Test 3: Write file + testContent := []byte("Hello from SeaweedFS Azure SDK migration test!") + testKey := fmt.Sprintf("/test-file-%d.txt", time.Now().Unix()) + loc := &remote_pb.RemoteStorageLocation{ + Name: "test-azure", + Bucket: testContainer, + Path: testKey, + } + + t.Run("WriteFile", func(t *testing.T) { + entry := &filer_pb.Entry{ + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Mime: "text/plain", + }, + Extended: map[string][]byte{ + "x-amz-meta-test-key": []byte("test-value"), + }, + } + + reader := bytes.NewReader(testContent) + remoteEntry, err := azClient.WriteFile(loc, entry, reader) + if err != nil { + t.Fatalf("Failed to write file: %v", err) + } + if remoteEntry == nil { + t.Fatal("Remote entry is nil") + } + if remoteEntry.RemoteSize != int64(len(testContent)) { + t.Errorf("Expected size %d, got %d", len(testContent), remoteEntry.RemoteSize) + } + }) + + // Test 4: Read file + t.Run("ReadFile", func(t *testing.T) { + data, err := azClient.ReadFile(loc, 0, int64(len(testContent))) + if err != nil { + t.Fatalf("Failed to read file: %v", err) + } + if !bytes.Equal(data, testContent) { + t.Errorf("Content mismatch. Expected: %s, Got: %s", testContent, data) + } + }) + + // Test 5: Read partial file + t.Run("ReadPartialFile", func(t *testing.T) { + data, err := azClient.ReadFile(loc, 0, 5) + if err != nil { + t.Fatalf("Failed to read partial file: %v", err) + } + expected := testContent[:5] + if !bytes.Equal(data, expected) { + t.Errorf("Content mismatch. Expected: %s, Got: %s", expected, data) + } + }) + + // Test 6: Update metadata + t.Run("UpdateMetadata", func(t *testing.T) { + oldEntry := &filer_pb.Entry{ + Extended: map[string][]byte{ + "x-amz-meta-test-key": []byte("test-value"), + }, + } + newEntry := &filer_pb.Entry{ + Extended: map[string][]byte{ + "x-amz-meta-test-key": []byte("test-value"), + "x-amz-meta-new-key": []byte("new-value"), + }, + } + err := azClient.UpdateFileMetadata(loc, oldEntry, newEntry) + if err != nil { + t.Fatalf("Failed to update metadata: %v", err) + } + }) + + // Test 7: Traverse (list objects) + t.Run("Traverse", func(t *testing.T) { + foundFile := false + err := azClient.Traverse(loc, func(dir string, name string, isDir bool, remoteEntry *filer_pb.RemoteEntry) error { + if !isDir && name == testKey[1:] { // Remove leading slash + foundFile = true + } + return nil + }) + if err != nil { + t.Fatalf("Failed to traverse: %v", err) + } + if !foundFile { + t.Log("Test file not found in traverse (might be expected due to path matching)") + } + }) + + // Test 8: Delete file + t.Run("DeleteFile", func(t *testing.T) { + err := azClient.DeleteFile(loc) + if err != nil { + t.Fatalf("Failed to delete file: %v", err) + } + }) + + // Test 9: Verify file deleted (should fail) + t.Run("VerifyDeleted", func(t *testing.T) { + _, err := azClient.ReadFile(loc, 0, 10) + if !bloberror.HasCode(err, bloberror.BlobNotFound) { + t.Errorf("Expected BlobNotFound error, but got: %v", err) + } + }) + + // Clean up: Try to delete the test container + // Comment out if you want to keep the container + /* + t.Run("DeleteBucket", func(t *testing.T) { + err := azClient.DeleteBucket(testContainer) + if err != nil { + t.Logf("Warning: Failed to delete bucket: %v", err) + } + }) + */ +} + +// TestToMetadata tests the metadata conversion function +func TestToMetadata(t *testing.T) { + tests := []struct { + name string + input map[string][]byte + expected map[string]*string + }{ + { + name: "basic metadata", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "key1": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "key2": []byte("value2"), + }, + expected: map[string]*string{ + "key1": stringPtr("value1"), + "key2": stringPtr("value2"), + }, + }, + { + name: "metadata with dashes", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "content-type": []byte("text/plain"), + }, + expected: map[string]*string{ + "content_2d_type": stringPtr("text/plain"), // dash (0x2d) -> _2d_ + }, + }, + { + name: "non-metadata keys ignored", + input: map[string][]byte{ + "some-other-key": []byte("ignored"), + s3_constants.AmzUserMetaPrefix + "included": []byte("included"), + }, + expected: map[string]*string{ + "included": stringPtr("included"), + }, + }, + { + name: "keys starting with digits", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "123key": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "456-test": []byte("value2"), + s3_constants.AmzUserMetaPrefix + "789": []byte("value3"), + }, + expected: map[string]*string{ + "_123key": stringPtr("value1"), // starts with digit -> prefix _ + "_456_2d_test": stringPtr("value2"), // starts with digit AND has dash + "_789": stringPtr("value3"), + }, + }, + { + name: "uppercase and mixed case keys", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "My-Key": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "UPPERCASE": []byte("value2"), + s3_constants.AmzUserMetaPrefix + "MiXeD-CaSe": []byte("value3"), + }, + expected: map[string]*string{ + "my_2d_key": stringPtr("value1"), // lowercase + dash -> _2d_ + "uppercase": stringPtr("value2"), + "mixed_2d_case": stringPtr("value3"), + }, + }, + { + name: "keys with invalid characters", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "my.key": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "key+plus": []byte("value2"), + s3_constants.AmzUserMetaPrefix + "key@symbol": []byte("value3"), + s3_constants.AmzUserMetaPrefix + "key-with.": []byte("value4"), + s3_constants.AmzUserMetaPrefix + "key/slash": []byte("value5"), + }, + expected: map[string]*string{ + "my_2e_key": stringPtr("value1"), // dot (0x2e) -> _2e_ + "key_2b_plus": stringPtr("value2"), // plus (0x2b) -> _2b_ + "key_40_symbol": stringPtr("value3"), // @ (0x40) -> _40_ + "key_2d_with_2e_": stringPtr("value4"), // dash and dot + "key_2f_slash": stringPtr("value5"), // slash (0x2f) -> _2f_ + }, + }, + { + name: "collision prevention", + input: map[string][]byte{ + s3_constants.AmzUserMetaPrefix + "my-key": []byte("value1"), + s3_constants.AmzUserMetaPrefix + "my.key": []byte("value2"), + s3_constants.AmzUserMetaPrefix + "my_key": []byte("value3"), + }, + expected: map[string]*string{ + "my_2d_key": stringPtr("value1"), // dash (0x2d) + "my_2e_key": stringPtr("value2"), // dot (0x2e) + "my_key": stringPtr("value3"), // underscore is valid, no encoding + }, + }, + { + name: "empty input", + input: map[string][]byte{}, + expected: map[string]*string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := toMetadata(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("Expected %d keys, got %d", len(tt.expected), len(result)) + } + for key, expectedVal := range tt.expected { + if resultVal, ok := result[key]; !ok { + t.Errorf("Expected key %s not found", key) + } else if resultVal == nil || expectedVal == nil { + if resultVal != expectedVal { + t.Errorf("For key %s: expected %v, got %v", key, expectedVal, resultVal) + } + } else if *resultVal != *expectedVal { + t.Errorf("For key %s: expected %s, got %s", key, *expectedVal, *resultVal) + } + } + }) + } +} + +func contains(s, substr string) bool { + return bytes.Contains([]byte(s), []byte(substr)) +} + +func stringPtr(s string) *string { + return &s +} + +// Benchmark tests +func BenchmarkToMetadata(b *testing.B) { + input := map[string][]byte{ + "x-amz-meta-key1": []byte("value1"), + "x-amz-meta-key2": []byte("value2"), + "x-amz-meta-content-type": []byte("text/plain"), + "other-key": []byte("ignored"), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + toMetadata(input) + } +} + +// Test that the maker implements the interface +func TestAzureRemoteStorageMaker(t *testing.T) { + maker := azureRemoteStorageMaker{} + + if !maker.HasBucket() { + t.Error("Expected HasBucket() to return true") + } + + // Test with missing credentials + conf := &remote_pb.RemoteConf{ + Name: "test", + } + _, err := maker.Make(conf) + if err == nil { + t.Error("Expected error with missing credentials") + } +} + +// Test error cases +func TestAzureStorageClientErrors(t *testing.T) { + // Test with invalid credentials + maker := azureRemoteStorageMaker{} + conf := &remote_pb.RemoteConf{ + Name: "test", + AzureAccountName: "invalid", + AzureAccountKey: "aW52YWxpZGtleQ==", // base64 encoded "invalidkey" + } + + client, err := maker.Make(conf) + if err != nil { + t.Skip("Invalid credentials correctly rejected at client creation") + } + + // If client creation succeeded, operations should fail + azClient := client.(*azureRemoteStorageClient) + loc := &remote_pb.RemoteStorageLocation{ + Name: "test", + Bucket: "nonexistent", + Path: "/test.txt", + } + + // These operations should fail with invalid credentials + _, err = azClient.ReadFile(loc, 0, 10) + if err == nil { + t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)") + } +} diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index fb28355bc..b0e40e1a7 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -3,24 +3,31 @@ package azuresink import ( "bytes" "context" + "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" "net/http" - "net/url" "strings" "time" - "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" "github.com/seaweedfs/seaweedfs/weed/replication/sink" "github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/util" ) type AzureSink struct { - containerURL azblob.ContainerURL + client *azblob.Client container string dir string filerSource *source.FilerSource @@ -61,20 +68,28 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e g.container = container g.dir = dir - // Use your Storage account's name and key to create a credential object. + // Create credential and client credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { - glog.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err) + return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, err) } - // Create a request pipeline that is used to process HTTP(S) requests and responses. - p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) - - // Create an ServiceURL object that wraps the service URL and a request pipeline. - u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName)) - serviceURL := azblob.NewServiceURL(*u, p) + serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName) + client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Retry: policy.RetryOptions{ + MaxRetries: 10, // Increased from default 3 for replication sink resiliency + TryTimeout: time.Minute, + RetryDelay: 2 * time.Second, + MaxRetryDelay: time.Minute, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to create Azure client: %w", err) + } - g.containerURL = serviceURL.NewContainerURL(g.container) + g.client = client return nil } @@ -87,13 +102,19 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo key = key + "/" } - if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(), - azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil { - return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err) + blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key) + _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{ + DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude), + }) + if err != nil { + // Make delete idempotent - don't return error if blob doesn't exist + if bloberror.HasCode(err, bloberror.BlobNotFound) { + return nil + } + return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err) } return nil - } func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { @@ -107,26 +128,38 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] totalSize := filer.FileSize(entry) chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) - // Create a URL that references a to-be-created blob in your - // Azure Storage account's container. - appendBlobURL := g.containerURL.NewAppendBlobURL(key) + // Create append blob client + appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key) - accessCondition := azblob.BlobAccessConditions{} + // Create blob with access conditions + accessConditions := &blob.AccessConditions{} if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - accessCondition.ModifiedAccessConditions.IfUnmodifiedSince = time.Unix(entry.Attributes.Mtime, 0) + modifiedTime := time.Unix(entry.Attributes.Mtime, 0) + accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{ + IfUnmodifiedSince: &modifiedTime, + } } - res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{}) - if res != nil && res.StatusCode() == http.StatusPreconditionFailed { - glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err) - return nil - } + _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{ + AccessConditions: accessConditions, + }) + if err != nil { - return err + if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { + // Blob already exists, which is fine for an append blob - we can append to it + } else { + // Check if this is a precondition failed error (HTTP 412) + var respErr *azcore.ResponseError + if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed { + glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key) + return nil + } + return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) + } } writeFunc := func(data []byte) error { - _, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) + _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{}) return writeErr } @@ -139,7 +172,6 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] } return nil - } func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { diff --git a/weed/replication/sink/azuresink/azure_sink_test.go b/weed/replication/sink/azuresink/azure_sink_test.go new file mode 100644 index 000000000..e139086e6 --- /dev/null +++ b/weed/replication/sink/azuresink/azure_sink_test.go @@ -0,0 +1,355 @@ +package azuresink + +import ( + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// MockConfiguration for testing +type mockConfiguration struct { + values map[string]interface{} +} + +func newMockConfiguration() *mockConfiguration { + return &mockConfiguration{ + values: make(map[string]interface{}), + } +} + +func (m *mockConfiguration) GetString(key string) string { + if v, ok := m.values[key]; ok { + return v.(string) + } + return "" +} + +func (m *mockConfiguration) GetBool(key string) bool { + if v, ok := m.values[key]; ok { + return v.(bool) + } + return false +} + +func (m *mockConfiguration) GetInt(key string) int { + if v, ok := m.values[key]; ok { + return v.(int) + } + return 0 +} + +func (m *mockConfiguration) GetInt64(key string) int64 { + if v, ok := m.values[key]; ok { + return v.(int64) + } + return 0 +} + +func (m *mockConfiguration) GetFloat64(key string) float64 { + if v, ok := m.values[key]; ok { + return v.(float64) + } + return 0.0 +} + +func (m *mockConfiguration) GetStringSlice(key string) []string { + if v, ok := m.values[key]; ok { + return v.([]string) + } + return nil +} + +func (m *mockConfiguration) SetDefault(key string, value interface{}) { + if _, exists := m.values[key]; !exists { + m.values[key] = value + } +} + +// Test the AzureSink interface implementation +func TestAzureSinkInterface(t *testing.T) { + sink := &AzureSink{} + + if sink.GetName() != "azure" { + t.Errorf("Expected name 'azure', got '%s'", sink.GetName()) + } + + // Test directory setting + sink.dir = "/test/dir" + if sink.GetSinkToDirectory() != "/test/dir" { + t.Errorf("Expected directory '/test/dir', got '%s'", sink.GetSinkToDirectory()) + } + + // Test incremental setting + sink.isIncremental = true + if !sink.IsIncremental() { + t.Error("Expected isIncremental to be true") + } +} + +// Test Azure sink initialization +func TestAzureSinkInitialization(t *testing.T) { + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure sink test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + sink := &AzureSink{} + + err := sink.initialize(accountName, accountKey, testContainer, "/test") + if err != nil { + t.Fatalf("Failed to initialize Azure sink: %v", err) + } + + if sink.container != testContainer { + t.Errorf("Expected container '%s', got '%s'", testContainer, sink.container) + } + + if sink.dir != "/test" { + t.Errorf("Expected dir '/test', got '%s'", sink.dir) + } + + if sink.client == nil { + t.Error("Expected client to be initialized") + } +} + +// Test configuration-based initialization +func TestAzureSinkInitializeFromConfig(t *testing.T) { + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure sink config test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + config := newMockConfiguration() + config.values["azure.account_name"] = accountName + config.values["azure.account_key"] = accountKey + config.values["azure.container"] = testContainer + config.values["azure.directory"] = "/test" + config.values["azure.is_incremental"] = true + + sink := &AzureSink{} + err := sink.Initialize(config, "azure.") + if err != nil { + t.Fatalf("Failed to initialize from config: %v", err) + } + + if !sink.IsIncremental() { + t.Error("Expected incremental to be true") + } +} + +// Test cleanKey function +func TestCleanKey(t *testing.T) { + tests := []struct { + input string + expected string + }{ + {"/test/file.txt", "test/file.txt"}, + {"test/file.txt", "test/file.txt"}, + {"/", ""}, + {"", ""}, + {"/a/b/c", "a/b/c"}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result := cleanKey(tt.input) + if result != tt.expected { + t.Errorf("cleanKey(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} + +// Test entry operations (requires valid credentials) +func TestAzureSinkEntryOperations(t *testing.T) { + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure sink entry test: credentials not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + sink := &AzureSink{} + err := sink.initialize(accountName, accountKey, testContainer, "/test") + if err != nil { + t.Fatalf("Failed to initialize: %v", err) + } + + // Test CreateEntry with directory (should be no-op) + t.Run("CreateDirectory", func(t *testing.T) { + entry := &filer_pb.Entry{ + IsDirectory: true, + } + err := sink.CreateEntry("/test/dir", entry, nil) + if err != nil { + t.Errorf("CreateEntry for directory should not error: %v", err) + } + }) + + // Test CreateEntry with file + testKey := "/test-sink-file-" + time.Now().Format("20060102-150405") + ".txt" + t.Run("CreateFile", func(t *testing.T) { + entry := &filer_pb.Entry{ + IsDirectory: false, + Content: []byte("Test content for Azure sink"), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + }, + } + err := sink.CreateEntry(testKey, entry, nil) + if err != nil { + t.Fatalf("Failed to create entry: %v", err) + } + }) + + // Test UpdateEntry + t.Run("UpdateEntry", func(t *testing.T) { + oldEntry := &filer_pb.Entry{ + Content: []byte("Old content"), + } + newEntry := &filer_pb.Entry{ + Content: []byte("New content for update test"), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + }, + } + found, err := sink.UpdateEntry(testKey, oldEntry, "/test", newEntry, false, nil) + if err != nil { + t.Fatalf("Failed to update entry: %v", err) + } + if !found { + t.Error("Expected found to be true") + } + }) + + // Test DeleteEntry + t.Run("DeleteFile", func(t *testing.T) { + err := sink.DeleteEntry(testKey, false, false, nil) + if err != nil { + t.Fatalf("Failed to delete entry: %v", err) + } + }) + + // Test DeleteEntry with directory marker + testDirKey := "/test-dir-" + time.Now().Format("20060102-150405") + t.Run("DeleteDirectory", func(t *testing.T) { + // First create a directory marker + entry := &filer_pb.Entry{ + IsDirectory: false, + Content: []byte(""), + } + err := sink.CreateEntry(testDirKey+"/", entry, nil) + if err != nil { + t.Logf("Warning: Failed to create directory marker: %v", err) + } + + // Then delete it + err = sink.DeleteEntry(testDirKey, true, false, nil) + if err != nil { + t.Logf("Warning: Failed to delete directory: %v", err) + } + }) +} + +// Test CreateEntry with precondition (IfUnmodifiedSince) +func TestAzureSinkPrecondition(t *testing.T) { + accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") + accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY") + testContainer := os.Getenv("AZURE_TEST_CONTAINER") + + if accountName == "" || accountKey == "" { + t.Skip("Skipping Azure sink precondition test: credentials not set") + } + if testContainer == "" { + testContainer = "seaweedfs-test" + } + + sink := &AzureSink{} + err := sink.initialize(accountName, accountKey, testContainer, "/test") + if err != nil { + t.Fatalf("Failed to initialize: %v", err) + } + + testKey := "/test-precondition-" + time.Now().Format("20060102-150405") + ".txt" + + // Create initial entry + entry := &filer_pb.Entry{ + Content: []byte("Initial content"), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + }, + } + err = sink.CreateEntry(testKey, entry, nil) + if err != nil { + t.Fatalf("Failed to create initial entry: %v", err) + } + + // Try to create again with old mtime (should be skipped due to precondition) + oldEntry := &filer_pb.Entry{ + Content: []byte("Should not overwrite"), + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Old timestamp + }, + } + err = sink.CreateEntry(testKey, oldEntry, nil) + // Should either succeed (skip) or fail with precondition error + if err != nil { + t.Logf("Create with old mtime: %v (expected)", err) + } + + // Clean up + sink.DeleteEntry(testKey, false, false, nil) +} + +// Benchmark tests +func BenchmarkCleanKey(b *testing.B) { + keys := []string{ + "/simple/path.txt", + "no/leading/slash.txt", + "/", + "/complex/path/with/many/segments/file.txt", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cleanKey(keys[i%len(keys)]) + } +} + +// Test error handling with invalid credentials +func TestAzureSinkInvalidCredentials(t *testing.T) { + sink := &AzureSink{} + + err := sink.initialize("invalid-account", "aW52YWxpZGtleQ==", "test-container", "/test") + if err != nil { + t.Skip("Invalid credentials correctly rejected at initialization") + } + + // If initialization succeeded, operations should fail + entry := &filer_pb.Entry{ + Content: []byte("test"), + } + err = sink.CreateEntry("/test.txt", entry, nil) + if err == nil { + t.Log("Expected error with invalid credentials, but got none (might be cached)") + } +} diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 1f147e884..e3e7c0bbb 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -56,10 +56,10 @@ type IdentityAccessManagement struct { } type Identity struct { - Name string - Account *Account - Credentials []*Credential - Actions []Action + Name string + Account *Account + Credentials []*Credential + Actions []Action PrincipalArn string // ARN for IAM authorization (e.g., "arn:seaweed:iam::user/username") } diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index c6de70738..d63e10364 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -294,7 +294,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl ETag: chunk.ETag, IsCompressed: chunk.IsCompressed, // Preserve SSE metadata with updated within-part offset - SseType: chunk.SseType, + SseType: chunk.SseType, SseMetadata: sseKmsMetadata, } finalParts = append(finalParts, p) diff --git a/weed/s3api/policy_engine/types.go b/weed/s3api/policy_engine/types.go index 5f417afb4..d68b1f297 100644 --- a/weed/s3api/policy_engine/types.go +++ b/weed/s3api/policy_engine/types.go @@ -407,8 +407,6 @@ func (cs *CompiledStatement) EvaluateStatement(args *PolicyEvaluationArgs) bool return false } - - return true } diff --git a/weed/s3api/s3_list_parts_action_test.go b/weed/s3api/s3_list_parts_action_test.go index 4c0a28eff..c0e9aa8a1 100644 --- a/weed/s3api/s3_list_parts_action_test.go +++ b/weed/s3api/s3_list_parts_action_test.go @@ -35,7 +35,7 @@ func TestListPartsActionMapping(t *testing.T) { { name: "get_object_with_uploadId", method: "GET", - bucket: "test-bucket", + bucket: "test-bucket", objectKey: "test-object.txt", queryParams: map[string]string{"uploadId": "test-upload-id"}, fallbackAction: s3_constants.ACTION_READ, @@ -43,14 +43,14 @@ func TestListPartsActionMapping(t *testing.T) { description: "GET request with uploadId should map to s3:ListParts (this was the missing mapping)", }, { - name: "get_object_with_uploadId_and_other_params", - method: "GET", - bucket: "test-bucket", - objectKey: "test-object.txt", + name: "get_object_with_uploadId_and_other_params", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", queryParams: map[string]string{ - "uploadId": "test-upload-id-123", - "max-parts": "100", - "part-number-marker": "50", + "uploadId": "test-upload-id-123", + "max-parts": "100", + "part-number-marker": "50", }, fallbackAction: s3_constants.ACTION_READ, expectedAction: "s3:ListParts", @@ -107,7 +107,7 @@ func TestListPartsActionMapping(t *testing.T) { action := determineGranularS3Action(req, tc.fallbackAction, tc.bucket, tc.objectKey) // Verify the action mapping - assert.Equal(t, tc.expectedAction, action, + assert.Equal(t, tc.expectedAction, action, "Test case: %s - %s", tc.name, tc.description) }) } @@ -145,17 +145,17 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) { t.Run("policy_enforcement_precision", func(t *testing.T) { // This test documents the security improvement - before the fix, both operations // would incorrectly map to s3:GetObject, preventing fine-grained access control - + testCases := []struct { description string - queryParams map[string]string + queryParams map[string]string expectedAction string securityNote string }{ { description: "List multipart upload parts", queryParams: map[string]string{"uploadId": "upload-abc123"}, - expectedAction: "s3:ListParts", + expectedAction: "s3:ListParts", securityNote: "FIXED: Now correctly maps to s3:ListParts instead of s3:GetObject", }, { @@ -165,7 +165,7 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) { securityNote: "UNCHANGED: Still correctly maps to s3:GetObject", }, { - description: "Get object with complex upload ID", + description: "Get object with complex upload ID", queryParams: map[string]string{"uploadId": "complex-upload-id-with-hyphens-123-abc-def"}, expectedAction: "s3:ListParts", securityNote: "FIXED: Complex upload IDs now correctly detected", @@ -185,8 +185,8 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) { req.URL.RawQuery = query.Encode() action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-object") - - assert.Equal(t, tc.expectedAction, action, + + assert.Equal(t, tc.expectedAction, action, "%s - %s", tc.description, tc.securityNote) } }) @@ -196,7 +196,7 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) { func TestListPartsActionRealWorldScenarios(t *testing.T) { t.Run("large_file_upload_workflow", func(t *testing.T) { // Simulate a large file upload workflow where users need different permissions for each step - + // Step 1: Initiate multipart upload (POST with uploads query) req1 := &http.Request{ Method: "POST", @@ -206,7 +206,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { query1.Set("uploads", "") req1.URL.RawQuery = query1.Encode() action1 := determineGranularS3Action(req1, s3_constants.ACTION_WRITE, "data", "large-dataset.csv") - + // Step 2: List existing parts (GET with uploadId query) - THIS WAS THE MISSING MAPPING req2 := &http.Request{ Method: "GET", @@ -216,7 +216,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { query2.Set("uploadId", "dataset-upload-20240827-001") req2.URL.RawQuery = query2.Encode() action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "data", "large-dataset.csv") - + // Step 3: Upload a part (PUT with uploadId and partNumber) req3 := &http.Request{ Method: "PUT", @@ -227,7 +227,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { query3.Set("partNumber", "5") req3.URL.RawQuery = query3.Encode() action3 := determineGranularS3Action(req3, s3_constants.ACTION_WRITE, "data", "large-dataset.csv") - + // Step 4: Complete multipart upload (POST with uploadId) req4 := &http.Request{ Method: "POST", @@ -241,15 +241,15 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { // Verify each step has the correct action mapping assert.Equal(t, "s3:CreateMultipartUpload", action1, "Step 1: Initiate upload") assert.Equal(t, "s3:ListParts", action2, "Step 2: List parts (FIXED by this PR)") - assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part") + assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part") assert.Equal(t, "s3:CompleteMultipartUpload", action4, "Step 4: Complete upload") - + // Verify that each step requires different permissions (security principle) actions := []string{action1, action2, action3, action4} for i, action := range actions { for j, otherAction := range actions { if i != j { - assert.NotEqual(t, action, otherAction, + assert.NotEqual(t, action, otherAction, "Each multipart operation step should require different permissions for fine-grained control") } } @@ -258,7 +258,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { t.Run("edge_case_upload_ids", func(t *testing.T) { // Test various upload ID formats to ensure the fix works with real AWS-compatible upload IDs - + testUploadIds := []string{ "simple123", "complex-upload-id-with-hyphens", @@ -276,10 +276,10 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) { query := req.URL.Query() query.Set("uploadId", uploadId) req.URL.RawQuery = query.Encode() - + action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-file.bin") - - assert.Equal(t, "s3:ListParts", action, + + assert.Equal(t, "s3:ListParts", action, "Upload ID format %s should be correctly detected and mapped to s3:ListParts", uploadId) } }) diff --git a/weed/s3api/s3_sse_multipart_test.go b/weed/s3api/s3_sse_multipart_test.go index 804e4ab4a..ba67a4c5c 100644 --- a/weed/s3api/s3_sse_multipart_test.go +++ b/weed/s3api/s3_sse_multipart_test.go @@ -6,7 +6,7 @@ import ( "io" "strings" "testing" - + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" ) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index cc2fb3dfd..6a846120a 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -20,8 +20,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/security" weed_server "github.com/seaweedfs/seaweedfs/weed/server" - "github.com/seaweedfs/seaweedfs/weed/util/constants" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util/constants" ) // Object lock validation errors diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 1535ba207..4f1ca05be 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -15,10 +15,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/util/constants" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/constants" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 46fa2519d..a535ff16c 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -20,9 +20,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/util/constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/constants" ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) { diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 719cd4b74..a7ef8e7e9 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -29,7 +29,7 @@ const ( func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { if ms.option.VolumeGrowthDisabled { - glog.V(1).Infof("automatic volume grow disabled") + glog.V(1).Infof("automatic volume grow disabled") return } glog.V(1).Infoln("starting automatic volume grow") diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 4d246e26c..fbad37f02 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -185,18 +185,18 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl aHasChanges, bHasChanges := true, true const maxIterations = 5 iteration := 0 - + for (aHasChanges || bHasChanges) && iteration < maxIterations { iteration++ if verbose { fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id) } - + prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil { return err } - + // Detect if we're stuck in a loop with no progress if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) { fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", @@ -204,13 +204,13 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl return fmt.Errorf("sync not making progress after %d iterations", iteration) } } - + if iteration >= maxIterations && (aHasChanges || bHasChanges) { fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } - + return nil }