Browse Source

Merge branch 'master' into message_send

message_send
chrislu 2 years ago
parent
commit
3f4400dc03
  1. 4
      .github/workflows/codeql.yml
  2. 89
      .github/workflows/e2e.yml
  3. 2
      .github/workflows/go.yml
  4. 30
      docker/Dockerfile.e2e
  5. 2
      docker/Dockerfile.gccgo_build
  6. 2
      docker/Dockerfile.go_build
  7. 1
      docker/Dockerfile.local
  8. 16
      docker/Dockerfile.rocksdb_dev_env
  9. 4
      docker/Dockerfile.rocksdb_large
  10. 45
      docker/Dockerfile.rocksdb_large_local
  11. 20
      docker/Makefile
  12. 53
      docker/compose/e2e-mount.yml
  13. 33
      go.mod
  14. 73
      go.sum
  15. 4
      k8s/helm_charts2/Chart.yaml
  16. 2
      other/java/hdfs2/pom.xml
  17. 10
      unmaintained/s3/presigned_put/presigned_put.go
  18. 31
      weed/command/filer.go
  19. 22
      weed/command/filer_sync.go
  20. 4
      weed/command/mount_linux.go
  21. 2
      weed/command/mount_std.go
  22. 2
      weed/command/s3.go
  23. 15
      weed/command/update.go
  24. 2
      weed/filer/arangodb/arangodb_store.go
  25. 3
      weed/filer/arangodb/arangodb_store_bucket.go
  26. 2
      weed/filer/arangodb/helpers.go
  27. 4
      weed/filer/filer.go
  28. 2
      weed/filer/filer_deletion.go
  29. 5
      weed/filer/meta_aggregator.go
  30. 2
      weed/filer/rocksdb/rocksdb_store.go
  31. 2
      weed/filer/s3iam_conf.go
  32. 35
      weed/filer/s3iam_conf_test.go
  33. 11
      weed/mount/filehandle.go
  34. 4
      weed/mount/weedfs_file_read.go
  35. 31
      weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
  36. 5
      weed/operation/chunked_file.go
  37. 3
      weed/operation/upload_content.go
  38. 1
      weed/replication/sink/filersink/filer_sink.go
  39. 2
      weed/replication/sub/notification_gocdk_pub_sub.go
  40. 3
      weed/s3api/s3api_object_copy_handlers.go
  41. 6
      weed/s3api/s3api_server.go
  42. 1
      weed/server/common.go
  43. 2
      weed/server/filer_server_handlers_read.go
  44. 2
      weed/server/filer_server_handlers_write_autochunk.go
  45. 12
      weed/server/filer_server_handlers_write_upload.go
  46. 4
      weed/server/raft_hashicorp.go
  47. 14
      weed/shell/command_fs_meta_load.go
  48. 4
      weed/shell/command_volume_check_disk.go
  49. 35
      weed/shell/command_volume_tier_move.go
  50. 15
      weed/storage/backend/s3_backend/s3_backend.go
  51. 2
      weed/storage/store.go
  52. 5
      weed/storage/volume.go
  53. 9
      weed/storage/volume_read.go
  54. 12
      weed/topology/volume_layout.go
  55. 2
      weed/util/constants.go
  56. 6
      weed/util/grace/signal_handling.go
  57. 19
      weed/util/http_util.go
  58. 36
      weed/wdclient/masterclient.go

4
.github/workflows/codeql.yml

@ -3,6 +3,10 @@ name: "Code Scanning - Action"
on:
pull_request:
concurrency:
group: ${{ github.head_ref }}/codeql
cancel-in-progress: true
jobs:
CodeQL-Build:
# CodeQL runs on ubuntu-latest, windows-latest, and macos-latest

89
.github/workflows/e2e.yml

@ -0,0 +1,89 @@
name: "End to End"
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
concurrency:
group: ${{ github.head_ref }}/e2e
cancel-in-progress: true
permissions:
contents: read
defaults:
run:
working-directory: docker
jobs:
e2e:
name: FUSE Mount
runs-on: ubuntu-22.04
timeout-minutes: 15
steps:
- name: Set up Go 1.x
uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@2541b1294d2704b0964813337f33b291d3f8596b # v2
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y fuse
- name: Start SeaweedFS
timeout-minutes: 5
run: make build_e2e && docker compose -f ./compose/e2e-mount.yml up --wait
- name: Run FIO
timeout-minutes: 5
run: |
echo "Starting FIO at: $(date)"
# Concurrent r/w
echo 'Run randrw with size=16M bs=4k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
echo 'Run randrw with size=16M bs=128k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=128k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
echo 'Run randrw with size=16M bs=1m'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randrw --bs=1m --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1
# Verified write
echo 'Run randwrite with size=16M bs=4k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=4k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
echo 'Run randwrite with size=16M bs=128k'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=128k --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
echo 'Run randwrite with size=16M bs=1m'
docker compose -f ./compose/e2e-mount.yml exec mount timeout -k5 40 fio --name=fiotest --filename=/mnt/seaweedfs/fiotest --size=16M --rw=randwrite --bs=1m --direct=1 --numjobs=8 --ioengine=libaio --iodepth=32 --group_reporting --runtime=30 --time_based=1 --do_verify=0 --verify=crc32c --verify_backlog=1
- name: Save logs
if: always()
run: |
docker compose -f ./compose/e2e-mount.yml logs > output.log
echo 'Showing last 500 log lines of mount service:'
docker compose -f ./compose/e2e-mount.yml logs --tail 500 mount
- name: Check for data races
if: always()
continue-on-error: true # TODO: remove this comment to enable build failure on data races (after all are fixed)
run: grep -A50 'DATA RACE' output.log && exit 1 || exit 0
- name: Archive logs
if: always()
uses: actions/upload-artifact@v3
with:
name: output-logs
path: docker/output.log
- name: Cleanup
if: always()
run: docker compose -f ./compose/e2e-mount.yml down --volumes --remove-orphans --rmi all

2
.github/workflows/go.yml

@ -21,7 +21,7 @@ jobs:
steps:
- name: Set up Go 1.x
uses: actions/setup-go@84cbf8094393cdc5fe1fe1671ff2647332956b1a # v2
uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f # v2
with:
go-version: ^1.13
id: go

30
docker/Dockerfile.e2e

