diff --git a/go.mod b/go.mod index 496eb1378..8666c50f8 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/klauspost/reedsolomon v1.12.4 github.com/kurin/blazer v0.5.3 github.com/lib/pq v1.10.9 - github.com/linxGnu/grocksdb v1.9.5 + github.com/linxGnu/grocksdb v1.9.7 github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-ieproxy v0.0.11 // indirect @@ -96,7 +96,7 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect - go.etcd.io/etcd/client/v3 v3.5.16 + go.etcd.io/etcd/client/v3 v3.5.17 go.mongodb.org/mongo-driver v1.16.0 go.opencensus.io v0.24.0 // indirect gocloud.dev v0.40.0 @@ -119,7 +119,7 @@ require ( modernc.org/b v1.0.0 // indirect modernc.org/mathutil v1.6.0 modernc.org/memory v1.8.0 // indirect - modernc.org/sqlite v1.33.1 + modernc.org/sqlite v1.34.1 modernc.org/strutil v1.2.0 modernc.org/token v1.1.0 // indirect ) @@ -129,8 +129,8 @@ require ( github.com/arangodb/go-driver v1.6.4 github.com/armon/go-metrics v0.4.1 github.com/aws/aws-sdk-go-v2 v1.32.4 - github.com/aws/aws-sdk-go-v2/config v1.28.0 - github.com/aws/aws-sdk-go-v2/credentials v1.17.41 + github.com/aws/aws-sdk-go-v2/config v1.28.4 + github.com/aws/aws-sdk-go-v2/credentials v1.17.45 github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3 github.com/cognusion/imaging v1.0.1 github.com/fluent/fluent-logger-golang v1.9.0 @@ -143,7 +143,7 @@ require ( github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe github.com/rabbitmq/amqp091-go v1.10.0 - github.com/rclone/rclone v1.68.1 + github.com/rclone/rclone v1.68.2 github.com/rdleal/intervalst v1.4.0 github.com/redis/go-redis/v9 v9.7.0 github.com/schollz/progressbar/v3 v3.16.0 @@ -151,7 +151,7 @@ require ( github.com/tikv/client-go/v2 v2.0.7 github.com/ydb-platform/ydb-go-sdk-auth-environ v0.5.0 github.com/ydb-platform/ydb-go-sdk/v3 v3.91.0 - go.etcd.io/etcd/client/pkg/v3 v3.5.16 + go.etcd.io/etcd/client/pkg/v3 v3.5.17 go.uber.org/atomic v1.11.0 golang.org/x/sync v0.9.0 google.golang.org/grpc/security/advancedtls v1.0.0 @@ -191,7 +191,7 @@ require ( github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc // indirect github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect @@ -203,9 +203,9 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 // indirect github.com/aws/aws-sdk-go-v2/service/sns v1.31.3 // indirect github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.0 // indirect github.com/aws/smithy-go v1.22.0 // indirect github.com/boltdb/bolt v1.3.1 // indirect 
github.com/bradenaw/juniper v0.15.2 // indirect @@ -335,7 +335,7 @@ require ( github.com/zeebo/blake3 v0.2.3 // indirect github.com/zeebo/errs v1.3.0 // indirect go.etcd.io/bbolt v1.3.10 // indirect - go.etcd.io/etcd/api/v3 v3.5.16 // indirect + go.etcd.io/etcd/api/v3 v3.5.17 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.29.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect diff --git a/go.sum b/go.sum index ec4548866..7d9658b30 100644 --- a/go.sum +++ b/go.sum @@ -652,12 +652,12 @@ github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkO github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA= -github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ= -github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= -github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= -github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw= +github.com/aws/aws-sdk-go-v2/config v1.28.4 h1:qgD0MKmkIzZR2DrAjWJcI9UkndjR+8f6sjUQvXh0mb0= +github.com/aws/aws-sdk-go-v2/config v1.28.4/go.mod h1:LgnWnNzHZw4MLplSyEGia0WgJ/kCGD86zGCjvNpehJs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.45 h1:DUgm5lFso57E7150RBgu1JpVQoF8fAPretiDStIuVjg= +github.com/aws/aws-sdk-go-v2/credentials v1.17.45/go.mod h1:dnBpENcPC1ekZrGpSWspX+ZRGzhkvqngT2Qp5xBR1dY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 h1:woXadbf0c7enQ2UGCi8gW/WuKmE0xIzxBF/eD94jMKQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19/go.mod h1:zminj5ucw7w0r65bP6nhyOd3xL6veAUMc3ElGMoLVb4= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 h1:zeN9UtUlA6FTx0vFSayxSX32HDw73Yb6Hh2izDSFxXY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10/go.mod h1:3HKuexPDcwLWPaqpW2UR/9n8N/u/3CKcGAzSs8p8u8g= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= @@ -682,12 +682,12 @@ github.com/aws/aws-sdk-go-v2/service/sns v1.31.3 h1:eSTEdxkfle2G98FE+Xl3db/XAXXV github.com/aws/aws-sdk-go-v2/service/sns v1.31.3/go.mod h1:1dn0delSO3J69THuty5iwP0US2Glt0mx2qBBlI13pvw= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3 h1:Vjqy5BZCOIsn4Pj8xzyqgGmsSqzz7y/WXbN3RgOoVrc= github.com/aws/aws-sdk-go-v2/service/sqs v1.34.3/go.mod h1:L0enV3GCRd5iG9B64W35C4/hwsCB00Ib+DKVGTadKHI= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI= -github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo= -github.com/aws/aws-sdk-go-v2/service/sts 
v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.5 h1:HJwZwRt2Z2Tdec+m+fPjvdmkq2s9Ra+VR0hjF7V2o40= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.5/go.mod h1:wrMCEwjFPms+V86TCQQeOxQF/If4vT44FGIOFiMC2ck= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 h1:zcx9LiGWZ6i6pjdcoE9oXAB6mUdeyC36Ia/QEiIvYdg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4/go.mod h1:Tp/ly1cTjRLGBBmNccFumbZ8oqpZlpdhFf80SrRh4is= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.0 h1:s7LRgBqhwLaxcocnAniBJp7gaAB+4I4vHzqUqjH18yc= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.0/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8= github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -1243,8 +1243,8 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/linxGnu/grocksdb v1.9.5 h1:Pqx1DTR5bdJSZic8CJwc9UNZR60qoAOK09feMg4SoaI= -github.com/linxGnu/grocksdb v1.9.5/go.mod h1:QYiYypR2d4v63Wj1adOOfzglnoII0gLj3PNh4fZkcFA= +github.com/linxGnu/grocksdb v1.9.7 h1:Bp2r1Yti/IXxEobZZnDooXAui/Q+5gVqgQMenLWyDUw= +github.com/linxGnu/grocksdb v1.9.7/go.mod h1:QYiYypR2d4v63Wj1adOOfzglnoII0gLj3PNh4fZkcFA= github.com/lpar/date v1.0.0 h1:bq/zVqFTUmsxvd/CylidY4Udqpr9BOFrParoP6p0x/I= github.com/lpar/date v1.0.0/go.mod h1:KjYe0dDyMQTgpqcUz4LEIeM5VZwhggjVx/V2dtc8NSo= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= @@ -1423,8 +1423,8 @@ github.com/quic-go/quic-go v0.40.1 h1:X3AGzUNFs0jVuO3esAGnTfvdgvL4fq655WaOi1snv1 github.com/quic-go/quic-go v0.40.1/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/rclone/rclone v1.68.1 h1:vlEOAuPv4gGxWECM0NIaCwBNUt3ZQY7mCsyBtZjY+68= -github.com/rclone/rclone v1.68.1/go.mod h1:T8XKOt/2Fb9INROUtFH9eF9q9o9rI1W2qTrW2bw2cYU= +github.com/rclone/rclone v1.68.2 h1:0m2tKzfTnoZRhRseRFO3CsLa5ZCXYz3xWb98ke3dz98= +github.com/rclone/rclone v1.68.2/go.mod h1:DuhVHaYIVgIdtIg8vEVt/IBwyqPJUaarr/+nG8Zg+Fg= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rdleal/intervalst v1.4.0 h1:DWrcNJ9kxPOYnXxExrQkta1xsKFRN0rTOVXkBDJzVYg= @@ -1634,12 +1634,12 @@ go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg= go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= -go.etcd.io/etcd/api/v3 v3.5.16 h1:WvmyJVbjWqK4R1E+B12RRHz3bRGy9XVfh++MgbN+6n0= -go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= -go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= 
-go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= -go.etcd.io/etcd/client/v3 v3.5.16 h1:sSmVYOAHeC9doqi0gv7v86oY/BTld0SEFGaxsU9eRhE= -go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= +go.etcd.io/etcd/api/v3 v3.5.17 h1:cQB8eb8bxwuxOilBpMJAEo8fAONyrdXTHUNcMd8yT1w= +go.etcd.io/etcd/api/v3 v3.5.17/go.mod h1:d1hvkRuXkts6PmaYk2Vrgqbv7H4ADfAKhyJqHNLJCB4= +go.etcd.io/etcd/client/pkg/v3 v3.5.17 h1:XxnDXAWq2pnxqx76ljWwiQ9jylbpC4rvkAeRVOUKKVw= +go.etcd.io/etcd/client/pkg/v3 v3.5.17/go.mod h1:4DqK1TKacp/86nJk4FLQqo6Mn2vvQFBmruW3pP14H/w= +go.etcd.io/etcd/client/v3 v3.5.17 h1:o48sINNeWz5+pjy/Z0+HKpj/xSnBkuVhVvXkjEXbqZY= +go.etcd.io/etcd/client/v3 v3.5.17/go.mod h1:j2d4eXTHWkT2ClBgnnEPm/Wuu7jsqku41v9DZ3OtjQo= go.mongodb.org/mongo-driver v1.16.0 h1:tpRsfBJMROVHKpdGyc1BBEzzjDUWjItxbVSZ8Ls4BQ4= go.mongodb.org/mongo-driver v1.16.0/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -2500,8 +2500,8 @@ modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4= -modernc.org/sqlite v1.33.1 h1:trb6Z3YYoeM9eDL1O8do81kP+0ejv+YzgyFo+Gwy0nM= -modernc.org/sqlite v1.33.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k= +modernc.org/sqlite v1.34.1 h1:u3Yi6M0N8t9yKRDwhXcyp1eS5/ErhPTBggxWFuR6Hfk= +modernc.org/sqlite v1.34.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k= modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= diff --git a/k8s/charts/seaweedfs/Chart.yaml b/k8s/charts/seaweedfs/Chart.yaml index 24f46ffec..dc813df3c 100644 --- a/k8s/charts/seaweedfs/Chart.yaml +++ b/k8s/charts/seaweedfs/Chart.yaml @@ -1,6 +1,6 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "3.79" +appVersion: "3.80" # Dev note: Trigger a helm chart release by `git tag -a helm-` -version: 4.0.379 +version: 4.0.380 diff --git a/k8s/charts/seaweedfs/templates/post-install-bucket-hook.yaml b/k8s/charts/seaweedfs/templates/post-install-bucket-hook.yaml index 2260bd84a..44d650898 100644 --- a/k8s/charts/seaweedfs/templates/post-install-bucket-hook.yaml +++ b/k8s/charts/seaweedfs/templates/post-install-bucket-hook.yaml @@ -32,9 +32,9 @@ spec: - name: WEED_CLUSTER_DEFAULT value: "sw" - name: WEED_CLUSTER_SW_MASTER - value: "{{ template "seaweedfs.name" . }}-master.{{ .Release.Namespace }}:9333" + value: "{{ template "seaweedfs.name" . }}-master.{{ .Release.Namespace }}:{{ .Values.master.port }}" - name: WEED_CLUSTER_SW_FILER - value: "{{ template "seaweedfs.name" . }}-filer-client.{{ .Release.Namespace }}:8888" + value: "{{ template "seaweedfs.name" . }}-filer-client.{{ .Release.Namespace }}:{{ .Values.filer.port }}" - name: POD_IP valueFrom: fieldRef: @@ -53,6 +53,26 @@ spec: - "/bin/sh" - "-ec" - | + wait_for_service() { + local url=$1 + local max_attempts=60 # 5 minutes total (5s * 60) + local attempt=1 + + echo "Waiting for service at $url..." + while [ $attempt -le $max_attempts ]; do + if wget -q --spider "$url" >/dev/null 2>&1; then + echo "Service at $url is up!" 
+ return 0 + fi + echo "Attempt $attempt: Service not ready yet, retrying in 5s..." + sleep 5 + attempt=$((attempt + 1)) + done + echo "Service at $url failed to become ready within 5 minutes" + exit 1 + } + wait_for_service "http://$WEED_CLUSTER_SW_MASTER{{ .Values.master.readinessProbe.httpGet.path }}" + wait_for_service "http://$WEED_CLUSTER_SW_FILER{{ .Values.filer.readinessProbe.httpGet.path }}" {{- range $reg, $props := $.Values.filer.s3.createBuckets }} exec /bin/echo \ "s3.bucket.create --name {{ $props.name }}" |\ diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 7e5c3f732..03de3f5e1 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.seaweedfs seaweedfs-client - 3.71 + 3.80 SeaweedFS Java Client A java client for SeaweedFS. @@ -31,9 +31,9 @@ - 3.16.3 + 3.25.5 - 1.53.0 + 1.68.1 32.0.0-jre diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 7e5c3f732..03de3f5e1 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.seaweedfs seaweedfs-client - 3.71 + 3.80 SeaweedFS Java Client A java client for SeaweedFS. @@ -31,9 +31,9 @@ - 3.16.3 + 3.25.5 - 1.53.0 + 1.68.1 32.0.0-jre diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index f6751512f..60d07560f 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.seaweedfs seaweedfs-client - 3.71 + 3.80 org.sonatype.oss @@ -14,9 +14,9 @@ - 3.9.1 + 3.25.5 - 1.23.0 + 1.68.1 28.0-jre diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java index e563c0ccc..18826dd48 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java +++ b/other/java/client/src/main/java/seaweedfs/client/ReadChunks.java @@ -14,20 +14,23 @@ public class ReadChunks { points.add(new Point(chunk.getOffset(), chunk, true)); points.add(new Point(chunk.getOffset() + chunk.getSize(), chunk, false)); } + Collections.sort(points, new Comparator() { @Override public int compare(Point a, Point b) { - int x = (int) (a.x - b.x); - if (a.x != b.x) { - return (int) (a.x - b.x); - } - if (a.ts != b.ts) { - return (int) (a.ts - b.ts); + int xComparison = Long.compare(a.x, b.x); + if (xComparison != 0) { + return xComparison; } - if (!a.isStart) { - return -1; + + // If x values are equal, compare ts + int tsComparison = Long.compare(a.ts, b.ts); + if (tsComparison != 0) { + return tsComparison; } - return 1; + + // If both x and ts are equal, prioritize start points + return Boolean.compare(b.isStart, a.isStart); // b.isStart first to prioritize starts } }); diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index 534febb6e..5c0981eae 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -11,13 +11,13 @@ com.seaweedfs seaweedfs-client - 3.71 + 3.80 compile com.seaweedfs seaweedfs-hadoop2-client - 3.71 + 3.80 compile diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index bba755a11..fd84befa0 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -327,7 +327,7 @@ - 3.71 + 3.80 3.2.4 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index e3aa2ab44..50fbdbc06 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 3.71 + 3.80 3.2.4 diff --git 
a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index dddf529dc..decf55a59 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -327,7 +327,7 @@ - 3.71 + 3.80 3.2.4 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 067d649ff..3faba03be 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 3.71 + 3.80 3.2.4 diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 1344dfd2c..380540fd9 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -1,12 +1,15 @@ package command import ( + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/replication/source" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/http" "google.golang.org/grpc" "regexp" "strings" @@ -14,16 +17,18 @@ import ( ) type FilerBackupOptions struct { - isActivePassive *bool - filer *string - path *string - excludePaths *string - excludeFileName *string - debug *bool - proxyByFiler *bool - doDeleteFiles *bool - timeAgo *time.Duration - retentionDays *int + isActivePassive *bool + filer *string + path *string + excludePaths *string + excludeFileName *string + debug *bool + proxyByFiler *bool + doDeleteFiles *bool + disableErrorRetry *bool + ignore404Error *bool + timeAgo *time.Duration + retentionDays *int } var ( @@ -41,6 +46,8 @@ func init() { filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". 
Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days") + filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print") + filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer") } var cmdFilerBackup = &Command{ @@ -130,7 +137,23 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) - processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + var processEventFn func(*filer_pb.SubscribeMetadataResponse) error + if *backupOption.ignore404Error { + processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error { + err := processEventFnGenerated(resp) + if err == nil { + return nil + } + if errors.Is(err, http.ErrNotFound) { + glog.V(0).Infof("got 404 error, ignore it: %s", err.Error()) + return nil + } + return err + } + } else { + processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug) + } processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) @@ -154,6 +177,11 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti prefix = prefix + "/" } + eventErrorType := pb.RetryForeverOnError + if *backupOption.disableErrorRetry { + eventErrorType = pb.TrivialOnError + } + metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "backup_" + dataSink.GetName(), ClientId: clientId, @@ -164,7 +192,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti DirectoriesToWatch: nil, StartTsNs: startFrom.UnixNano(), StopTsNs: 0, - EventErrorType: pb.RetryForeverOnError, + EventErrorType: eventErrorType, } return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 90204af4a..9b489297c 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -4,6 +4,12 @@ import ( "context" "errors" "fmt" + "os" + "regexp" + "strings" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -16,11 +22,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/grace" "google.golang.org/grpc" - "os" - "regexp" - "strings" - "sync/atomic" - "time" ) type SyncOptions struct { @@ -403,11 +404,11 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str return nil } - if !strings.HasPrefix(resp.Directory, sourcePath) { + if !strings.HasPrefix(resp.Directory+"/", sourcePath) { return nil } for _, excludePath := range excludePaths { - if strings.HasPrefix(resp.Directory, excludePath) { + if strings.HasPrefix(resp.Directory+"/", excludePath) { return nil } } @@ -436,7 +437,7 @@ func 
genProcessFunction(sourcePath string, targetPath string, excludePaths []str } key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry1 : %v", err) + return fmt.Errorf("create entry1 : %w", err) } else { return nil } @@ -454,7 +455,11 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // new key is also in the watched directory if doDeleteFiles { oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + if strings.HasSuffix(sourcePath, "/") { + message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath)-1:]) + } else { + message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + } foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) if foundExisting { return err @@ -462,13 +467,13 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // not able to find old entry if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %v", oldKey, err) + return fmt.Errorf("delete old entry %v: %w", oldKey, err) } } // create the new entry newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry2 : %v", err) + return fmt.Errorf("create entry2 : %w", err) } else { return nil } @@ -486,7 +491,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // new key is in the watched directory key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil { - return fmt.Errorf("create entry3 : %v", err) + return fmt.Errorf("create entry3 : %w", err) } else { return nil } diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml index 10d9d1914..c9086b0f7 100644 --- a/weed/command/scaffold/master.toml +++ b/weed/command/scaffold/master.toml @@ -49,6 +49,7 @@ copy_1 = 7 # create 1 x 7 = 7 actual volumes copy_2 = 6 # create 2 x 6 = 12 actual volumes copy_3 = 3 # create 3 x 3 = 9 actual volumes copy_other = 1 # create n x 1 = n actual volumes +threshold = 0.9 # create threshold # configuration flags for replication [master.replication] diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 1832ddb73..bf9d3394e 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -78,12 +78,12 @@ func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err e ctx := context.Background() store.client, err = elastic.NewClient(options...) 
if err != nil { - return fmt.Errorf("init elastic %v.", err) + return fmt.Errorf("init elastic %v", err) } if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok { _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx) if err != nil { - return fmt.Errorf("create index(%s) %v.", indexKV, err) + return fmt.Errorf("create index(%s) %v", indexKV, err) } } return nil @@ -114,7 +114,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) value, err := jsoniter.Marshal(esEntry) if err != nil { glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) - return fmt.Errorf("insert entry %v.", err) + return fmt.Errorf("insert entry marshal %v", err) } _, err = store.client.Index(). Index(index). @@ -124,7 +124,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) Do(ctx) if err != nil { glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) - return fmt.Errorf("insert entry %v.", err) + return fmt.Errorf("insert entry %v", err) } return nil } @@ -194,7 +194,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e } } glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) - return fmt.Errorf("delete entry %v.", err) + return fmt.Errorf("delete entry %v", err) } func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go index 886125431..86262bc0f 100644 --- a/weed/filer/elastic/v7/elastic_store_kv.go +++ b/weed/filer/elastic/v7/elastic_store_kv.go @@ -26,7 +26,7 @@ func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) } } glog.Errorf("delete key(id:%s) %v.", string(key), err) - return fmt.Errorf("delete key %v.", err) + return fmt.Errorf("delete key %v", err) } func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { @@ -53,7 +53,7 @@ func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) val, err := jsoniter.Marshal(esEntry) if err != nil { glog.Errorf("insert key(%s) %v.", string(key), err) - return fmt.Errorf("insert key %v.", err) + return fmt.Errorf("insert key %v", err) } _, err = store.client.Index(). Index(indexKV). 
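The filer_backup.go and filer_sync.go hunks above lean on Go's error wrapping: fmt.Errorf with the %w verb keeps the underlying error in the chain so errors.Is can later match a sentinel such as util/http.ErrNotFound, which is what the new -ignore404Error path checks before skipping an event instead of retrying it. A minimal sketch of that pattern, with illustrative names (fetchEntry and the local ErrNotFound are stand-ins, not SeaweedFS APIs):

package main

import (
	"errors"
	"fmt"
)

// ErrNotFound plays the role of a package-level sentinel error,
// similar in spirit to util/http.ErrNotFound used in filer_backup.go.
var ErrNotFound = errors.New("not found")

// fetchEntry is a hypothetical operation that wraps the sentinel with %w,
// mirroring the "create entry1 : %w" style changes in filer_sync.go.
func fetchEntry(path string) error {
	return fmt.Errorf("fetch %s: %w", path, ErrNotFound)
}

func main() {
	err := fetchEntry("/buckets/demo/a.txt")
	if errors.Is(err, ErrNotFound) {
		// Comparable to ignore404Error=true: log the 404 and continue
		// rather than retrying forever.
		fmt.Println("ignoring 404:", err)
		return
	}
	fmt.Println("retryable error:", err)
}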
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 350821757..bdd941117 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -4,6 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" + "io" "os" "sync" ) @@ -130,7 +131,10 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + if err == io.EOF && n == int(logicStop-logicStart) { + err = nil + } glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) break } diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index b4bac5976..7526a6e79 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -44,7 +44,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { { mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) - assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, "text/plain; charset=utf-8", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize) diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 6fecdcf2d..8a274d72c 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -228,8 +228,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } dataReader := r.Body + rAuthType := getRequestAuthType(r) if s3a.iam.isEnabled() { - rAuthType := getRequestAuthType(r) var s3ErrCode s3err.ErrorCode switch rAuthType { case authTypeStreamingSigned: @@ -243,6 +243,11 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ s3err.WriteErrorResponse(w, r, s3ErrCode) return } + } else { + if authTypeStreamingSigned == rAuthType { + s3err.WriteErrorResponse(w, r, s3err.ErrAuthNotSetup) + return + } } defer dataReader.Close() diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index e2e6cda42..cc9bc3c51 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -43,12 +43,12 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { func (ms *MasterServer) ProcessGrowRequest() { go func() { ctx := context.Background() - firstRun := true + firstRun := true for { if firstRun { - firstRun = false + firstRun = false } else { - time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second) + time.Sleep(5*time.Minute + time.Duration(30*rand.Float32())*time.Second) } if !ms.Topo.IsLeader() { continue @@ -71,7 +71,7 @@ func (ms *MasterServer) ProcessGrowRequest() { case mustGrow > 0: vgr.WritableVolumeCount = 
uint32(mustGrow) _, err = ms.VolumeGrow(ctx, vgr) - case crowded+volumeGrowStepCount >= writable: + case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold: vgr.WritableVolumeCount = volumeGrowStepCount _, err = ms.VolumeGrow(ctx, vgr) default: diff --git a/weed/server/master_ui/master.html b/weed/server/master_ui/master.html index 2e37c3a62..40d49991b 100644 --- a/weed/server/master_ui/master.html +++ b/weed/server/master_ui/master.html @@ -33,14 +33,14 @@ {{ with .RaftServer }} Leader - {{ .Leader }} + {{ .Leader }} Other Masters @@ -88,9 +88,9 @@ {{ $dc.Id }} {{ $rack.Id }} - {{ $dn.Url }} + {{ $dn.Url }} {{ if ne $dn.PublicUrl $dn.Url }} - / {{ $dn.PublicUrl }} + / {{ $dn.PublicUrl }} {{ end }} {{ $dn.Volumes }} diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index a6dcc57d7..d32c5efdf 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -3,6 +3,7 @@ package master_ui import ( _ "embed" "html/template" + "strings" ) //go:embed master.html @@ -11,5 +12,17 @@ var masterHtml string //go:embed masterNewRaft.html var masterNewRaftHtml string -var StatusTpl = template.Must(template.New("status").Parse(masterHtml)) +var templateFunctions = template.FuncMap{ + "url": func(input string) string { + + if !strings.HasPrefix(input, "http://") && !strings.HasPrefix(input, "https://") { + return "http://" + input + } + + return input + }, +} + +var StatusTpl = template.Must(template.New("status").Funcs(templateFunctions).Parse(masterHtml)) + var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml)) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index fd7a1acdc..7328fffe7 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,7 +3,7 @@ package shell import ( "context" "fmt" - "math" + "math/rand/v2" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -12,11 +12,32 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "golang.org/x/exp/slices" "google.golang.org/grpc" ) +type DataCenterId string +type EcNodeId string +type RackId string + +type EcNode struct { + info *master_pb.DataNodeInfo + dc DataCenterId + rack RackId + freeEcSlot int +} +type CandidateEcNode struct { + ecNode *EcNode + shardCount int +} + +type EcRack struct { + ecNodes map[EcNodeId]*EcNode + freeEcSlot int +} + func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { if !commandEnv.isLocked() { @@ -68,7 +89,6 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetAddress != existingLocation { - fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), @@ -109,11 
+129,11 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, return } -func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) { +func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) { for _, dc := range topo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, dn := range rack.DataNodeInfos { - fn(dc.Id, RackId(rack.Id), dn) + fn(DataCenterId(dc.Id), RackId(rack.Id), dn) } } } @@ -131,11 +151,6 @@ func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) { }) } -type CandidateEcNode struct { - ecNode *EcNode - shardCount int -} - // if the index node changed the freeEcSlot, need to keep every EcNode still sorted func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) { for i := index - 1; i >= 0; i-- { @@ -179,16 +194,6 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos) } -type RackId string -type EcNodeId string - -type EcNode struct { - info *master_pb.DataNodeInfo - dc string - rack RackId - freeEcSlot int -} - func (ecNode *EcNode) localShardIdCount(vid uint32) int { for _, diskInfo := range ecNode.info.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { @@ -201,13 +206,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -type EcRack struct { - ecNodes map[EcNodeId]*EcNode - freeEcSlot int -} - func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -224,8 +223,8 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes } func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { - eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - if selectedDataCenter != "" && selectedDataCenter != dc { + eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } @@ -283,8 +282,12 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n }) } -func ceilDivide(total, n int) int { - return int(math.Ceil(float64(total) / float64(n))) +func ceilDivide(a, b int) int { + var r int + if (a % b) != 0 { + r = 1 + } + return (a / b) + r } func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { @@ -471,16 +474,19 @@ func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra return nil } -func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { +func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int { + return groupByCount(locations, func(ecNode *EcNode) (id string, count int) { + shardBits := findEcVolumeShards(ecNode, vid) + return string(ecNode.rack), shardBits.ShardIdCount() + }) +} +func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { // calculate average number of shards an ec rack 
should have for one volume averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) // see the volume's shards are in how many racks, and how many in each rack - rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) { - shardBits := findEcVolumeShards(ecNode, vid) - return string(ecNode.rack), shardBits.ShardIdCount() - }) + rackToShardCount := countShardsByRack(vid, locations) rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { return string(ecNode.rack) }) @@ -488,16 +494,18 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode) for rackId, count := range rackToShardCount { - if count > averageShardsPerEcRack { - possibleEcNodes := rackEcNodesWithVid[rackId] - for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) { - ecShardsToMove[shardId] = ecNode - } + if count <= averageShardsPerEcRack { + continue + } + possibleEcNodes := rackEcNodesWithVid[rackId] + for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) { + ecShardsToMove[shardId] = ecNode } } for shardId, ecNode := range ecShardsToMove { - rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack) + // TODO: consider volume replica info when balancing racks + rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack) if rackId == "" { fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id) continue @@ -519,23 +527,44 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid return nil } -func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId { - - // TODO later may need to add some randomness +func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId { + targets := []RackId{} + targetShards := -1 + for _, shards := range rackToShardCount { + if shards > targetShards { + targetShards = shards + } + } for rackId, rack := range rackToEcNodes { - if rackToShardCount[string(rackId)] >= averageShardsPerEcRack { - continue - } + shards := rackToShardCount[string(rackId)] if rack.freeEcSlot <= 0 { + // No EC shards slots left :( continue } - - return rackId + if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount { + // Don't select racks with more EC shards for the target volume than the replicaton limit. + continue + } + if shards >= averageShardsPerEcRack { + // Keep EC shards across racks as balanced as possible. + continue + } + if shards < targetShards { + // Favor racks with less shards, to ensure an uniform distribution. 
+ targets = nil + targetShards = shards + } + if shards == targetShards { + targets = append(targets, rackId) + } } - return "" + if len(targets) == 0 { + return "" + } + return targets[rand.IntN(len(targets))] } func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { @@ -772,6 +801,37 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl return vidLocations } +// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default? +func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) { + for _, ecNode := range nodes { + for _, diskInfo := range ecNode.info.DiskInfos { + for _, volumeInfo := range diskInfo.VolumeInfos { + if needle.VolumeId(volumeInfo.Id) != vid { + continue + } + return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + } + } + } + + return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid) +} + +func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) { + var resp *master_pb.GetMasterConfigurationResponse + var err error + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + return err + }) + if err != nil { + return nil, err + } + + return super_block.NewReplicaPlacementFromString(resp.DefaultReplication) +} + func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go new file mode 100644 index 000000000..bdce47bf8 --- /dev/null +++ b/weed/shell/command_ec_common_test.go @@ -0,0 +1,133 @@ +package shell + +import ( + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +var ( + topology1 = parseOutput(topoData) + topology2 = parseOutput(topoData2) + topologyEc = parseOutput(topoDataEc) +) + +func TestEcDistribution(t *testing.T) { + + // find out all volume servers with one slot left. 
+ ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "") + + sortEcNodesByFreeslotsDescending(ecNodes) + + if totalFreeEcSlots < erasure_coding.TotalShardsCount { + t.Errorf("not enough free ec shard slots: %d", totalFreeEcSlots) + } + allocatedDataNodes := ecNodes + if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { + allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] + } + + for _, dn := range allocatedDataNodes { + // fmt.Printf("info %+v %+v\n", dn.info, dn) + fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot) + } +} + +func TestVolumeIdToReplicaPlacement(t *testing.T) { + testCases := []struct { + topology *master_pb.TopologyInfo + vid string + want string + wantErr string + }{ + {topology1, "", "", "failed to resolve replica placement for volume ID 0"}, + {topology1, "0", "", "failed to resolve replica placement for volume ID 0"}, + {topology1, "1", "100", ""}, + {topology1, "296", "100", ""}, + {topology2, "", "", "failed to resolve replica placement for volume ID 0"}, + {topology2, "19012", "", "failed to resolve replica placement for volume ID 19012"}, + {topology2, "6271", "002", ""}, + {topology2, "17932", "002", ""}, + } + + for _, tc := range testCases { + vid, _ := needle.NewVolumeId(tc.vid) + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes) + + if tc.wantErr == "" && gotErr != nil { + t.Errorf("expected no error for volume %q, got %q", tc.vid, gotErr.Error()) + continue + } + if tc.wantErr != "" { + if gotErr == nil { + t.Errorf("got no error for volume %q, expected %q", tc.vid, tc.wantErr) + continue + } + if gotErr.Error() != tc.wantErr { + t.Errorf("expected error %q for volume %q, got %q", tc.wantErr, tc.vid, gotErr.Error()) + continue + } + } + + if got == nil { + if tc.want != "" { + t.Errorf("expected replica placement %q for volume %q, got nil", tc.want, tc.vid) + } + continue + } + want, _ := super_block.NewReplicaPlacementFromString(tc.want) + if !got.Equals(want) { + t.Errorf("got replica placement %q for volune %q, want %q", got.String(), tc.vid, want.String()) + } + } +} + +func TestPickRackToBalanceShardsInto(t *testing.T) { + testCases := []struct { + topology *master_pb.TopologyInfo + vid string + wantOneOf []string + }{ + // Non-EC volumes. We don't care about these, but the function should return all racks as a safeguard. + {topologyEc, "", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + {topologyEc, "6225", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + {topologyEc, "6226", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + {topologyEc, "6241", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + {topologyEc, "6242", []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}}, + // EC volumes. 
+ {topologyEc, "9577", []string{"rack1", "rack2", "rack3"}}, + {topologyEc, "10457", []string{"rack1"}}, + {topologyEc, "12737", []string{"rack2"}}, + {topologyEc, "14322", []string{"rack3"}}, + } + + for _, tc := range testCases { + vid, _ := needle.NewVolumeId(tc.vid) + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + racks := collectRacks(ecNodes) + + locations := ecNodes + rackToShardCount := countShardsByRack(vid, locations) + averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) + + got := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack) + if string(got) == "" && len(tc.wantOneOf) == 0 { + continue + } + found := false + for _, want := range tc.wantOneOf { + if got := string(got); got == want { + found = true + break + } + } + if !(found) { + t.Errorf("expected one of %v for volume %q, got %q", tc.wantOneOf, tc.vid, got) + } + } +} diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 02d0f316d..d3e985a2f 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,11 +4,12 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -262,7 +263,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { vidMap := make(map[uint32]bool) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Collection == selectedCollection { @@ -282,7 +283,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits { nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6edd57f45..0b60694fc 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -85,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr if !*forceChanges { var nodeCount int - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { nodeCount++ }) if nodeCount < erasure_coding.ParityShardsCount { @@ -309,7 +309,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage) vidMap := make(map[uint32]bool) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + 
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { // ignore remote volumes diff --git a/weed/shell/command_ec_encode_test.go b/weed/shell/command_ec_encode_test.go deleted file mode 100644 index 346e2af14..000000000 --- a/weed/shell/command_ec_encode_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package shell - -import ( - "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" - "testing" -) - -func TestEcDistribution(t *testing.T) { - - topologyInfo := parseOutput(topoData) - - // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topologyInfo, "") - - sortEcNodesByFreeslotsDescending(ecNodes) - - if totalFreeEcSlots < erasure_coding.TotalShardsCount { - println("not enough free ec shard slots", totalFreeEcSlots) - } - allocatedDataNodes := ecNodes - if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { - allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] - } - - for _, dn := range allocatedDataNodes { - // fmt.Printf("info %+v %+v\n", dn.info, dn) - fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot) - } - -} diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 25380ddca..ef9461ef0 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -129,7 +129,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod Id: dataNodeId, DiskInfos: make(map[string]*master_pb.DiskInfo), }, - dc: dc, + dc: DataCenterId(dc), rack: RackId(rack), freeEcSlot: freeEcSlot, } diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index eb401aab1..d35f63c0a 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -197,44 +197,58 @@ func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterC func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) { plan := make(map[needle.VolumeId]needle.VolumeId) - volumes := maps.Keys(c.volumes) - sort.Slice(volumes, func(a, b int) bool { - return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size + volumeIds := maps.Keys(c.volumes) + sort.Slice(volumeIds, func(a, b int) bool { + return c.volumes[volumeIds[b]].Size < c.volumes[volumeIds[a]].Size }) - l := len(volumes) + l := len(volumeIds) for i := 0; i < l; i++ { - volume := c.volumes[volumes[i]] + volume := c.volumes[volumeIds[i]] if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) { - volumes = slices.Delete(volumes, i, i+1) + + if fromVolumeId != 0 && volumeIds[i] == fromVolumeId || toVolumeId != 0 && volumeIds[i] == toVolumeId { + if volume.GetReadOnly() { + return nil, fmt.Errorf("volume %d is readonly", volumeIds[i]) + } + if c.getVolumeSize(volume) == 0 { + return nil, fmt.Errorf("volume %d is empty", volumeIds[i]) + } + } + volumeIds = slices.Delete(volumeIds, i, i+1) i-- l-- } } for i := l - 1; i >= 0; i-- { - src := volumes[i] + src := volumeIds[i] if fromVolumeId != 0 && src != fromVolumeId { continue } for j := 0; j < i; j++ { - condidate := volumes[j] - if toVolumeId != 0 && condidate != toVolumeId { + candidate := volumeIds[j] + if toVolumeId != 0 && candidate != toVolumeId { continue } - if _, moving := plan[condidate]; moving { + if 
_, moving := plan[candidate]; moving { continue } - compatible, err := c.volumesAreCompatible(src, condidate) + compatible, err := c.volumesAreCompatible(src, candidate) if err != nil { return nil, err } if !compatible { + fmt.Printf("volume %d is not compatible with volume %d\n", src, candidate) continue } - if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit { + if c.getVolumeSizeBasedOnPlan(plan, candidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit { + fmt.Printf("volume %d (%d MB) merge into volume %d (%d MB) exceeds volume size limit (%d MB)\n", + src, c.getVolumeSizeById(src)/1024/1024, + candidate, c.getVolumeSizeById(candidate)/1024/1024, + c.volumeSizeLimit/1024/1024) continue } - plan[src] = condidate + plan[src] = candidate break } } diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index ea9f86c3c..2cd7f06dc 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -5,6 +5,12 @@ import ( "context" "flag" "fmt" + "io" + "math" + "strings" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -15,11 +21,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "go.uber.org/atomic" "golang.org/x/exp/slices" - "io" - "math" - "strings" - "sync" - "time" ) func init() { @@ -114,7 +115,7 @@ func (c *commandFsVerify) collectVolumeIds() error { if err != nil { return err } - eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, nodeInfo *master_pb.DataNodeInfo) { for _, diskInfo := range nodeInfo.DiskInfos { for _, vi := range diskInfo.VolumeInfos { volumeServer := pb.NewServerAddressFromDataNode(nodeInfo) diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 4e60f6ff8..f1a0c4450 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -2,9 +2,10 @@ package shell import ( "fmt" + "testing" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/stretchr/testify/assert" - "testing" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" @@ -275,7 +276,7 @@ func TestVolumeSelection(t *testing.T) { func TestDeleteEmptySelection(t *testing.T) { topologyInfo := parseOutput(topoData) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 { diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 29d7ebac4..bc1ca5304 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -5,10 +5,11 @@ import ( "errors" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -72,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman volumeFilter := getVolumeFilter(replicaPlacement, 
uint32(vid), *collectionPattern) // find all data nodes with volumes that needs replication change - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { var targetVolumeIds []uint32 for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go index fc1d50e64..a81ebe6dd 100644 --- a/weed/shell/command_volume_delete_empty.go +++ b/weed/shell/command_volume_delete_empty.go @@ -2,13 +2,14 @@ package shell import ( "flag" + "io" + "log" + "time" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" - "io" - "log" - "time" ) func init() { @@ -59,7 +60,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri quietSeconds := int64(*quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2c35b3bcd..61c4ac990 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -73,12 +73,13 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return nil } - if err = commandEnv.confirmIsLocked(args); err != nil { + commandEnv.noLock = *skipChange + takeAction := !*skipChange + + if err = commandEnv.confirmIsLocked(args); takeAction && err != nil { return } - takeAction := !*skipChange - underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { fixedVolumeReplicas := map[string]int{} @@ -132,7 +133,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if underReplicatedVolumeIdsCount > 0 { - // find the most under populated data nodes + // find the most underpopulated data nodes fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) if err != nil { return err @@ -179,8 +180,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - loc := newLocation(dc, string(rack), dn) + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(string(dc), string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 30d3ecd11..dbd814309 100644 --- 
a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -651,7 +651,7 @@ func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[string]map[ return } - eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, t *master_pb.DataNodeInfo) { var volumeCount, ecShardCount int dataNodeId := t.GetId() for _, diskInfo := range t.DiskInfos { diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go index 1c8368229..4d67ada01 100644 --- a/weed/shell/command_volume_list_test.go +++ b/weed/shell/command_volume_list_test.go @@ -2,15 +2,18 @@ package shell import ( _ "embed" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/stretchr/testify/assert" + //"google.golang.org/protobuf/proto" - "github.com/golang/protobuf/proto" "strconv" "strings" "testing" + "github.com/golang/protobuf/proto" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -127,3 +130,6 @@ var topoData string //go:embed volume.list2.txt var topoData2 string + +//go:embed volume.ecshards.txt +var topoDataEc string diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index cf9991695..2ddd3f625 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -14,6 +14,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" ) @@ -61,12 +63,18 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. 
targetNodeStr := volMoveCommand.String("target", "", "the target volume server :") diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") ioBytePerSecond := volMoveCommand.Int64("ioBytePerSecond", 0, "limit the speed of move") + noLock := volMoveCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk") + if err = volMoveCommand.Parse(args); err != nil { return nil } - if err = commandEnv.confirmIsLocked(args); err != nil { - return + if *noLock { + commandEnv.noLock = true + } else { + if err = commandEnv.confirmIsLocked(args); err != nil { + return + } } sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr) @@ -169,7 +177,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl if resp.LastAppendAtNs != 0 { lastAppendAtNs = resp.LastAppendAtNs } else { - fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes) + fmt.Fprintf(writer, "%s => %s volume %d processed %s\n", sourceVolumeServer, targetVolumeServer, volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes))) } } diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 2878d16fe..9cea40eb2 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -90,7 +90,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { vidMap := make(map[uint32]bool) - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index d5087f0ec..46da39eef 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -292,7 +292,7 @@ func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeS fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds) vidMap := make(map[uint32]bool) - eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { // check collection name pattern diff --git a/weed/shell/commands.go b/weed/shell/commands.go index dbbd86f0e..264dd4818 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -82,7 +82,7 @@ func (ce *CommandEnv) isLocked() bool { return true } if ce.noLock { - return false + return true } return ce.locker.IsLocked() } diff --git a/weed/shell/volume.ecshards.txt b/weed/shell/volume.ecshards.txt new file mode 100644 index 000000000..2070187da --- /dev/null +++ b/weed/shell/volume.ecshards.txt @@ -0,0 +1,134 @@ +Topology volumeSizeLimit:1024 MB hdd(volume:15900/25063 active:15900 free:9163 remote:0) + DataCenter DefaultDataCenter hdd(volume:15900/25063 active:15900 free:9163 remote:0) + Rack rack1 hdd(volume:15900/25063 active:15900 free:9163 remote:0) + DataNode 172.19.0.10:8702 hdd(volume:7/2225 active:7 free:2225 remote:0) + 
Disk hdd(volume:7/2232 active:7 free:2225 remote:0) + volume id:6225 size:24404408 file_count:275 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664897660 + volume id:6226 size:20871152 file_count:258 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664888660 + volume id:6241 size:34861224 file_count:274 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664909248 + volume id:6242 size:40460472 file_count:236 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664906607 + ec volume id:12737 collection:s3qldata shards:[3] + ec volume id:14322 collection:s3qldata shards:[5] + ec volume id:9577 collection:s3qldata shards:[11] + Disk hdd total size:1737345132344 file_count:533580 deleted_file:10764 deleted_bytes:22028207276 + DataNode 172.19.0.10:8702 total size:1737345132344 file_count:533580 deleted_file:10764 deleted_bytes:22028207276 + Rack rack1 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + Rack rack2 hdd(volume:3/25063 active:3 free:25060 remote:0) + DataNode 172.19.0.13:8701 hdd(volume:3/2187 active:3 free:2184 remote:0) + Disk hdd(volume:3/2187 active:3 free:2184 remote:0) + volume id:6241 size:34861256 file_count:275 delete_count:1 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664909248 + ec volume id:10457 collection:s3qldata shards:[12] + ec volume id:14322 collection:s3qldata shards:[10] + ec volume id:9577 collection:s3qldata shards:[10] + Disk hdd total size:1695600546816 file_count:521054 deleted_file:9961 deleted_bytes:21063702677 + DataNode 172.19.0.13:8701 total size:1695600546816 file_count:521054 deleted_file:9961 deleted_bytes:21063702677 + Rack rack2 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + Rack rack3 hdd(volume:3/25063 active:28 free:25060 remote:0) + DataNode 172.19.0.14:8711 hdd(volume:3/1627 active:3 free:1624 remote:0) + Disk hdd(volume:3/1627 active:3 free:1624 remote:0) + ec volume id:10457 collection:s3qldata shards:[3] + ec volume id:12737 collection:s3qldata shards:[6] + ec volume id:9577 collection:s3qldata shards:[5] + Disk hdd total size:1050933775360 file_count:323231 deleted_file:8245 deleted_bytes:15595720358 + DataNode 172.19.0.14:8711 total size:1050933775360 file_count:323231 deleted_file:8245 deleted_bytes:15595720358 + Rack rack3 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + Rack rack4 hdd(volume:10/25063 active:4 free:25053 remote:0) + DataNode 172.19.0.16:8704 hdd(volume:10/2174 active:4 free:2164 remote:0) + Disk hdd(volume:4/2174 active:4 free:2170 remote:0) + ec volume id:10457 collection:s3qldata shards:[0 13] + ec volume id:12737 collection:s3qldata shards:[1] + ec volume id:14322 collection:s3qldata shards:[7] + ec volume id:9577 collection:s3qldata shards:[2] + Disk hdd total size:1653215155776 file_count:507914 deleted_file:11402 deleted_bytes:22641676340 + DataNode 172.19.0.16:8704 total size:1653215155776 file_count:507914 deleted_file:11402 deleted_bytes:22641676340 + DataNode 172.19.0.17:8703 hdd(volume:6/2214 active:6 free:2208 remote:0) + Disk hdd(volume:6/2214 active:6 free:2208 remote:0) + volume id:6226 size:20871152 file_count:258 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664888660 + volume id:6241 size:34861256 file_count:275 delete_count:1 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664909248 + ec volume id:10457 collection:s3qldata 
shards:[11] + ec volume id:12737 collection:s3qldata shards:[5] + ec volume id:14322 collection:s3qldata shards:[2 9] + ec volume id:9577 collection:s3qldata shards:[0] + Disk hdd total size:1715724688456 file_count:526901 deleted_file:10854 deleted_bytes:22441405472 + DataNode 172.19.0.17:8703 total size:1715724688456 file_count:526901 deleted_file:10854 deleted_bytes:22441405472 + Rack rack4 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + Rack rack5 hdd(volume:20/25063 active:20 free:25043 remote:0) + DataNode 172.19.0.19:8700 hdd(volume:6/2132 active:6 free:2126 remote:0) + Disk hdd(volume:6/2132 active:6 free:2126 remote:0) + volume id:6242 size:40460472 file_count:236 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664906607 + volume id:6225 size:24398232 file_count:274 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664897660 + ec volume id:10457 collection:s3qldata shards:[8] + ec volume id:12737 collection:s3qldata shards:[13] + ec volume id:14322 collection:s3qldata shards:[8] + ec volume id:9577 collection:s3qldata shards:[12] + Disk hdd total size:1635328083064 file_count:504512 deleted_file:11567 deleted_bytes:23202472281 + DataNode 172.19.0.19:8700 total size:1635328083064 file_count:504512 deleted_file:11567 deleted_bytes:23202472281 + DataNode 172.19.0.20:8706 hdd(volume:4/2153 active:4 free:2149 remote:0) + Disk hdd(volume:6/2153 active:1497 free:656 remote:0) + ec volume id:10457 collection:s3qldata shards:[1] + ec volume id:12737 collection:s3qldata shards:[7] + ec volume id:14322 collection:s3qldata shards:[5 11 12] + ec volume id:9577 collection:s3qldata shards:[1] + Disk hdd total size:1662887597128 file_count:510323 deleted_file:10919 deleted_bytes:22504428853 + DataNode 172.19.0.20:8706 total size:1662887597128 file_count:510323 deleted_file:10919 deleted_bytes:22504428853 + DataNode 172.19.0.21:8710 hdd(volume:6/2184 active:6 free:2178 remote:0) + Disk hdd(volume:6/2184 active:6 free:2178 remote:0) + volume id:6225 size:24398232 file_count:274 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664897660 + volume id:6242 size:40460472 file_count:236 replica_placement:2 version:3 compact_revision:1 modified_at_second:1664906607 + ec volume id:10457 collection:s3qldata shards:[9] + ec volume id:12737 collection:s3qldata shards:[4] + ec volume id:14322 collection:s3qldata shards:[11] + ec volume id:9577 collection:s3qldata shards:[3] + Disk hdd total size:1685060737528 file_count:517231 deleted_file:10635 deleted_bytes:22306836236 + DataNode 172.19.0.21:8710 total size:1685060737528 file_count:517231 deleted_file:10635 deleted_bytes:22306836236 + DataNode 172.19.0.3:8708 hdd(volume:4/961 active:4 free:957 remote:0) + Disk hdd(volume:4/961 active:4 free:957 remote:0) + ec volume id:10457 collection:s3qldata shards:[4] + ec volume id:12737 collection:s3qldata shards:[10] + ec volume id:14322 collection:s3qldata shards:[3] + ec volume id:9577 collection:s3qldata shards:[7] + Disk hdd total size:377523488192 file_count:119577 deleted_file:5368 deleted_bytes:8596766559 + DataNode 172.19.0.3:8708 total size:377523488192 file_count:119577 deleted_file:5368 deleted_bytes:8596766559 + Rack rack5 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + Rack rack6 hdd(volume:18/25063 active:18 free:25045 remote:0) + DataNode 172.19.0.4:8707 hdd(volume:4/958 active:4 free:954 remote:0) + Disk hdd(volume:4/958 active:4 free:954 remote:0) + ec 
volume id:10457 collection:s3qldata shards:[6] + ec volume id:12737 collection:s3qldata shards:[9] + ec volume id:14322 collection:s3qldata shards:[4] + ec volume id:9577 collection:s3qldata shards:[9] + Disk hdd total size:378345005760 file_count:119036 deleted_file:5301 deleted_bytes:8574028334 + DataNode 172.19.0.4:8707 total size:378345005760 file_count:119036 deleted_file:5301 deleted_bytes:8574028334 + DataNode 172.19.0.5:8705 hdd(volume:3/983 active:3 free:980 remote:0) + Disk hdd(volume:3/983 active:3 free:980 remote:0) + ec volume id:10457 collection:s3qldata shards:[5] + ec volume id:12737 collection:s3qldata shards:[8] + ec volume id:9577 collection:s3qldata shards:[6] + Disk hdd total size:404653451288 file_count:126527 deleted_file:4789 deleted_bytes:8145619860 + DataNode 172.19.0.5:8705 total size:404653451288 file_count:126527 deleted_file:4789 deleted_bytes:8145619860 + DataNode 172.19.0.6:8713 hdd(volume:2/970 active:2 free:968 remote:0) + Disk hdd(volume:2/970 active:2 free:968 remote:0) + ec volume id:12737 collection:s3qldata shards:[11] + ec volume id:9577 collection:s3qldata shards:[8] + Disk hdd total size:401028073512 file_count:125448 deleted_file:4891 deleted_bytes:7914078769 + DataNode 172.19.0.6:8713 total size:401028073512 file_count:125448 deleted_file:4891 deleted_bytes:7914078769 + DataNode 172.19.0.8:8709 hdd(volume:5/2144 active:5 free:2139 remote:0) + Disk hdd(volume:5/2144 active:5 free:2139 remote:0) + volume id:6226 size:20871152 file_count:258 replica_placement:2 version:3 compact_revision:2 modified_at_second:1664888660 + ec volume id:10457 collection:s3qldata shards:[2] + ec volume id:12737 collection:s3qldata shards:[2 12] + ec volume id:14322 collection:s3qldata shards:[1 13] + ec volume id:9577 collection:s3qldata shards:[13] + Disk hdd total size:1648662273096 file_count:507133 deleted_file:11386 deleted_bytes:23141702025 + DataNode 172.19.0.8:8709 total size:1648662273096 file_count:507133 deleted_file:11386 deleted_bytes:23141702025 + DataNode 172.19.0.9:8712 hdd(volume:4/2144 active:4 free:2140 remote:0) + Disk hdd(volume:4/2144 active:4 free:2140 remote:0) + ec volume id:10457 collection:s3qldata shards:[7] + ec volume id:12737 collection:s3qldata shards:[0] + ec volume id:14322 collection:s3qldata shards:[0 6] + ec volume id:9577 collection:s3qldata shards:[4] + Disk hdd total size:1629878746296 file_count:497502 deleted_file:11825 deleted_bytes:23550625989 + DataNode 172.19.0.9:8712 total size:1629878746296 file_count:497502 deleted_file:11825 deleted_bytes:23550625989 + Rack rack6 total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 + DataCenter DefaultDataCenter total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 +total size:17676186754616 file_count:5439969 deleted_file:127907 deleted_bytes:251707271029 \ No newline at end of file diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 68ffbd7e7..070f79865 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -3,6 +3,7 @@ package backend import ( "github.com/seaweedfs/seaweedfs/weed/glog" . 
"github.com/seaweedfs/seaweedfs/weed/storage/types" + "io" "os" "runtime" "time" @@ -43,7 +44,11 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { if df.File == nil { return 0, os.ErrClosed } - return df.File.ReadAt(p, off) + n, err = df.File.ReadAt(p, off) + if err == io.EOF && n == len(p) { + err = nil + } + return } func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index a94ca1fe3..a1d929f6c 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -180,7 +180,7 @@ func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []strin for shardId := 0; shardId < DataShardsCount; shardId++ { w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize) if w != ErasureCodingLargeBlockSize { - return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err) + return fmt.Errorf("copy %s large block on shardId %d: %v", baseFileName, shardId, err) } datFileSize -= ErasureCodingLargeBlockSize } diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 9fcb11525..0a7f4d09a 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -2,6 +2,7 @@ package erasure_coding import ( "fmt" + "io" "os" "path" "strconv" @@ -93,6 +94,10 @@ func (shard *EcVolumeShard) Destroy() { func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { - return shard.ecdFile.ReadAt(buf, offset) + n, err := shard.ecdFile.ReadAt(buf, offset) + if err == io.EOF && n == len(buf) { + err = nil + } + return n, err } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index d3a76b561..385d30abe 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -255,8 +255,10 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize for l < h { m := (l + h) / 2 - if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) + if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + if n != types.NeedleMapEntrySize { + return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) + } } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index 28de27910..89708303d 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -43,19 +43,8 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p } } - if r.Method == http.MethodPost { - contentType := r.Header.Get("Content-Type") + e = parseUpload(r, sizeLimit, pu) - // If content-type is explicitly set, upload the file without parsing form-data - if contentType != "" && !strings.Contains(contentType, "form-data") { - e = parseRawPost(r, sizeLimit, pu) - } else { - e = parseMultipart(r, sizeLimit, pu) - } - - } else { - e = parsePut(r, sizeLimit, pu) - } if e != nil { return } @@ -108,170 +97,157 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p 
return } -func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) error { - pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" - // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd" - pu.MimeType = r.Header.Get("Content-Type") - pu.FileName = "" - dataSize, err := pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1)) - if err == io.EOF || dataSize == sizeLimit+1 { - io.Copy(io.Discard, r.Body) - } - pu.Data = pu.bytesBuffer.Bytes() - r.Body.Close() - return nil -} +func parseUpload(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { -func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { defer func() { if e != nil && r.Body != nil { io.Copy(io.Discard, r.Body) r.Body.Close() } }() - form, fe := r.MultipartReader() - if fe != nil { - glog.V(0).Infoln("MultipartReader [ERROR]", fe) - e = fe - return - } - // first multi-part item - part, fe := form.NextPart() - if fe != nil { - glog.V(0).Infoln("Reading Multi part [ERROR]", fe) - e = fe - return - } + contentType := r.Header.Get("Content-Type") + var dataSize int64 - pu.FileName = part.FileName() - if pu.FileName != "" { - pu.FileName = path.Base(pu.FileName) - } + if r.Method == http.MethodPost && (contentType == "" || strings.Contains(contentType, "form-data")) { + form, fe := r.MultipartReader() - var dataSize int64 - dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(part, sizeLimit+1)) - if e != nil { - glog.V(0).Infoln("Reading Content [ERROR]", e) - return - } - if dataSize == sizeLimit+1 { - e = fmt.Errorf("file over the limited %d bytes", sizeLimit) - return - } - pu.Data = pu.bytesBuffer.Bytes() + if fe != nil { + glog.V(0).Infoln("MultipartReader [ERROR]", fe) + e = fe + return + } - // if the filename is empty string, do a search on the other multi-part items - for pu.FileName == "" { - part2, fe := form.NextPart() + // first multi-part item + part, fe := form.NextPart() if fe != nil { - break // no more or on error, just safely break + glog.V(0).Infoln("Reading Multi part [ERROR]", fe) + e = fe + return } - fName := part2.FileName() + pu.FileName = part.FileName() + if pu.FileName != "" { + pu.FileName = path.Base(pu.FileName) + } - // found the first multi-part has filename - if fName != "" { - pu.bytesBuffer.Reset() - dataSize2, fe2 := pu.bytesBuffer.ReadFrom(io.LimitReader(part2, sizeLimit+1)) - if fe2 != nil { - glog.V(0).Infoln("Reading Content [ERROR]", fe2) - e = fe2 - return - } - if dataSize2 == sizeLimit+1 { - e = fmt.Errorf("file over the limited %d bytes", sizeLimit) - return + dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(part, sizeLimit+1)) + if e != nil { + glog.V(0).Infoln("Reading Content [ERROR]", e) + return + } + if dataSize == sizeLimit+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } + pu.Data = pu.bytesBuffer.Bytes() + + contentType = part.Header.Get("Content-Type") + + // if the filename is empty string, do a search on the other multi-part items + for pu.FileName == "" { + part2, fe := form.NextPart() + if fe != nil { + break // no more or on error, just safely break } - // update - pu.Data = pu.bytesBuffer.Bytes() - pu.FileName = path.Base(fName) - break + fName := part2.FileName() + + // found the first multi-part has filename + if fName != "" { + pu.bytesBuffer.Reset() + dataSize2, fe2 := pu.bytesBuffer.ReadFrom(io.LimitReader(part2, sizeLimit+1)) + if fe2 != nil { + glog.V(0).Infoln("Reading Content [ERROR]", fe2) + e = fe2 + return + } + if dataSize2 == sizeLimit+1 { + e = fmt.Errorf("file 
over the limited %d bytes", sizeLimit) + return + } + + // update + pu.Data = pu.bytesBuffer.Bytes() + pu.FileName = path.Base(fName) + contentType = part.Header.Get("Content-Type") + part = part2 + break + } } - } - pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) + pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip" + // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd" - if !pu.IsChunkedFile { + } else { + disposition := r.Header.Get("Content-Disposition") - dotIndex := strings.LastIndex(pu.FileName, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = strings.ToLower(pu.FileName[dotIndex:]) - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && contentType != "application/octet-stream" && mtype != contentType { - pu.MimeType = contentType // only return mime type if not deducible - mtype = contentType - } + if strings.Contains(disposition, "name=") { - } - pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip" - // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd" + if !strings.HasPrefix(disposition, "inline") && !strings.HasPrefix(disposition, "attachment") { + disposition = "attachment; " + disposition + } - return -} + _, mediaTypeParams, err := mime.ParseMediaType(disposition) -func parseRawPost(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { + if err == nil { + dpFilename, hasFilename := mediaTypeParams["filename"] + dpName, hasName := mediaTypeParams["name"] - defer func() { - if e != nil && r.Body != nil { - io.Copy(io.Discard, r.Body) - r.Body.Close() - } - }() + if hasFilename { + pu.FileName = dpFilename + } else if hasName { + pu.FileName = dpName + } - pu.FileName = r.Header.Get("Content-Disposition") + } - if pu.FileName != "" && strings.Contains(pu.FileName, "filename=") { - parts := strings.Split(pu.FileName, "filename=") - parts = strings.Split(parts[1], "\"") + } else { + pu.FileName = "" + } - pu.FileName = parts[1] - } else { - pu.FileName = "" - } + if pu.FileName != "" { + pu.FileName = path.Base(pu.FileName) + } else { + pu.FileName = path.Base(r.URL.Path) + } - if pu.FileName != "" { - pu.FileName = path.Base(pu.FileName) - } else { - pu.FileName = path.Base(r.URL.Path) - } + dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1)) - var dataSize int64 - dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1)) + if e != nil { + glog.V(0).Infoln("Reading Content [ERROR]", e) + return + } + if dataSize == sizeLimit+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } - if e != nil { - glog.V(0).Infoln("Reading Content [ERROR]", e) - return - } - if dataSize == sizeLimit+1 { - e = fmt.Errorf("file over the limited %d bytes", sizeLimit) - return + pu.Data = pu.bytesBuffer.Bytes() + pu.MimeType = contentType + pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" + // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd" } - pu.Data = pu.bytesBuffer.Bytes() + pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) if !pu.IsChunkedFile { dotIndex := strings.LastIndex(pu.FileName, ".") ext, mtype := "", "" + if dotIndex > 0 { ext = strings.ToLower(pu.FileName[dotIndex:]) mtype = mime.TypeByExtension(ext) } - contentType := r.Header.Get("Content-Type") if contentType != "" && contentType != "application/octet-stream" && mtype != contentType { pu.MimeType = contentType // only return mime type if not deducible - mtype = contentType + } else if mtype != "" && pu.MimeType == "" && mtype 
!= "application/octet-stream" { + pu.MimeType = mtype } } - pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" - // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd" return } diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index f8468e9e2..1907efad3 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -196,6 +196,9 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6 var count int count, err = r.ReadAt(bytes, offset) + if err == io.EOF && count == NeedleHeaderSize { + err = nil + } if count <= 0 || err != nil { return nil, bytes, 0, err } @@ -230,7 +233,12 @@ func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, o return nil, nil } bytes = make([]byte, bodyLength) - if _, err = r.ReadAt(bytes, offset); err != nil { + readCount, err := r.ReadAt(bytes, offset) + if err == io.EOF && int64(readCount) == bodyLength { + err = nil + } + if err != nil { + glog.Errorf("%s read %d bodyLength %d offset %d: %v", r.Name(), readCount, bodyLength, offset, err) return } diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index 36ddc3320..4e1032de8 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -21,6 +21,9 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64 startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset count, err = r.ReadAt(data[:sizeToRead], startOffset) + if err == io.EOF && int64(count) == sizeToRead { + err = nil + } if err != nil { fileSize, _, _ := r.GetStat() glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err) @@ -40,6 +43,9 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size bytes := make([]byte, NeedleHeaderSize+DataSizeSize) count, err := r.ReadAt(bytes, offset) + if err == io.EOF && count == NeedleHeaderSize+DataSizeSize { + err = nil + } if count != NeedleHeaderSize+DataSizeSize || err != nil { return err } diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go index 51d3bcf40..95854bc27 100644 --- a/weed/storage/needle/needle_write.go +++ b/weed/storage/needle/needle_write.go @@ -133,6 +133,9 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u if err == nil { _, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset)) + if err != nil { + err = fmt.Errorf("failed to write %d bytes to %s at offset %d: %v", actualSize, w.Name(), offset, err) + } } return offset, size, actualSize, err diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index 7669180ba..d6d0a8730 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "io" "os" "sync/atomic" @@ -152,8 +153,11 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key remainingCount := entryCount - nextBatchSize for remainingCount >= 0 { - _, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount) + n, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount) // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e) + if e == io.EOF && n == int(NeedleMapEntrySize*nextBatchSize) { + e = nil + } if e != nil { return e } diff 
--git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 7f4d9797a..8caaafc4d 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -201,9 +201,12 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { - if _, err = shard.ReadAt(data, actualOffset); err != nil { - glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) - return + var readSize int + if readSize, err = shard.ReadAt(data, actualOffset); err != nil { + if readSize != int(interval.Size) { + glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) + return + } } } else { ecVolume.ShardLocationsLock.RLock() diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index b2bf21fcb..f6d14e25b 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -45,6 +45,15 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { return NewReplicaPlacementFromString(fmt.Sprintf("%03d", b)) } +func (a *ReplicaPlacement) Equals(b *ReplicaPlacement) bool { + if a == nil || b == nil { + return false + } + return (a.SameRackCount == b.SameRackCount && + a.DiffRackCount == b.DiffRackCount && + a.DiffDataCenterCount == b.DiffDataCenterCount) +} + func (rp *ReplicaPlacement) Byte() byte { if rp == nil { return 0 diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go index fd1a874dd..dba1cba72 100644 --- a/weed/storage/super_block/super_block_read.go.go +++ b/weed/storage/super_block/super_block_read.go.go @@ -15,9 +15,11 @@ import ( func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) { header := make([]byte, SuperBlockSize) - if _, e := datBackend.ReadAt(header, 0); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e) - return + if n, e := datBackend.ReadAt(header, 0); e != nil { + if n != SuperBlockSize { + err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e) + return + } } superBlock.Version = needle.Version(header[0]) diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index e3215d919..b69e0b8ac 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -227,7 +227,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast mNs, nsReadErr := v.readAppendAtNs(offset) if nsReadErr != nil { - err = fmt.Errorf("read entry %d offset %d: %v", m, offset, nsReadErr) + err = fmt.Errorf("read entry %d offset %d: %v", m, offset.ToActualOffset(), nsReadErr) return } diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 9bd432f85..6d2335f70 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -80,7 +80,11 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } bytes = make([]byte, NeedleMapEntrySize) - _, err = indexFile.ReadAt(bytes, offset) + var readCount int + readCount, err = indexFile.ReadAt(bytes, offset) + if err == io.EOF && readCount == NeedleMapEntrySize { + err = nil + } return } @@ -97,7 +101,11 @@ func 
verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, } if v == needle.Version3 { bytes := make([]byte, TimestampSize) - _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + var readCount int + readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF && readCount == TimestampSize { + err = nil + } if err == io.EOF { return 0, err } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 55827f314..3a360ff99 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -407,10 +407,10 @@ func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { break } } + vl.removeFromCrowded(vid) if toDeleteIndex >= 0 { glog.V(0).Infoln("Volume", vid, "becomes unwritable") vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...) - vl.removeFromCrowded(vid) return true } return false @@ -506,7 +506,10 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { } func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) { - delete(vl.crowded, vid) + if _, ok := vl.crowded[vid]; ok { + glog.V(0).Infoln("Volume", vid, "becomes uncrowded") + delete(vl.crowded, vid) + } } func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) { diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 3968f62e9..87f05d399 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -110,8 +110,10 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { } data := make([]byte, nv.Size) if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil { - return nil, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr) + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr) + } } else { if readSize != int(nv.Size) { return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) @@ -133,8 +135,10 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin } data := make([]byte, wanted) if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); readErr != nil { - return nil, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr) + if readSize != wanted { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr) + } } else { if readSize != wanted { return nil, fmt.Errorf("read %d, expected %d", readSize, wanted) @@ -155,8 +159,10 @@ func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, of return 0, ErrorOutOfBounds } if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil { - return n, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) + if n != wanted { + return n, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) 
+ } } else { if n != wanted { return n, fmt.Errorf("read %d, expected %d", n, wanted) diff --git a/weed/util/constants.go b/weed/util/constants.go index bb36f5e52..6162439e7 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -8,7 +8,7 @@ const HttpStatusCancelled = 499 var ( MAJOR_VERSION = int32(3) - MINOR_VERSION = int32(79) + MINOR_VERSION = int32(80) VERSION_NUMBER = fmt.Sprintf("%d.%02d", MAJOR_VERSION, MINOR_VERSION) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index c3931a790..33d978d9e 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "net/http" "net/url" @@ -16,6 +16,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" ) +var ErrNotFound = fmt.Errorf("not found") + func Post(url string, values url.Values) ([]byte, error) { r, err := GetGlobalHttpClient().PostForm(url, values) if err != nil { @@ -311,7 +313,10 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } defer CloseResponse(r) if r.StatusCode >= 400 { - retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499 + if r.StatusCode == http.StatusNotFound { + return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound) + } + retryable = r.StatusCode >= 499 return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status) } @@ -477,4 +482,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, return n, err -} \ No newline at end of file +} diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 03112cb08..175718cd2 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -19,10 +19,13 @@ const ( type ExclusiveLocker struct { token int64 lockTsNs int64 - isLocked bool + isLocked atomic.Bool masterClient *wdclient.MasterClient lockName string message string + clientName string + // Each lock has and only has one goroutine + renewGoroutineRunning atomic.Bool } func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker { @@ -33,7 +36,7 @@ func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *E } func (l *ExclusiveLocker) IsLocked() bool { - return l.isLocked + return l.isLocked.Load() } func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { @@ -45,7 +48,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { } func (l *ExclusiveLocker) RequestLock(clientName string) { - if l.isLocked { + if l.isLocked.Load() { return } @@ -74,43 +77,51 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { } } - l.isLocked = true - - // start a goroutine to renew the lease - go func() { - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - - for l.isLocked { - if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - LockName: l.lockName, - ClientName: clientName, - Message: l.message, - }) - if err == nil { - atomic.StoreInt64(&l.token, 
resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - // println("ts", l.lockTsNs, "token", l.token) + l.isLocked.Store(true) + l.clientName = clientName + + // Each lock has and only has one goroutine + if l.renewGoroutineRunning.CompareAndSwap(false, true) { + // start a goroutine to renew the lease + go func() { + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + + for { + if l.isLocked.Load() { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ + PreviousToken: atomic.LoadInt64(&l.token), + PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), + LockName: l.lockName, + ClientName: l.clientName, + Message: l.message, + }) + if err == nil { + atomic.StoreInt64(&l.token, resp.Token) + atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) + // println("ts", l.lockTsNs, "token", l.token) + } + return err + }); err != nil { + glog.Errorf("failed to renew lock: %v", err) + l.isLocked.Store(false) + return + } else { + time.Sleep(RenewInterval) + } + } else { + time.Sleep(RenewInterval) } - return err - }); err != nil { - glog.Errorf("failed to renew lock: %v", err) - l.isLocked = false - return - } else { - time.Sleep(RenewInterval) } - - } - }() + }() + } } func (l *ExclusiveLocker) ReleaseLock() { - l.isLocked = false + l.isLocked.Store(false) + l.clientName = "" ctx, cancel := context.WithCancel(context.Background()) defer cancel()
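Note on the recurring ReadAt changes above (disk_file.go, ec_shard.go, needle_read.go, needle_read_page.go, needle_map_metric.go, volume_checking.go): the io.ReaderAt contract allows an implementation whose read ends exactly at the end of the input to return either err == nil or err == io.EOF together with a full buffer, so callers that treated any non-nil error as a failure could reject reads that were in fact complete. The sketch below is illustrative only and not part of the patch; eofReaderAt and readFull are hypothetical names, assuming a backend that reports io.EOF on a read ending at the last byte.

package main

import (
	"fmt"
	"io"
)

// eofReaderAt is a hypothetical backend that, as io.ReaderAt permits,
// returns io.EOF even when all requested bytes were copied.
type eofReaderAt struct{ data []byte }

func (r eofReaderAt) ReadAt(p []byte, off int64) (int, error) {
	if off >= int64(len(r.data)) {
		return 0, io.EOF
	}
	n := copy(p, r.data[off:])
	if off+int64(n) == int64(len(r.data)) {
		return n, io.EOF // end of data reached, even if p was filled
	}
	if n < len(p) {
		return n, io.EOF
	}
	return n, nil
}

// readFull mirrors the pattern applied throughout this change set:
// clear io.EOF when the buffer was completely filled, otherwise fail.
func readFull(r io.ReaderAt, off int64, size int) ([]byte, error) {
	buf := make([]byte, size)
	n, err := r.ReadAt(buf, off)
	if err == io.EOF && n == len(buf) {
		err = nil
	}
	if err != nil {
		return nil, fmt.Errorf("read %d bytes at offset %d: %v", size, off, err)
	}
	return buf, nil
}

func main() {
	r := eofReaderAt{data: []byte("0123456789")}
	b, err := readFull(r, 6, 4)   // read ends exactly at EOF
	fmt.Printf("%q %v\n", b, err) // "6789" <nil>
}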