@ -0,0 +1,30 @@
FROM ubuntu:22.04
LABEL author="Chris Lu"
RUN apt-get update && apt-get install -y curl fio fuse
RUN mkdir -p /etc/seaweedfs /data/filerldb2
COPY ./weed /usr/bin/
COPY ./filer.toml /etc/seaweedfs/filer.toml
COPY ./entrypoint.sh /entrypoint.sh
# volume server grpc port
EXPOSE 18080
# volume server http port
EXPOSE 8080
# filer server grpc port
EXPOSE 18888
# filer server http port
EXPOSE 8888
# master server shared grpc port
EXPOSE 19333
# master server shared http port
EXPOSE 9333
VOLUME /data
WORKDIR /data
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

2
docker/Dockerfile.gccgo_build

@ -1,5 +1,5 @@
FROM gcc:11 as builder
RUN mkdir -p /go/src/github.com/chrislusf/
RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/seaweedfs/seaweedfs && git checkout $BRANCH

2
docker/Dockerfile.go_build

@ -1,6 +1,6 @@
FROM golang:1.19-alpine as builder
RUN apk add git g++ fuse
RUN mkdir -p /go/src/github.com/chrislusf/
RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
ARG TAGS

1
docker/Dockerfile.local

@ -5,6 +5,7 @@ RUN mkdir -p /etc/seaweedfs
COPY ./filer.toml /etc/seaweedfs/filer.toml
COPY ./entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount
RUN apk add curl # for health checks
# volume server grpc port
EXPOSE 18080

16
docker/Dockerfile.rocksdb_dev_env

@ -0,0 +1,16 @@
FROM golang:1.19-buster as builder
RUN apt-get update
RUN apt-get install -y build-essential libsnappy-dev zlib1g-dev libbz2-dev libgflags-dev liblz4-dev libzstd-dev
ENV ROCKSDB_VERSION v7.5.3
# build RocksDB
RUN cd /tmp && \
git clone https://github.com/facebook/rocksdb.git /tmp/rocksdb --depth 1 --single-branch --branch $ROCKSDB_VERSION && \
cd rocksdb && \
PORTABLE=1 make static_lib && \
make install-static
ENV CGO_CFLAGS "-I/tmp/rocksdb/include"
ENV CGO_LDFLAGS "-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"

4
docker/Dockerfile.rocksdb_large

@ -3,7 +3,7 @@ FROM golang:1.19-buster as builder
RUN apt-get update
RUN apt-get install -y build-essential libsnappy-dev zlib1g-dev libbz2-dev libgflags-dev liblz4-dev libzstd-dev
ENV ROCKSDB_VERSION v7.4.4
ENV ROCKSDB_VERSION v7.5.3
# build RocksDB
RUN cd /tmp && \
@ -16,7 +16,7 @@ ENV CGO_CFLAGS "-I/tmp/rocksdb/include"
ENV CGO_LDFLAGS "-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
# build SeaweedFS
RUN mkdir -p /go/src/github.com/chrislusf/
RUN mkdir -p /go/src/github.com/seaweedfs/
RUN git clone https://github.com/seaweedfs/seaweedfs /go/src/github.com/seaweedfs/seaweedfs
ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/seaweedfs/seaweedfs && git checkout $BRANCH

45
docker/Dockerfile.rocksdb_large_local

@ -0,0 +1,45 @@
FROM chrislusf/rocksdb_dev_env as builder
# build SeaweedFS
RUN mkdir -p /go/src/github.com/seaweedfs/
ADD . /go/src/github.com/seaweedfs/seaweedfs
RUN ls -al /go/src/github.com/seaweedfs/ && \
cd /go/src/github.com/seaweedfs/seaweedfs/weed \
&& export LDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \
&& go install -tags "5BytesOffset rocksdb" -ldflags "-extldflags -static ${LDFLAGS}"
FROM alpine AS final
LABEL author="Chris Lu"
COPY --from=builder /go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/seaweedfs/seaweedfs/docker/filer_rocksdb.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/seaweedfs/seaweedfs/docker/entrypoint.sh /entrypoint.sh
RUN apk add fuse snappy gflags tmux
# volume server gprc port
EXPOSE 18080
# volume server http port
EXPOSE 8080
# filer server gprc port
EXPOSE 18888
# filer server http port
EXPOSE 8888
# master server shared gprc port
EXPOSE 19333
# master server shared http port
EXPOSE 9333
# s3 server http port
EXPOSE 8333
# webdav server http port
EXPOSE 7333
RUN mkdir -p /data/filer_rocksdb
VOLUME /data
WORKDIR /data
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

20
docker/Makefile

@ -8,11 +8,17 @@ cgo ?= 0
binary:
export SWCOMMIT=$(shell git rev-parse --short HEAD)
export SWLDFLAGS="-X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=$(SWCOMMIT)"
cd ../weed; CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-extldflags -static $(SWLDFLAGS)"; mv weed ../docker/
cd ../weed && CGO_ENABLED=$(cgo) GOOS=linux go build $(options) -tags "$(tags)" -ldflags "-extldflags -static $(SWLDFLAGS)" && mv weed ../docker/
binary_race: options = -race
binary_race: cgo = 1
binary_race: binary
build: binary
docker build --no-cache -t chrislusf/seaweedfs:local -f Dockerfile.local .
rm ./weed
build_e2e: binary_race
docker build --no-cache -t chrislusf/seaweedfs:e2e -f Dockerfile.e2e .
go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset
docker build --build-arg TAGS=$(tags) --no-cache -t chrislusf/seaweedfs:go_build -f Dockerfile.go_build .
@ -20,6 +26,12 @@ go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset
go_build_large_disk:
docker build --build-arg TAGS=large_disk --no-cache -t chrislusf/seaweedfs:large_disk -f Dockerfile.go_build .
build_rocksdb_dev_env:
docker build --no-cache -t chrislusf/rocksdb_dev_env -f Dockerfile.rocksdb_dev_env .
build_rocksdb_local:
cd .. ; docker build --no-cache -t chrislusf/seaweedfs:rocksdb_local -f docker/Dockerfile.rocksdb_large_local .
build_rocksdb:
docker build --no-cache -t chrislusf/seaweedfs:rocksdb -f Dockerfile.rocksdb_large .
@ -29,9 +41,7 @@ s3tests_build:
dev: build
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
dev_race: options = -race
dev_race: cgo = 1
dev_race: build
dev_race: binary_race
docker-compose -f compose/local-dev-compose.yml -p seaweedfs up
dev_tls: build certstrap

53
docker/compose/e2e-mount.yml

@ -0,0 +1,53 @@
version: '3.9'
services:
master:
image: chrislusf/seaweedfs:e2e
command: "-v=4 master -ip=master -ip.bind=0.0.0.0 -raftBootstrap"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
interval: 1s
timeout: 60s
volume:
image: chrislusf/seaweedfs:e2e
command: "-v=4 volume -mserver=master:9333 -ip=volume -ip.bind=0.0.0.0 -preStopSeconds=1"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://localhost:8080/healthz" ]
interval: 1s
timeout: 30s
depends_on:
master:
condition: service_healthy
filer:
image: chrislusf/seaweedfs:e2e
command: "-v=4 filer -master=master:9333 -ip=filer -ip.bind=0.0.0.0"
healthcheck:
test: [ "CMD", "curl", "--fail", "-I", "http://localhost:8888" ]
interval: 1s
timeout: 30s
depends_on:
volume:
condition: service_healthy
mount:
image: chrislusf/seaweedfs:e2e
command: "-v=4 mount -filer=filer:8888 -filer.path=/ -dirAutoCreate -dir=/mnt/seaweedfs"
cap_add:
- SYS_ADMIN
devices:
- /dev/fuse
security_opt:
- apparmor:unconfined
deploy:
resources:
limits:
memory: 4096m
healthcheck:
test: [ "CMD", "mountpoint", "-q", "--", "/mnt/seaweedfs" ]
interval: 1s
timeout: 30s
depends_on:
filer:
condition: service_healthy

33
go.mod

@ -3,13 +3,13 @@ module github.com/seaweedfs/seaweedfs
go 1.19
require (
cloud.google.com/go v0.102.1 // indirect
cloud.google.com/go/pubsub v1.24.0
cloud.google.com/go/storage v1.25.0
cloud.google.com/go v0.104.0 // indirect
cloud.google.com/go/pubsub v1.25.1
cloud.google.com/go/storage v1.26.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Shopify/sarama v1.36.0
github.com/aws/aws-sdk-go v1.44.76
github.com/aws/aws-sdk-go v1.44.91
github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
@ -61,7 +61,7 @@ require (
github.com/klauspost/reedsolomon v1.10.0
github.com/kurin/blazer v0.5.3
github.com/lib/pq v1.10.6
github.com/linxGnu/grocksdb v1.7.5
github.com/linxGnu/grocksdb v1.7.7
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-ieproxy v0.0.3 // indirect
@ -90,7 +90,6 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.12.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.0
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
@ -112,31 +111,31 @@ require (
go.opencensus.io v0.23.0 // indirect
gocloud.dev v0.26.0
gocloud.dev/pubsub/natspubsub v0.26.0
gocloud.dev/pubsub/rabbitpubsub v0.25.0
gocloud.dev/pubsub/rabbitpubsub v0.26.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd
golang.org/x/image v0.0.0-20200119044424-58c23975cae1
golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 // indirect
golang.org/x/sys v0.0.0-20220818161305-2296e01440c6
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/api v0.92.0
google.golang.org/api v0.94.0
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220720214146-176da50484ac // indirect
google.golang.org/grpc v1.48.0
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
gopkg.in/inf.v0 v0.9.1 // indirect
modernc.org/b v1.0.0 // indirect
modernc.org/cc/v3 v3.36.0 // indirect
modernc.org/ccgo/v3 v3.16.8 // indirect
modernc.org/libc v1.16.19 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/opt v0.1.1 // indirect
modernc.org/sqlite v1.18.1
modernc.org/strutil v1.1.2
modernc.org/strutil v1.1.3
modernc.org/token v1.0.0 // indirect
)
@ -144,13 +143,14 @@ require (
github.com/Jille/raft-grpc-transport v1.2.0
github.com/arangodb/go-driver v1.3.3
github.com/fluent/fluent-logger-golang v1.9.0
github.com/google/flatbuffers v2.0.6+incompatible
github.com/google/flatbuffers v2.0.8+incompatible
github.com/hanwen/go-fuse/v2 v2.1.1-0.20220627082937-d01fda7edf17
github.com/hashicorp/raft v1.3.10
github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
github.com/hashicorp/raft-boltdb/v2 v2.2.2
github.com/rabbitmq/amqp091-go v1.4.0
github.com/tikv/client-go/v2 v2.0.1
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2
github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1
github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4
google.golang.org/grpc/security/advancedtls v0.0.0-20220622233350-5cdb09fa29c1
)
@ -218,6 +218,7 @@ require (
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f // indirect
github.com/ydb-platform/ydb-go-yc v0.8.3 // indirect
github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect
go.uber.org/atomic v1.9.0 // indirect

73
go.sum

@ -33,8 +33,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.104.0 h1:gSmWO7DY1vOm0MVU6DNXM11BWHHsTUmsC5cv1fuW5X8=
cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@ -66,8 +66,8 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.19.0/go.mod h1:/O9kmSe9bb9KRnIAWkzmqhPjHo6LtzGOBYd/kr06XSs=
cloud.google.com/go/pubsub v1.24.0 h1:aCS6wSMzrc602OeXUMA66KGlyXxpdkHdwN+FSBv/sUg=
cloud.google.com/go/pubsub v1.24.0/go.mod h1:rWv09Te1SsRpRGPiWOMDKraMQTJyJps4MkUCoMGUgqw=
cloud.google.com/go/pubsub v1.25.1 h1:l0wCNZKuEp2Q54wAy8283EV9O57+7biWOXnnU2/Tq/A=
cloud.google.com/go/pubsub v1.25.1/go.mod h1:bY6l7rF8kCcwz6V3RaQ6kK4p5g7qc7PqjRoE9wDOqOU=
cloud.google.com/go/secretmanager v1.3.0/go.mod h1:+oLTkouyiYiabAQNugCeTS3PAArGiMJuBqvJnJsyH+U=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
@ -77,8 +77,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
cloud.google.com/go/storage v1.25.0 h1:D2Dn0PslpK7Z3B2AvuUHyIC762bDbGJdlmQlCBR71os=
cloud.google.com/go/storage v1.25.0/go.mod h1:Qys4JU+jeup3QnuKKAosWuxrD95C4MSqxfVDnSirDsI=
cloud.google.com/go/storage v1.26.0 h1:lYAGjknyDJirSzfwUlkv4Nsnj7od7foxQNH/fqZqles=
cloud.google.com/go/storage v1.26.0/go.mod h1:mk/N7YwIKEWyTvXAWQCIeiCTdLoRH6Pd5xmSnolQLTI=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM=
contrib.go.opencensus.io/exporter/aws v0.0.0-20200617204711-c478e41e60e9/go.mod h1:uu1P0UCM/6RbsMrgPa98ll8ZcHM858i/AD06a9aLRCA=
@ -148,15 +148,14 @@ github.com/arangodb/go-driver v1.3.3/go.mod h1:5GAx3XvK72DJPhJgyjZOtYAGc4SpY7rZD
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g=
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo=
github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.76 h1:5e8yGO/XeNYKckOjpBKUd5wStf0So3CrQIiOMCVLpOI=
github.com/aws/aws-sdk-go v1.44.76/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.91 h1:SRWmuX7PTyhBdLuvSfM7KWrWISJsrRsUPcFDSFduRxY=
github.com/aws/aws-sdk-go v1.44.91/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
@ -408,8 +407,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/flatbuffers v2.0.6+incompatible h1:XHFReMv7nFFusa+CEokzWbzaYocKXI6C7hdU5Kgh9Lw=
github.com/google/flatbuffers v2.0.6+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@ -527,8 +526,10 @@ github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7H
github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.3.10 h1:LR5QZX1VQd0DFWZfeCwWawyeKfpS/Tm1yjnJIY5X4Tw=
github.com/hashicorp/raft v1.3.10/go.mod h1:J8naEwc6XaaCfts7+28whSeRvCqTd6e20BlCU3LtEO4=
github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0 h1:CO8dBMLH6dvE1jTn/30ZZw3iuPsNfajshWoJTnVc5cc=
github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea h1:RxcPJuutPRM8PUOyiweMmkuNO+RJyfy2jds2gfvgNmU=
github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea/go.mod h1:qRd6nFJYYS6Iqnc/8HcUmko2/2Gw8qTFEmxDLii6W5I=
github.com/hashicorp/raft-boltdb/v2 v2.2.2 h1:rlkPtOllgIcKLxVT4nutqlTH2NRFn+tO1wwZk/4Dxqw=
github.com/hashicorp/raft-boltdb/v2 v2.2.2/go.mod h1:N8YgaZgNJLpZC+h+by7vDu5rzsRgONThTEeUS3zWbfY=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@ -651,8 +652,8 @@ github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linxGnu/grocksdb v1.7.5 h1:nB/Z6OsxvO4VJqn4CYaUubYh0523YBsAPvrW5Gc7cnY=
github.com/linxGnu/grocksdb v1.7.5/go.mod h1:8RUIOKch4MgftfkmgG4FPVkPUwvbLWlX/FcDkJFXIbo=
github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg=
github.com/linxGnu/grocksdb v1.7.7/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
@ -807,6 +808,9 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
@ -850,8 +854,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
@ -919,8 +921,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mo
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.25.3/go.mod h1:PFizF/vJsdAgEwjK3DVSBD52kdmRkWfSIS2q2pA+e88=
github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1 h1:+DTSx1uMFMtNHEpC1ZyM6t2vaBQCnymqHV1+/abZDWM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4 h1:wQtx05MHEuYnIt56wos9vaz3N7/Ue04PiSYWk7o7Akw=
github.com/ydb-platform/ydb-go-sdk/v3 v3.37.4/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
github.com/ydb-platform/ydb-go-yc v0.8.3 h1:92UUUMsfvtMl6mho8eQ9lbkiPrF3a9CT+RrVRAKNRwo=
github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE=
github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 h1:nMtixUijP0Z7iHJNT9fOL+dbmEzZxqU6Xk87ll7hqXg=
@ -933,6 +935,9 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.2/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc=
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
@ -983,13 +988,12 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
gocloud.dev v0.25.0/go.mod h1:7HegHVCYZrMiU3IE1qtnzf/vRrDwLYnRNR3EhWX8x9Y=
gocloud.dev v0.26.0 h1:4rM/SVL0lLs+rhC0Gmc+gt/82DBpb7nbpIZKXXnfMXg=
gocloud.dev v0.26.0/go.mod h1:mkUgejbnbLotorqDyvedJO20XcZNTynmSeVSQS9btVg=
gocloud.dev/pubsub/natspubsub v0.26.0 h1:f1hynU3D37dREFD9JzZLpCMoFJB5MyrHsujf9c6q+lE=
gocloud.dev/pubsub/natspubsub v0.26.0/go.mod h1:eec1REZLqHheRuVb43qcLhyRj/3/hGNRO8xNZYrhzqw=
gocloud.dev/pubsub/rabbitpubsub v0.25.0 h1:jDAHvIH0h40quEuqusYXfK28sCABAMAnjLqLybu/aeo=
gocloud.dev/pubsub/rabbitpubsub v0.25.0/go.mod h1:gfOrMlNXnxzIYB3dK1mNenXeBwJjm2ZSRBgNzxan0/Y=
gocloud.dev/pubsub/rabbitpubsub v0.26.0 h1:oJqftlDNHWq5oB6VahB0QU1v0pecxXJVvIltRlBcehE=
gocloud.dev/pubsub/rabbitpubsub v0.26.0/go.mod h1:21IRXtS7aNovPySnKZNAwKmWcP9ju32eHdMRuG7RTbs=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
@ -1142,8 +1146,8 @@ golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 h1:+jnHzr9VPj32ykQVai5DNahi9+NSp7yYuCsl5eAQtL0=
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 h1:2o1E+E8TpNLklK9nHiPiK1uzIYrIHt+cQx3ynCwq9V8=
golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -1204,6 +1208,7 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1406,8 +1411,8 @@ google.golang.org/api v0.75.0/go.mod h1:pU9QmyHLnzlpar1Mjt4IbapUCy8J+6HD6GeELN69
google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6rKmw=
google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg=
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.92.0 h1:8JHk7q/+rJla+iRsWj9FQ9/wjv2M1SKtpKSdmLhxPT0=
google.golang.org/api v0.92.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/api v0.94.0 h1:KtKM9ru3nzQioV1HLlUf1cR7vMYJIpgls5VhAYQXIwA=
google.golang.org/api v0.94.0/go.mod h1:eADj+UBuxkh5zlrSntJghuNeg8HwQ1w5lTKkuqaETEI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -1513,10 +1518,9 @@ google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220720214146-176da50484ac h1:EOa+Yrhx1C0O+4pHeXeWrCwdI0tWI6IfUU56Vebs9wQ=
google.golang.org/genproto v0.0.0-20220720214146-176da50484ac/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc h1:Nf+EdcTLHR8qDNN/KfkQL0u0ssxt9OhbaWCl5C0ucEI=
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -1550,8 +1554,8 @@ google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b h1:NuxyvVZoDfHZwYW9LD4GJiF5/nhiSyP4/InTrvw9Ibk=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b/go.mod h1:IBqQ7wSUJ2Ep09a8rMWFsg4fmI2r38zwsq8a0GgxXpM=
@ -1633,8 +1637,9 @@ modernc.org/libc v1.16.19 h1:S8flPn5ZeXx6iw/8yNa986hwTQDrY8RXU7tObZuAozo=
modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU=
modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw=
modernc.org/opt v0.1.1 h1:/0RX92k9vwVeDXj+Xn23DKp2VJubL7k8qNffND6qn3A=
@ -1643,8 +1648,8 @@ modernc.org/sqlite v1.18.1 h1:ko32eKt3jf7eqIkCgPAeHMBXw3riNSLhl2f3loEF7o8=
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
modernc.org/strutil v1.1.2 h1:iFBDH6j1Z0bN/Q9udJnnFoFpENA4252qe/7/5woE5MI=
modernc.org/strutil v1.1.2/go.mod h1:OYajnUAcI/MX+XD/Wx7v1bbdvcQSvxgtb0gC+u3d3eg=
modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/tcl v1.13.1 h1:npxzTwFTZYM8ghWicVIX1cRWzj7Nd8i6AqqX2p+IYao=
modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk=
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

4
k8s/helm_charts2/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "3.24"
version: "3.24"
appVersion: "3.25"
version: "3.25"

2
other/java/hdfs2/pom.xml

@ -6,7 +6,7 @@
<properties>
<seaweedfs.client.version>3.13</seaweedfs.client.version>
<hadoop.version>2.10.1</hadoop.version>
<hadoop.version>3.2.4</hadoop.version>
</properties>
<groupId>com.github.chrislusf</groupId>

10
unmaintained/s3/presigned_put/presigned_put.go

@ -1,15 +1,16 @@
package main
import (
"crypto/md5"
"encoding/base64"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"encoding/base64"
"fmt"
"crypto/md5"
"github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"strings"
"time"
"net/http"
)
// Downloads an item from an S3 Bucket in the region configured in the shared config
@ -63,6 +64,7 @@ func main() {
fmt.Printf("error put request: %v\n", err)
return
}
defer util.CloseResponse(resp)
fmt.Printf("response: %+v\n", resp)
}

31
weed/command/filer.go

@ -173,29 +173,29 @@ func runFiler(cmd *Command, args []string) bool {
if *f.dataCenter != "" && *filerS3Options.dataCenter == "" {
filerS3Options.dataCenter = f.dataCenter
}
go func() {
time.Sleep(startDelay * time.Second)
go func(delay time.Duration) {
time.Sleep(delay * time.Second)
filerS3Options.startS3Server()
}()
}(startDelay)
startDelay++
}
if *filerStartWebDav {
filerWebDavOptions.filer = &filerAddress
go func() {
time.Sleep(startDelay * time.Second)
go func(delay time.Duration) {
time.Sleep(delay * time.Second)
filerWebDavOptions.startWebDav()
}()
}(startDelay)
startDelay++
}
if *filerStartIam {
filerIamOptions.filer = &filerAddress
filerIamOptions.masters = f.mastersString
go func() {
time.Sleep(startDelay * time.Second)
go func(delay time.Duration) {
time.Sleep(delay * time.Second)
filerIamOptions.startIamServer()
}()
}(startDelay)
}
f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
@ -293,17 +293,18 @@ func (fo *FilerOptions) startFiler() {
httpS := &http.Server{Handler: defaultMux}
if runtime.GOOS != "windows" {
if *fo.localSocket == "" {
*fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
localSocket := *fo.localSocket
if localSocket == "" {
localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
}
if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
if err := os.Remove(localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", localSocket, err.Error())
}
go func() {
// start on local unix socket
filerSocketListener, err := net.Listen("unix", *fo.localSocket)
filerSocketListener, err := net.Listen("unix", localSocket)
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
glog.Fatalf("Failed to listen on %s: %v", localSocket, err)
}
httpS.Serve(filerSocketListener)
}()

22
weed/command/filer_sync.go

@ -44,10 +44,16 @@ type SyncOptions struct {
aProxyByFiler *bool
bProxyByFiler *bool
metricsHttpPort *int
concurrency *int
clientId int32
clientEpoch int32
}
const (
SyncKeyPrefix = "sync."
DefaultConcurrcyLimit = 32
)
var (
syncOptions SyncOptions
syncCpuProfile *string
@ -77,6 +83,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrcyLimit, "The maximum number of files that will be synced concurrently.")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
@ -153,6 +160,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.bProxyByFiler,
*syncOptions.bDiskType,
*syncOptions.bDebug,
*syncOptions.concurrency,
aFilerSignature,
bFilerSignature)
if err != nil {
@ -189,6 +197,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.aProxyByFiler,
*syncOptions.aDiskType,
*syncOptions.aDebug,
*syncOptions.concurrency,
bFilerSignature,
aFilerSignature)
if err != nil {
@ -221,7 +230,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time
@ -251,7 +260,12 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
return persistEventFn(resp)
}
processor := NewMetadataProcessor(processEventFn, 128)
if concurrency < 0 || concurrency > 1024 {
glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrcyLimit)
concurrency = DefaultConcurrcyLimit
}
processor := NewMetadataProcessor(processEventFn, concurrency)
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
@ -276,10 +290,6 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
const (
SyncKeyPrefix = "sync."
)
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath(path string) string {
// compatible historical version

4
weed/command/mount_linux.go

@ -3,6 +3,7 @@ package command
import (
"bufio"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"io"
"os"
"strings"
@ -142,6 +143,9 @@ func checkMountPointAvailable(dir string) bool {
}
if mounted, err := mounted(mountPoint); err != nil || mounted {
if err != nil {
glog.Errorf("check %s: %v", mountPoint, err)
}
return false
}

2
weed/command/mount_std.go

@ -157,7 +157,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
glog.Fatalf("Target mount point is not available: %s, please check!", dir)
return true
}

2
weed/command/s3.go

@ -194,7 +194,7 @@ func (s3opt *S3Options) startS3Server() bool {
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: s3opt.localFilerSocket,
LocalFilerSocket: *s3opt.localFilerSocket,
DataCenter: *s3opt.dataCenter,
})
if s3ApiServer_err != nil {

15
weed/command/update.go

@ -199,6 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
if err != nil {
return Release{}, err
}
defer util.CloseResponse(res)
if res.StatusCode != http.StatusOK {
content := res.Header.Get("Content-Type")
@ -211,17 +212,10 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
}
}
_ = res.Body.Close()
return Release{}, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
}
buf, err := ioutil.ReadAll(res.Body)
if err != nil {
_ = res.Body.Close()
return Release{}, err
}
err = res.Body.Close()
if err != nil {
return Release{}, err
}
@ -265,18 +259,13 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) {
if err != nil {
return nil, err
}
defer util.CloseResponse(res)
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
}
buf, err := ioutil.ReadAll(res.Body)
if err != nil {
_ = res.Body.Close()
return nil, err
}
err = res.Body.Close()
if err != nil {
return nil, err
}

2
weed/filer/arangodb/arangodb_store.go

@ -121,7 +121,7 @@ func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Conte
return nil, err
}
return context.WithValue(ctx, transactionKey, txn), nil
return context.WithValue(driver.WithTransactionID(ctx, txn), transactionKey, txn), nil
}
func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {

3
weed/filer/arangodb/arangodb_store_bucket.go

@ -34,6 +34,9 @@ func (store *ArangodbStore) OnBucketDeletion(bucket string) {
glog.Errorf("bucket delete %s: %v", bucket, err)
return
}
store.mu.Lock()
delete(store.buckets, bucket)
store.mu.Unlock()
}
func (store *ArangodbStore) CanDropWholeBucket() bool {
return true

2
weed/filer/arangodb/helpers.go

@ -86,7 +86,7 @@ func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc
store.mu.RLock()
bc, ok = store.buckets[bucket]
store.mu.RUnlock()
if ok {
if ok && bc != nil {
return bc, nil
}
store.mu.Lock()

4
weed/filer/filer.go

@ -32,6 +32,8 @@ var (
)
type Filer struct {
UniqueFilerId int32
UniqueFilerEpoch int32
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
@ -45,8 +47,6 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
UniqueFilerId int32
UniqueFilerEpoch int32
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,

2
weed/filer/filer_deletion.go

@ -58,7 +58,7 @@ func (f *Filer) loopProcessingDeletion() {
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
}
} else {
glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
glog.V(2).Infof("deleting fileIds len=%d", deletionCount)
}
}
})

5
weed/filer/meta_aggregator.go

@ -9,6 +9,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
@ -194,13 +195,13 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ma.filer.UniqueFilerEpoch++
atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1)
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "filer:" + string(self),
PathPrefix: "/",
SinceNs: lastTsNs,
ClientId: ma.filer.UniqueFilerId,
ClientEpoch: ma.filer.UniqueFilerEpoch,
ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)

2
weed/filer/rocksdb/rocksdb_store.go

@ -123,7 +123,7 @@ func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
key := genKey(dir, name)
data, err := store.db.Get(store.ro, key)
if data == nil {
if data == nil || !data.Exists() {
return nil, filer_pb.ErrNotFound
}
defer data.Free()

2
weed/filer/s3iam_conf.go

@ -43,7 +43,7 @@ func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error {
for _, cred := range ident.Credentials {
if userName, found := accessKeySet[cred.AccessKey]; !found {
accessKeySet[cred.AccessKey] = ident.Name
} else {
} else if userName != ident.Name {
return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)
}
}

35
weed/filer/s3iam_conf_test.go

@ -97,6 +97,41 @@ func TestCheckDuplicateAccessKey(t *testing.T) {
},
"",
},
{
&iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{
{
Name: "some_name",
Credentials: []*iam_pb.Credential{
{
AccessKey: "some_access_key1",
SecretKey: "some_secret_key1",
},
},
Actions: []string{
ACTION_ADMIN,
ACTION_READ,
ACTION_WRITE,
},
},
{
Name: "some_name",
Credentials: []*iam_pb.Credential{
{
AccessKey: "some_access_key1",
SecretKey: "some_secret_key1",
},
},
Actions: []string{
ACTION_READ,
ACTION_TAGGING,
ACTION_LIST,
},
},
},
},
"",
},
{
&iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{

11
weed/mount/filehandle.go

@ -77,6 +77,10 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
if fh.entry == nil {
return
}
// find the earliest incoming chunk
newChunks := chunks
earliestChunk := newChunks[0]
@ -86,10 +90,6 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
}
}
if fh.entry == nil {
return
}
// pick out-of-order chunks from existing chunks
for _, chunk := range fh.entry.Chunks {
if lessThan(earliestChunk, chunk) {
@ -110,7 +110,8 @@ func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
func (fh *FileHandle) CloseReader() {
if fh.reader != nil {
fh.reader.Close()
_ = fh.reader.Close()
fh.reader = nil
}
}

4
weed/mount/weedfs_file_read.go

@ -39,8 +39,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.Lock()
defer fh.Unlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)

31
weed/notification/gocdk_pub_sub/gocdk_pub_sub.go

@ -20,13 +20,14 @@ package gocdk_pub_sub
import (
"context"
"fmt"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"google.golang.org/protobuf/proto"
"net/url"
"path"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -51,31 +52,42 @@ func getPath(rawUrl string) string {
type GoCDKPubSub struct {
topicURL string
topic *pubsub.Topic
topicLock sync.RWMutex
}
func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
k.topicLock.Lock()
k.topic = topic
k.topicLock.Unlock()
k.doReconnect()
}
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
k.topicLock.RLock()
defer k.topicLock.RUnlock()
if k.topic.As(&conn) {
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
conn.Close()
go func(c *amqp.Connection) {
<-c.NotifyClose(make(chan *amqp.Error))
c.Close()
k.topicLock.RLock()
k.topic.Shutdown(context.Background())
k.topicLock.RUnlock()
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
k.doReconnect()
k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
break
}
glog.Error(err)
time.Sleep(time.Second)
}
}()
}(conn)
}
}
@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
if err != nil {
glog.Fatalf("Failed to open topic: %v", err)
}
k.topic = topic
k.doReconnect()
k.setTopic(topic)
return nil
}
@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
if err != nil {
return err
}
k.topicLock.RLock()
defer k.topicLock.RUnlock()
err = k.topic.Send(context.Background(), &pubsub.Message{
Body: bytes,
Metadata: map[string]string{"key": key},

5
weed/operation/chunked_file.go

@ -106,10 +106,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
if err != nil {
return written, err
}
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
defer util.CloseResponse(resp)
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:

3
weed/operation/upload_content.go

@ -313,19 +313,20 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
}
// print("+")
resp, post_err := HttpClient.Do(req)
defer util.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerRequestCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
resp, post_err = HttpClient.Do(req)
defer util.CloseResponse(resp)
}
}
if post_err != nil {
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
}
// print("-")
defer util.CloseResponse(resp)
var ret UploadResult
etag := getEtag(resp)

1
weed/replication/sink/filersink/filer_sink.go

@ -133,6 +133,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Name: name,
IsDirectory: entry.IsDirectory,
Attributes: entry.Attributes,
Extended: entry.Extended,
Chunks: replicatedChunks,
Content: entry.Content,
RemoteEntry: entry.RemoteEntry,

2
weed/replication/sub/notification_gocdk_pub_sub.go

@ -5,10 +5,10 @@ package sub
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"

3
weed/s3api/s3api_object_copy_handlers.go

@ -174,11 +174,12 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
defer util.CloseResponse(resp)
defer dataReader.Close()
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)

6
weed/s3api/s3api_server.go

@ -28,7 +28,7 @@ type S3ApiServerOption struct {
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
AllowDeleteBucketNotEmpty bool
LocalFilerSocket *string
LocalFilerSocket string
DataCenter string
}
@ -59,7 +59,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
cb: NewCircuitBreaker(option),
}
if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" {
if option.LocalFilerSocket == "" {
s3ApiServer.client = &http.Client{Transport: &http.Transport{
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
@ -68,7 +68,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.client = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", *option.LocalFilerSocket)
return net.Dial("unix", option.LocalFilerSocket)
},
},
}

1
weed/server/common.go

@ -110,6 +110,7 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
m := make(map[string]interface{})
m["error"] = err.Error()
glog.V(1).Infof("error JSON response status %d: %s", httpStatus, m["error"])
writeJsonQuiet(w, r, httpStatus, m)
}

2
weed/server/filer_server_handlers_read.go

@ -96,7 +96,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
if err == filer_pb.ErrNotFound {
glog.V(1).Infof("Not found %s: %v", path, err)
glog.V(2).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
w.WriteHeader(http.StatusNotFound)
} else {

2
weed/server/filer_server_handlers_write_autochunk.go

@ -46,7 +46,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
if err != nil {
if strings.HasPrefix(err.Error(), "read input:") {
if strings.HasPrefix(err.Error(), "read input:") || err.Error() == io.ErrUnexpectedEOF.Error() {
writeJsonError(w, r, 499, err)
} else if strings.HasSuffix(err.Error(), "is a file") {
writeJsonError(w, r, http.StatusConflict, err)

12
weed/server/filer_server_handlers_write_upload.go

@ -52,6 +52,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var bytesBufferCounter int64
bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
var fileChunksLock sync.Mutex
var uploadErrLock sync.Mutex
for {
// need to throttle used byte buffer
@ -77,7 +78,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
uploadErrLock.Lock()
uploadErr = err
uploadErrLock.Unlock()
break
}
if chunkOffset == 0 && !isAppend {
@ -105,14 +108,19 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}()
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if uploadErr == nil && toChunkErr != nil {
if toChunkErr != nil {
uploadErrLock.Lock()
if uploadErr == nil {
uploadErr = toChunkErr
}
uploadErrLock.Unlock()
}
if chunk != nil {
fileChunksLock.Lock()
fileChunks = append(fileChunks, chunk)
fileChunksSize := len(fileChunks)
fileChunksLock.Unlock()
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
}
}(chunkOffset)

4
weed/server/raft_hashicorp.go

@ -7,7 +7,7 @@ import (
"fmt"
transport "github.com/Jille/raft-grpc-transport"
"github.com/hashicorp/raft"
boltdb "github.com/hashicorp/raft-boltdb"
boltdb "github.com/hashicorp/raft-boltdb/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@ -75,7 +75,7 @@ func (s *RaftServer) UpdatePeers() {
s.RaftHashicorp.AddVoter(
raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
}
for peer, _ := range existsPeerName {
for peer := range existsPeerName {
if _, found := s.peers[peer]; !found {
glog.V(0).Infof("removing old peer: %s", peer)
s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)

14
weed/shell/command_fs_meta_load.go

@ -83,6 +83,13 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
if *verbose || lastLogTime.Add(time.Second).Before(time.Now()) {
if !*verbose {
lastLogTime = time.Now()
}
fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
}
fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x")
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: fullEntry.Dir,
@ -91,13 +98,6 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
if *verbose || lastLogTime.Add(time.Second).Before(time.Now()) {
if !*verbose {
lastLogTime = time.Now()
}
fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
}
if fullEntry.Entry.IsDirectory {
dirCount++
} else {

4
weed/shell/command_volume_check_disk.go

@ -129,10 +129,10 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
// find and make up the differences
if aHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, nonRepairThreshold); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode, a.location.dataNode, b.info.Id, err)
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
}
if bHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, nonRepairThreshold); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode, b.location.dataNode, a.info.Id, err)
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
}
return
}

35
weed/shell/command_volume_tier_move.go

@ -12,7 +12,11 @@ import (
"path/filepath"
"sync"
"time"
"context"
"errors"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@ -40,7 +44,7 @@ func (c *commandVolumeTierMove) Name() string {
func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4]
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed.
@ -59,6 +63,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
replicationString := tierCommand.String("toReplication", "", "the new target replication setting");
if err = tierCommand.Parse(args); err != nil {
return nil
}
@ -119,7 +125,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src)
if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
}
}
@ -220,7 +226,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string ) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@ -230,8 +236,9 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
@ -240,6 +247,26 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
}
// If move is successful and replication is not empty, alter moved volume's replication setting
if *replicationString != "" {
err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: *replicationString,
})
if configureErr != nil {
return configureErr
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
})
if err != nil {
glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
}
}
// remove the remaining replicas
for _, loc := range locations {
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {

15
weed/storage/backend/s3_backend/s3_backend.go

@ -124,8 +124,6 @@ func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
// glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
Bucket: &s3backendStorageFile.backendStorage.bucket,
Key: &s3backendStorageFile.key,
@ -137,13 +135,16 @@ func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n
}
defer getObjectOutput.Body.Close()
glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
// glog.V(3).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
// glog.V(3).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
var readCount int
for {
if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
p = p[n:]
} else {
p = p[readCount:]
readCount, err = getObjectOutput.Body.Read(p)
n += readCount
if err != nil {
break
}
}

2
weed/storage/store.go

@ -549,7 +549,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
maxVolumeCount += int32(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
}
atomic.StoreInt32(&diskLocation.MaxVolumeCount, maxVolumeCount)
glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
glog.V(4).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
hasChanges = hasChanges || currentMaxVolumeCount != atomic.LoadInt32(&diskLocation.MaxVolumeCount)
}

5
weed/storage/volume.go

@ -36,6 +36,7 @@ type Volume struct {
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
superBlockAccessLock sync.Mutex
asyncRequestsChan chan *needle.AsyncRequest
lastModifiedTsSeconds uint64 // unix time in seconds
lastAppendAtNs uint64 // unix time in nanoseconds
@ -97,6 +98,8 @@ func (v *Volume) FileName(ext string) (fileName string) {
}
func (v *Volume) Version() needle.Version {
v.superBlockAccessLock.Lock()
defer v.superBlockAccessLock.Unlock()
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
}
@ -281,7 +284,7 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
glog.V(3).Infof("collectStatus volume %d", v.Id)
glog.V(4).Infof("collectStatus volume %d", v.Id)
if v.nm == nil || v.DataBackend == nil {
return

9
weed/storage/volume_read.go

@ -107,7 +107,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
readOption.IsOutOfRange = false
err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
}
buf := mem.Allocate(1024 * 1024)
buf := mem.Allocate(min(1024*1024, int(size)))
defer mem.Free(buf)
actualOffset := nv.Offset.ToActualOffset()
if readOption.IsOutOfRange {
@ -117,6 +117,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size)
}
func min(x, y int) int {
if x < y {
return x
}
return y
}
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
v.dataFileAccessLock.RLock()

12
weed/topology/volume_layout.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -103,6 +104,8 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
growRequestCount int32
growRequestTime time.Time
rp *super_block.ReplicaPlacement
ttl *needle.TTL
diskType types.DiskType
@ -114,8 +117,6 @@ type VolumeLayout struct {
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
growRequestCount int
growRequestTime time.Time
}
type VolumeLayoutStats struct {
@ -319,18 +320,19 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
}
func (vl *VolumeLayout) HasGrowRequest() bool {
if vl.growRequestCount > 0 && vl.growRequestTime.Add(time.Minute).After(time.Now()) {
if atomic.LoadInt32(&vl.growRequestCount) > 0 &&
vl.growRequestTime.Add(time.Minute).After(time.Now()) {
return true
}
return false
}
func (vl *VolumeLayout) AddGrowRequest() {
vl.growRequestTime = time.Now()
vl.growRequestCount++
atomic.AddInt32(&vl.growRequestCount, 1)
}
func (vl *VolumeLayout) DoneGrowRequest() {
vl.growRequestTime = time.Unix(0, 0)
vl.growRequestCount = 0
atomic.StoreInt32(&vl.growRequestCount, 0)
}
func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.24)
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.25)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)

6
weed/util/grace/signal_handling.go

@ -12,7 +12,7 @@ import (
var signalChan chan os.Signal
var hooks = make([]func(), 0)
var hookLock sync.Mutex
var hookLock sync.RWMutex
func init() {
signalChan = make(chan os.Signal, 1)
@ -27,10 +27,12 @@ func init() {
// syscall.SIGQUIT,
)
go func() {
for _ = range signalChan {
for range signalChan {
hookLock.RLock()
for _, hook := range hooks {
hook()
}
hookLock.RUnlock()
os.Exit(0)
}
}()

19
weed/util/http_util.go

@ -60,7 +60,7 @@ func Get(url string) ([]byte, bool, error) {
if err != nil {
return nil, true, err
}
defer response.Body.Close()
defer CloseResponse(response)
var reader io.ReadCloser
switch response.Header.Get("Content-Encoding") {
@ -242,8 +242,8 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if err != nil {
return 0, err
}
defer CloseResponse(r)
defer r.Body.Close()
if r.StatusCode >= 400 {
return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
@ -370,11 +370,11 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
return false, nil
}
func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.ReadCloser, error) {
func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
return nil, err
return nil, nil, err
}
if rangeHeader != "" {
req.Header.Add("Range", rangeHeader)
@ -388,10 +388,11 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.R
r, err := client.Do(req)
if err != nil {
return nil, err
return nil, nil, err
}
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
CloseResponse(r)
return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@ -399,15 +400,17 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.R
switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
default:
reader = r.Body
}
return reader, nil
return r, reader, nil
}
func CloseResponse(resp *http.Response) {
if resp == nil || resp.Body == nil {
return
}
reader := &CountingReader{reader: resp.Body}
io.Copy(io.Discard, reader)
resp.Body.Close()

36
weed/wdclient/masterclient.go

@ -23,12 +23,12 @@ type MasterClient struct {
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters map[string]pb.ServerAddress
grpcDialOption grpc.DialOption
vidMap
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
}
@ -61,12 +61,12 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
if err != nil {
return fmt.Errorf("LookupVolume failed: %v", err)
return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
}
for vid, vidLocation := range resp.VolumeIdLocations {
for _, vidLoc := range vidLocation.Locations {
@ -91,9 +91,21 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
return
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()
return mc.currentMaster
}
func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Lock()
mc.currentMaster = master
mc.currentMasterLock.Unlock()
}
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
return mc.currentMaster
return mc.getCurrentMaster()
}
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
@ -103,7 +115,7 @@ func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
func (mc *MasterClient) WaitUntilConnected() {
for {
if mc.currentMaster != "" {
if mc.getCurrentMaster() != "" {
return
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@ -151,8 +163,7 @@ func (mc *MasterClient) tryAllMasters() {
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
}
mc.currentMaster = ""
mc.setCurrentMaster("")
}
}
@ -204,7 +215,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
} else {
mc.resetVidMap()
}
mc.currentMaster = master
mc.setCurrentMaster(master)
for {
resp, err := stream.Recv()
@ -216,8 +227,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@ -279,10 +290,7 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
for mc.currentMaster == "" {
time.Sleep(3 * time.Second)
}
return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})

|||||||
100:0
Loading…
Cancel
Save