hilimd
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
237 changed files with 7747 additions and 4333 deletions
-
105.github/workflows/binaries_dev.yml
-
54.github/workflows/binaries_release0.yml
-
15.github/workflows/binaries_release1.yml
-
54.github/workflows/binaries_release2.yml
-
54.github/workflows/binaries_release3.yml
-
2.github/workflows/container_dev.yml
-
121.github/workflows/container_release.yml
-
54.github/workflows/container_release1.yml
-
55.github/workflows/container_release2.yml
-
55.github/workflows/container_release3.yml
-
2README.md
-
59docker/Dockerfile.rocksdb_large
-
6docker/Makefile
-
35go.mod
-
335go.sum
-
4k8s/helm_charts2/Chart.yaml
-
2k8s/helm_charts2/templates/filer-servicemonitor.yaml
-
2k8s/helm_charts2/templates/ingress.yaml
-
2k8s/helm_charts2/templates/s3-servicemonitor.yaml
-
2k8s/helm_charts2/templates/volume-servicemonitor.yaml
-
2k8s/helm_charts2/values.yaml
-
BINnote/SeaweedFS_Gateway_RemoteObjectStore.png
-
4other/java/client/pom.xml
-
4other/java/client/pom.xml.deploy
-
2other/java/client/pom_debug.xml
-
2other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
-
6other/java/client/src/main/java/seaweedfs/client/FilerClient.java
-
8other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
-
2other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
-
2other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
-
4other/java/client/src/main/proto/filer.proto
-
4other/java/examples/pom.xml
-
4other/java/hdfs2/dependency-reduced-pom.xml
-
4other/java/hdfs2/pom.xml
-
8other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
5other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
-
4other/java/hdfs3/dependency-reduced-pom.xml
-
4other/java/hdfs3/pom.xml
-
8other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
5other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
-
10test/s3/multipart/aws_upload.go
-
19unmaintained/diff_volume_servers/diff_volume_servers.go
-
4unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
-
5unmaintained/repeated_vacuum/repeated_vacuum.go
-
64unmaintained/stream_read_volume/stream_read_volume.go
-
3unmaintained/volume_tailer/volume_tailer.go
-
5weed/command/backup.go
-
4weed/command/benchmark.go
-
1weed/command/command.go
-
3weed/command/download.go
-
27weed/command/filer.go
-
4weed/command/filer_backup.go
-
4weed/command/filer_cat.go
-
123weed/command/filer_copy.go
-
4weed/command/filer_meta_backup.go
-
8weed/command/filer_meta_tail.go
-
117weed/command/filer_remote_gateway.go
-
29weed/command/filer_remote_gateway_buckets.go
-
73weed/command/filer_remote_sync.go
-
19weed/command/filer_remote_sync_dir.go
-
18weed/command/filer_sync.go
-
23weed/command/iam.go
-
2weed/command/imports.go
-
32weed/command/master.go
-
24weed/command/master_follower.go
-
20weed/command/mount_std.go
-
16weed/command/msg_broker.go
-
19weed/command/s3.go
-
30weed/command/scaffold/filer.toml
-
5weed/command/scaffold/master.toml
-
16weed/command/server.go
-
9weed/command/shell.go
-
8weed/command/upload.go
-
21weed/command/volume.go
-
35weed/command/webdav.go
-
5weed/filer.toml
-
13weed/filer/filer.go
-
12weed/filer/filer_notify.go
-
2weed/filer/filer_notify_append.go
-
8weed/filer/filer_search.go
-
5weed/filer/filerstore_translate_path.go
-
4weed/filer/filerstore_wrapper.go
-
6weed/filer/leveldb/leveldb_store_test.go
-
4weed/filer/leveldb2/leveldb2_store_test.go
-
4weed/filer/leveldb3/leveldb3_store_test.go
-
18weed/filer/meta_aggregator.go
-
2weed/filer/redis/universal_redis_store.go
-
2weed/filer/redis2/universal_redis_store.go
-
507weed/filer/redis3/ItemList.go
-
75weed/filer/redis3/item_list_serde.go
-
138weed/filer/redis3/kv_directory_children.go
-
210weed/filer/redis3/kv_directory_children_test.go
-
45weed/filer/redis3/redis_cluster_store.go
-
39weed/filer/redis3/redis_store.go
-
62weed/filer/redis3/skiplist_element_store.go
-
179weed/filer/redis3/universal_redis_store.go
-
42weed/filer/redis3/universal_redis_store_kv.go
-
2weed/filer/remote_mapping.go
-
4weed/filer/remote_storage.go
-
2weed/filer/rocksdb/rocksdb_store.go
@ -0,0 +1,54 @@ |
|||
# This is a basic workflow to help you get started with Actions |
|||
|
|||
name: "go: build versioned binaries for windows" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
|
|||
# Allows you to run this workflow manually from the Actions tab |
|||
workflow_dispatch: |
|||
|
|||
# A workflow run is made up of one or more jobs that can run sequentially or in parallel |
|||
jobs: |
|||
|
|||
build-release-binaries_windows: |
|||
runs-on: ubuntu-latest |
|||
strategy: |
|||
matrix: |
|||
goos: [windows] |
|||
goarch: [amd64] |
|||
|
|||
# Steps represent a sequence of tasks that will be executed as part of the job |
|||
steps: |
|||
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it |
|||
- uses: actions/checkout@v2 |
|||
- name: Go Release Binaries Normal Volume Size |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
# build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}" |
|||
- name: Go Release Large Disk Binaries |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}_large_disk" |
@ -0,0 +1,54 @@ |
|||
# This is a basic workflow to help you get started with Actions |
|||
|
|||
name: "go: build versioned binaries for darwin" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
|
|||
# Allows you to run this workflow manually from the Actions tab |
|||
workflow_dispatch: |
|||
|
|||
# A workflow run is made up of one or more jobs that can run sequentially or in parallel |
|||
jobs: |
|||
|
|||
build-release-binaries_darwin: |
|||
runs-on: ubuntu-latest |
|||
strategy: |
|||
matrix: |
|||
goos: [darwin] |
|||
goarch: [amd64, arm64] |
|||
|
|||
# Steps represent a sequence of tasks that will be executed as part of the job |
|||
steps: |
|||
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it |
|||
- uses: actions/checkout@v2 |
|||
- name: Go Release Binaries Normal Volume Size |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
# build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}" |
|||
- name: Go Release Large Disk Binaries |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}_large_disk" |
@ -0,0 +1,54 @@ |
|||
# This is a basic workflow to help you get started with Actions |
|||
|
|||
name: "go: build versioned binaries for freebsd" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
|
|||
# Allows you to run this workflow manually from the Actions tab |
|||
workflow_dispatch: |
|||
|
|||
# A workflow run is made up of one or more jobs that can run sequentially or in parallel |
|||
jobs: |
|||
|
|||
build-release-binaries_freebsd: |
|||
runs-on: ubuntu-latest |
|||
strategy: |
|||
matrix: |
|||
goos: [freebsd] |
|||
goarch: [amd64, arm, arm64] |
|||
|
|||
# Steps represent a sequence of tasks that will be executed as part of the job |
|||
steps: |
|||
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it |
|||
- uses: actions/checkout@v2 |
|||
- name: Go Release Binaries Normal Volume Size |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
# build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}" |
|||
- name: Go Release Large Disk Binaries |
|||
uses: wangyoucao577/go-release-action@v1.20 |
|||
with: |
|||
github_token: ${{ secrets.GITHUB_TOKEN }} |
|||
goos: ${{ matrix.goos }} |
|||
goarch: ${{ matrix.goarch }} |
|||
overwrite: true |
|||
pre_command: export CGO_ENABLED=0 |
|||
build_flags: -tags 5BytesOffset # optional, default is |
|||
ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}} |
|||
# Where to run `go build .` |
|||
project_path: weed |
|||
binary_name: weed |
|||
asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}_large_disk" |
@ -1,121 +0,0 @@ |
|||
name: "docker: build release containers" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
workflow_dispatch: [] |
|||
|
|||
jobs: |
|||
build-default-release-container: |
|||
runs-on: [ubuntu-latest] |
|||
|
|||
steps: |
|||
- |
|||
name: Checkout |
|||
uses: actions/checkout@v2 |
|||
- |
|||
name: Docker meta |
|||
id: docker_meta |
|||
uses: docker/metadata-action@v3 |
|||
with: |
|||
images: | |
|||
chrislusf/seaweedfs |
|||
ghcr.io/chrislusf/seaweedfs |
|||
tags: | |
|||
type=ref,event=tag |
|||
flavor: | |
|||
latest=false |
|||
labels: | |
|||
org.opencontainers.image.title=seaweedfs |
|||
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast! |
|||
org.opencontainers.image.vendor=Chris Lu |
|||
- |
|||
name: Set up QEMU |
|||
uses: docker/setup-qemu-action@v1 |
|||
- |
|||
name: Set up Docker Buildx |
|||
uses: docker/setup-buildx-action@v1 |
|||
with: |
|||
buildkitd-flags: "--debug" |
|||
- |
|||
name: Login to Docker Hub |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
username: ${{ secrets.DOCKER_USERNAME }} |
|||
password: ${{ secrets.DOCKER_PASSWORD }} |
|||
- |
|||
name: Login to GHCR |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
registry: ghcr.io |
|||
username: ${{ secrets.GHCR_USERNAME }} |
|||
password: ${{ secrets.GHCR_TOKEN }} |
|||
- |
|||
name: Build |
|||
uses: docker/build-push-action@v2 |
|||
with: |
|||
context: ./docker |
|||
push: ${{ github.event_name != 'pull_request' }} |
|||
file: ./docker/Dockerfile.go_build |
|||
platforms: linux/amd64, linux/arm, linux/arm64, linux/386 |
|||
tags: ${{ steps.docker_meta.outputs.tags }} |
|||
labels: ${{ steps.docker_meta.outputs.labels }} |
|||
build-large-release-container: |
|||
runs-on: [ubuntu-latest] |
|||
|
|||
steps: |
|||
- |
|||
name: Checkout |
|||
uses: actions/checkout@v2 |
|||
- |
|||
name: Docker meta |
|||
id: docker_meta |
|||
uses: docker/metadata-action@v3 |
|||
with: |
|||
images: | |
|||
chrislusf/seaweedfs |
|||
ghcr.io/chrislusf/seaweedfs |
|||
tags: | |
|||
type=ref,event=tag,suffix=_large_disk |
|||
flavor: | |
|||
latest=false |
|||
labels: | |
|||
org.opencontainers.image.title=seaweedfs |
|||
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast! |
|||
org.opencontainers.image.vendor=Chris Lu |
|||
- |
|||
name: Set up QEMU |
|||
uses: docker/setup-qemu-action@v1 |
|||
- |
|||
name: Set up Docker Buildx |
|||
uses: docker/setup-buildx-action@v1 |
|||
with: |
|||
buildkitd-flags: "--debug" |
|||
- |
|||
name: Login to Docker Hub |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
username: ${{ secrets.DOCKER_USERNAME }} |
|||
password: ${{ secrets.DOCKER_PASSWORD }} |
|||
- |
|||
name: Login to GHCR |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
registry: ghcr.io |
|||
username: ${{ secrets.GHCR_USERNAME }} |
|||
password: ${{ secrets.GHCR_TOKEN }} |
|||
- |
|||
name: Build |
|||
uses: docker/build-push-action@v2 |
|||
with: |
|||
context: ./docker |
|||
push: ${{ github.event_name != 'pull_request' }} |
|||
file: ./docker/Dockerfile.go_build_large |
|||
platforms: linux/amd64, linux/arm, linux/arm64, linux/386 |
|||
tags: ${{ steps.docker_meta.outputs.tags }} |
|||
labels: ${{ steps.docker_meta.outputs.labels }} |
@ -0,0 +1,54 @@ |
|||
name: "docker: build release containers for normal volume" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
workflow_dispatch: [] |
|||
|
|||
jobs: |
|||
build-default-release-container: |
|||
runs-on: [ubuntu-latest] |
|||
|
|||
steps: |
|||
- |
|||
name: Checkout |
|||
uses: actions/checkout@v2 |
|||
- |
|||
name: Docker meta |
|||
id: docker_meta |
|||
uses: docker/metadata-action@v3 |
|||
with: |
|||
images: | |
|||
chrislusf/seaweedfs |
|||
tags: | |
|||
type=ref,event=tag |
|||
flavor: | |
|||
latest=false |
|||
labels: | |
|||
org.opencontainers.image.title=seaweedfs |
|||
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast! |
|||
org.opencontainers.image.vendor=Chris Lu |
|||
- |
|||
name: Set up QEMU |
|||
uses: docker/setup-qemu-action@v1 |
|||
- |
|||
name: Set up Docker Buildx |
|||
uses: docker/setup-buildx-action@v1 |
|||
- |
|||
name: Login to Docker Hub |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
username: ${{ secrets.DOCKER_USERNAME }} |
|||
password: ${{ secrets.DOCKER_PASSWORD }} |
|||
- |
|||
name: Build |
|||
uses: docker/build-push-action@v2 |
|||
with: |
|||
context: ./docker |
|||
push: ${{ github.event_name != 'pull_request' }} |
|||
file: ./docker/Dockerfile.go_build |
|||
platforms: linux/amd64, linux/arm, linux/arm64, linux/386 |
|||
tags: ${{ steps.docker_meta.outputs.tags }} |
|||
labels: ${{ steps.docker_meta.outputs.labels }} |
@ -0,0 +1,55 @@ |
|||
name: "docker: build release containers for large volume" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
workflow_dispatch: [] |
|||
|
|||
jobs: |
|||
|
|||
build-large-release-container: |
|||
runs-on: [ubuntu-latest] |
|||
|
|||
steps: |
|||
- |
|||
name: Checkout |
|||
uses: actions/checkout@v2 |
|||
- |
|||
name: Docker meta |
|||
id: docker_meta |
|||
uses: docker/metadata-action@v3 |
|||
with: |
|||
images: | |
|||
chrislusf/seaweedfs |
|||
tags: | |
|||
type=ref,event=tag,suffix=_large_disk |
|||
flavor: | |
|||
latest=false |
|||
labels: | |
|||
org.opencontainers.image.title=seaweedfs |
|||
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast! |
|||
org.opencontainers.image.vendor=Chris Lu |
|||
- |
|||
name: Set up QEMU |
|||
uses: docker/setup-qemu-action@v1 |
|||
- |
|||
name: Set up Docker Buildx |
|||
uses: docker/setup-buildx-action@v1 |
|||
- |
|||
name: Login to Docker Hub |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
username: ${{ secrets.DOCKER_USERNAME }} |
|||
password: ${{ secrets.DOCKER_PASSWORD }} |
|||
- |
|||
name: Build |
|||
uses: docker/build-push-action@v2 |
|||
with: |
|||
context: ./docker |
|||
push: ${{ github.event_name != 'pull_request' }} |
|||
file: ./docker/Dockerfile.go_build_large |
|||
platforms: linux/amd64, linux/arm, linux/arm64, linux/386 |
|||
tags: ${{ steps.docker_meta.outputs.tags }} |
|||
labels: ${{ steps.docker_meta.outputs.labels }} |
@ -0,0 +1,55 @@ |
|||
name: "docker: build release containers for rocksdb" |
|||
|
|||
on: |
|||
push: |
|||
tags: |
|||
- '*' |
|||
workflow_dispatch: [] |
|||
|
|||
jobs: |
|||
|
|||
build-large-release-container_rocksdb: |
|||
runs-on: [ubuntu-latest] |
|||
|
|||
steps: |
|||
- |
|||
name: Checkout |
|||
uses: actions/checkout@v2 |
|||
- |
|||
name: Docker meta |
|||
id: docker_meta |
|||
uses: docker/metadata-action@v3 |
|||
with: |
|||
images: | |
|||
chrislusf/seaweedfs |
|||
tags: | |
|||
type=ref,event=tag,suffix=_large_disk_rocksdb |
|||
flavor: | |
|||
latest=false |
|||
labels: | |
|||
org.opencontainers.image.title=seaweedfs |
|||
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast! |
|||
org.opencontainers.image.vendor=Chris Lu |
|||
- |
|||
name: Set up QEMU |
|||
uses: docker/setup-qemu-action@v1 |
|||
- |
|||
name: Set up Docker Buildx |
|||
uses: docker/setup-buildx-action@v1 |
|||
- |
|||
name: Login to Docker Hub |
|||
if: github.event_name != 'pull_request' |
|||
uses: docker/login-action@v1 |
|||
with: |
|||
username: ${{ secrets.DOCKER_USERNAME }} |
|||
password: ${{ secrets.DOCKER_PASSWORD }} |
|||
- |
|||
name: Build |
|||
uses: docker/build-push-action@v2 |
|||
with: |
|||
context: ./docker |
|||
push: ${{ github.event_name != 'pull_request' }} |
|||
file: ./docker/Dockerfile.rocksdb_large |
|||
platforms: linux/amd64 |
|||
tags: ${{ steps.docker_meta.outputs.tags }} |
|||
labels: ${{ steps.docker_meta.outputs.labels }} |
@ -0,0 +1,59 @@ |
|||
FROM golang:1.17-buster as builder |
|||
|
|||
RUN apt-get update |
|||
RUN apt-get install -y build-essential libsnappy-dev zlib1g-dev libbz2-dev libgflags-dev liblz4-dev libzstd-dev |
|||
|
|||
ENV ROCKSDB_VERSION v6.22.1 |
|||
|
|||
# build RocksDB |
|||
RUN cd /tmp && \ |
|||
git clone https://github.com/facebook/rocksdb.git /tmp/rocksdb --depth 1 --single-branch --branch $ROCKSDB_VERSION && \ |
|||
cd rocksdb && \ |
|||
make static_lib && \ |
|||
make install-static |
|||
|
|||
ENV CGO_CFLAGS "-I/tmp/rocksdb/include" |
|||
ENV CGO_LDFLAGS "-L/tmp/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd" |
|||
|
|||
# build SeaweedFS |
|||
RUN mkdir -p /go/src/github.com/chrislusf/ |
|||
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs |
|||
ARG BRANCH=${BRANCH:-master} |
|||
RUN cd /go/src/github.com/chrislusf/seaweedfs && git checkout $BRANCH |
|||
RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \ |
|||
&& export LDFLAGS="-X github.com/chrislusf/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \ |
|||
&& go install -tags "5BytesOffset rocksdb" -ldflags "-extldflags -static ${LDFLAGS}" |
|||
|
|||
|
|||
FROM alpine AS final |
|||
LABEL author="Chris Lu" |
|||
COPY --from=builder /go/bin/weed /usr/bin/ |
|||
RUN mkdir -p /etc/seaweedfs |
|||
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml |
|||
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh |
|||
RUN apk add fuse snappy gflags |
|||
|
|||
# volume server gprc port |
|||
EXPOSE 18080 |
|||
# volume server http port |
|||
EXPOSE 8080 |
|||
# filer server gprc port |
|||
EXPOSE 18888 |
|||
# filer server http port |
|||
EXPOSE 8888 |
|||
# master server shared gprc port |
|||
EXPOSE 19333 |
|||
# master server shared http port |
|||
EXPOSE 9333 |
|||
# s3 server http port |
|||
EXPOSE 8333 |
|||
# webdav server http port |
|||
EXPOSE 7333 |
|||
|
|||
RUN mkdir -p /data/filerldb2 |
|||
|
|||
VOLUME /data |
|||
|
|||
RUN chmod +x /entrypoint.sh |
|||
|
|||
ENTRYPOINT ["/entrypoint.sh"] |
335
go.sum
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -1,5 +1,5 @@ |
|||
apiVersion: v1 |
|||
description: SeaweedFS |
|||
name: seaweedfs |
|||
appVersion: "2.67" |
|||
version: "2.67" |
|||
appVersion: "2.71" |
|||
version: "2.71" |
After Width: 1017 | Height: 633 | Size: 127 KiB |
@ -0,0 +1,64 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"flag" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"google.golang.org/grpc" |
|||
"io" |
|||
) |
|||
|
|||
var ( |
|||
volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") |
|||
volumeId = flag.Int("volumeId", -1, "a volume id to stream read") |
|||
grpcDialOption grpc.DialOption |
|||
) |
|||
|
|||
func main() { |
|||
flag.Parse() |
|||
|
|||
util.LoadConfiguration("security", false) |
|||
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") |
|||
|
|||
vid := uint32(*volumeId) |
|||
|
|||
eachNeedleFunc := func(resp *volume_server_pb.ReadAllNeedlesResponse) error { |
|||
fmt.Printf("%d,%x%08x %d\n", resp.VolumeId, resp.NeedleId, resp.Cookie, len(resp.NeedleBlob)) |
|||
return nil |
|||
} |
|||
|
|||
err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { |
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
defer cancel() |
|||
copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ |
|||
VolumeIds: []uint32{vid}, |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
for { |
|||
resp, err := copyFileClient.Recv() |
|||
if errors.Is(err, io.EOF) { |
|||
break |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if err = eachNeedleFunc(resp); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
fmt.Printf("read %s: %v\n", *volumeServer, err) |
|||
} |
|||
|
|||
} |
|||
|
@ -0,0 +1,117 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/replication/source" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"google.golang.org/grpc" |
|||
"os" |
|||
"time" |
|||
) |
|||
|
|||
type RemoteGatewayOptions struct { |
|||
filerAddress *string |
|||
grpcDialOption grpc.DialOption |
|||
readChunkFromFiler *bool |
|||
timeAgo *time.Duration |
|||
createBucketAt *string |
|||
createBucketRandomSuffix *bool |
|||
include *string |
|||
exclude *string |
|||
|
|||
mappings *remote_pb.RemoteStorageMapping |
|||
remoteConfs map[string]*remote_pb.RemoteConf |
|||
bucketsDir string |
|||
} |
|||
|
|||
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) |
|||
|
|||
func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
return fn(client) |
|||
}) |
|||
} |
|||
func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string { |
|||
return location.Url |
|||
} |
|||
|
|||
var ( |
|||
remoteGatewayOptions RemoteGatewayOptions |
|||
) |
|||
|
|||
func init() { |
|||
cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle
|
|||
remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") |
|||
remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") |
|||
remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts") |
|||
remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") |
|||
remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") |
|||
remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*") |
|||
remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*") |
|||
} |
|||
|
|||
var cmdFilerRemoteGateway = &Command{ |
|||
UsageLine: "filer.remote.gateway", |
|||
Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote object store", |
|||
Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote object store |
|||
|
|||
filer.remote.gateway listens on filer local buckets update events. |
|||
If any bucket is created, deleted, or updated, it will mirror the changes to remote object store. |
|||
|
|||
weed filer.remote.sync -createBucketAt=cloud1 |
|||
|
|||
`, |
|||
} |
|||
|
|||
func runFilerRemoteGateway(cmd *Command, args []string) bool { |
|||
|
|||
util.LoadConfiguration("security", false) |
|||
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") |
|||
remoteGatewayOptions.grpcDialOption = grpcDialOption |
|||
|
|||
filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress) |
|||
|
|||
filerSource := &source.FilerSource{} |
|||
filerSource.DoInitialize( |
|||
filerAddress.ToHttpAddress(), |
|||
filerAddress.ToGrpcAddress(), |
|||
"/", // does not matter
|
|||
*remoteGatewayOptions.readChunkFromFiler, |
|||
) |
|||
|
|||
remoteGatewayOptions.bucketsDir = "/buckets" |
|||
// check buckets again
|
|||
remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { |
|||
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
remoteGatewayOptions.bucketsDir = resp.DirBuckets |
|||
return nil |
|||
}) |
|||
|
|||
// read filer remote storage mount mappings
|
|||
if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil { |
|||
fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) |
|||
return true |
|||
} |
|||
|
|||
// synchronize /buckets folder
|
|||
fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) |
|||
util.RetryForever("filer.remote.sync buckets", func() error { |
|||
return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) |
|||
}, func(err error) bool { |
|||
if err != nil { |
|||
glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err) |
|||
} |
|||
return true |
|||
}) |
|||
return true |
|||
|
|||
} |
@ -0,0 +1,5 @@ |
|||
[redis3] |
|||
enabled = true |
|||
address = "localhost:6379" |
|||
password = "" |
|||
database = 0 |
@ -0,0 +1,507 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/util/skiplist" |
|||
"github.com/go-redis/redis/v8" |
|||
) |
|||
|
|||
type ItemList struct { |
|||
skipList *skiplist.SkipList |
|||
batchSize int |
|||
client redis.UniversalClient |
|||
prefix string |
|||
} |
|||
|
|||
func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList { |
|||
return &ItemList{ |
|||
skipList: skiplist.New(store), |
|||
batchSize: batchSize, |
|||
client: client, |
|||
prefix: prefix, |
|||
} |
|||
} |
|||
|
|||
/* |
|||
Be reluctant to create new nodes. Try to fit into either previous node or next node. |
|||
Prefer to add to previous node. |
|||
|
|||
There are multiple cases after finding the name for greater or equal node |
|||
1. found and node.Key == name |
|||
The node contains a batch with leading key the same as the name |
|||
nothing to do |
|||
2. no such node found or node.Key > name |
|||
|
|||
if no such node found |
|||
prevNode = list.LargestNode |
|||
|
|||
// case 2.1
|
|||
if previousNode contains name |
|||
nothing to do |
|||
|
|||
// prefer to add to previous node
|
|||
if prevNode != nil { |
|||
// case 2.2
|
|||
if prevNode has capacity |
|||
prevNode.add name, and save |
|||
return |
|||
// case 2.3
|
|||
split prevNode by name |
|||
} |
|||
|
|||
// case 2.4
|
|||
// merge into next node. Avoid too many nodes if adding data in reverse order.
|
|||
if nextNode is not nil and nextNode has capacity |
|||
delete nextNode.Key |
|||
nextNode.Key = name |
|||
nextNode.batch.add name |
|||
insert nodeNode.Key |
|||
return |
|||
|
|||
// case 2.5
|
|||
if prevNode is nil |
|||
insert new node with key = name, value = batch{name} |
|||
return |
|||
|
|||
*/ |
|||
|
|||
func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) { |
|||
ctx := context.Background() |
|||
pipe := nl.client.TxPipeline() |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
countOperation := pipe.ZLexCount(ctx, key, "-", "+") |
|||
scoreOperationt := pipe.ZScore(ctx, key, name) |
|||
if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil { |
|||
return false, 0, err |
|||
} |
|||
if err == redis.Nil { |
|||
err = nil |
|||
} |
|||
alreadyContains = scoreOperationt.Err() == nil |
|||
nodeSize = int(countOperation.Val()) |
|||
return |
|||
} |
|||
|
|||
func (nl *ItemList) WriteName(name string) error { |
|||
|
|||
lookupKey := []byte(name) |
|||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
// case 1: the name already exists as one leading key in the batch
|
|||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { |
|||
return nil |
|||
} |
|||
|
|||
var prevNodeReference *skiplist.SkipListElementReference |
|||
if !found { |
|||
prevNodeReference = nl.skipList.GetLargestNodeReference() |
|||
} |
|||
|
|||
if nextNode != nil && prevNode == nil { |
|||
prevNodeReference = nextNode.Prev |
|||
} |
|||
|
|||
if prevNodeReference != nil { |
|||
alreadyContains, nodeSize, err := nl.canAddMember(prevNodeReference, name) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if alreadyContains { |
|||
// case 2.1
|
|||
return nil |
|||
} |
|||
|
|||
// case 2.2
|
|||
if nodeSize < nl.batchSize { |
|||
return nl.NodeAddMember(prevNodeReference, name) |
|||
} |
|||
|
|||
// case 2.3
|
|||
x := nl.NodeInnerPosition(prevNodeReference, name) |
|||
y := nodeSize - x |
|||
addToX := x <= y |
|||
// add to a new node
|
|||
if x == 0 || y == 0 { |
|||
if err := nl.ItemAdd(lookupKey, 0, name); err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
if addToX { |
|||
// collect names before name, add them to X
|
|||
namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name) |
|||
if err != nil { |
|||
return nil |
|||
} |
|||
// delete skiplist reference to old node
|
|||
if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil { |
|||
return err |
|||
} |
|||
// add namesToY and name to a new X
|
|||
namesToX = append(namesToX, name) |
|||
if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil { |
|||
return nil |
|||
} |
|||
// remove names less than name from current Y
|
|||
if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil { |
|||
return nil |
|||
} |
|||
|
|||
// point skip list to current Y
|
|||
if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil { |
|||
return nil |
|||
} |
|||
return nil |
|||
} else { |
|||
// collect names after name, add them to Y
|
|||
namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name) |
|||
if err != nil { |
|||
return nil |
|||
} |
|||
// add namesToY and name to a new Y
|
|||
namesToY = append(namesToY, name) |
|||
if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil { |
|||
return nil |
|||
} |
|||
// remove names after name from current X
|
|||
if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil { |
|||
return nil |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
} |
|||
|
|||
// case 2.4
|
|||
if nextNode != nil { |
|||
nodeSize := nl.NodeSize(nextNode.Reference()) |
|||
if nodeSize < nl.batchSize { |
|||
if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { |
|||
return err |
|||
} else { |
|||
if err := nl.ItemAdd(lookupKey, id, name); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
} |
|||
|
|||
// case 2.5
|
|||
// now prevNode is nil
|
|||
return nl.ItemAdd(lookupKey, 0, name) |
|||
} |
|||
|
|||
/* |
|||
// case 1: exists in nextNode
|
|||
if nextNode != nil && nextNode.Key == name { |
|||
remove from nextNode, update nextNode |
|||
// TODO: merge with prevNode if possible?
|
|||
return |
|||
} |
|||
if nextNode is nil |
|||
	    prevNode = list.GetLargestNode
|||
if prevNode == nil and nextNode.Prev != nil |
|||
prevNode = load(nextNode.Prev) |
|||
|
|||
// case 2: does not exist
|
|||
// case 2.1
|
|||
if prevNode == nil { |
|||
return |
|||
} |
|||
// case 2.2
|
|||
if prevNameBatch does not contain name { |
|||
return |
|||
} |
|||
|
|||
// case 3
|
|||
delete from prevNameBatch |
|||
if prevNameBatch + nextNode < capacityList |
|||
// case 3.1
|
|||
merge |
|||
else |
|||
// case 3.2
|
|||
update prevNode |
|||
|
|||
|
|||
*/ |
|||
func (nl *ItemList) DeleteName(name string) error { |
|||
lookupKey := []byte(name) |
|||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// case 1
|
|||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { |
|||
if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { |
|||
return err |
|||
} |
|||
if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil { |
|||
return err |
|||
} |
|||
minName := nl.NodeMin(nextNode.Reference()) |
|||
if minName == "" { |
|||
return nl.NodeDelete(nextNode.Reference()) |
|||
} |
|||
return nl.ItemAdd([]byte(minName), nextNode.Id) |
|||
} |
|||
|
|||
if !found { |
|||
prevNode, err = nl.skipList.GetLargestNode() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if nextNode != nil && prevNode == nil { |
|||
prevNode, err = nl.skipList.LoadElement(nextNode.Prev) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
// case 2
|
|||
if prevNode == nil { |
|||
// case 2.1
|
|||
return nil |
|||
} |
|||
if !nl.NodeContainsItem(prevNode.Reference(), name) { |
|||
return nil |
|||
} |
|||
|
|||
// case 3
|
|||
if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil { |
|||
return err |
|||
} |
|||
prevSize := nl.NodeSize(prevNode.Reference()) |
|||
if prevSize == 0 { |
|||
if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
nextSize := nl.NodeSize(nextNode.Reference()) |
|||
if nextSize > 0 && prevSize+nextSize < nl.batchSize { |
|||
// case 3.1 merge nextNode and prevNode
|
|||
if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { |
|||
return err |
|||
} |
|||
nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "") |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil { |
|||
return err |
|||
} |
|||
return nl.NodeDelete(nextNode.Reference()) |
|||
} else { |
|||
// case 3.2 update prevNode
|
|||
// no action to take
|
|||
return nil |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error { |
|||
lookupKey := []byte(startFrom) |
|||
prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { |
|||
prevNode = nil |
|||
} |
|||
if !found { |
|||
prevNode, err = nl.skipList.GetLargestNode() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if prevNode != nil { |
|||
if !nl.NodeScanIncluseiveAfter(prevNode.Reference(), startFrom, visitNamesFn) { |
|||
return nil |
|||
} |
|||
} |
|||
|
|||
for nextNode != nil { |
|||
if !nl.NodeScanIncluseiveAfter(nextNode.Reference(), startFrom, visitNamesFn) { |
|||
return nil |
|||
} |
|||
nextNode, err = nl.skipList.LoadElement(nextNode.Next[0]) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (nl *ItemList) RemoteAllListElement() error { |
|||
|
|||
t := nl.skipList |
|||
|
|||
nodeRef := t.StartLevels[0] |
|||
for nodeRef != nil { |
|||
node, err := t.LoadElement(nodeRef) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if node == nil { |
|||
return nil |
|||
} |
|||
if err := t.DeleteElement(node); err != nil { |
|||
return err |
|||
} |
|||
if err := nl.NodeDelete(node.Reference()); err != nil { |
|||
return err |
|||
} |
|||
nodeRef = node.Next[0] |
|||
} |
|||
return nil |
|||
|
|||
} |
|||
|
|||
func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool { |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
_, err := nl.client.ZScore(context.Background(), key, item).Result() |
|||
if err == redis.Nil { |
|||
return false |
|||
} |
|||
if err == nil { |
|||
return true |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int { |
|||
if node == nil { |
|||
return 0 |
|||
} |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val()) |
|||
} |
|||
|
|||
func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error { |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
var members []*redis.Z |
|||
for _, name := range names { |
|||
members = append(members, &redis.Z{ |
|||
Score: 0, |
|||
Member: name, |
|||
}) |
|||
} |
|||
return nl.client.ZAddNX(context.Background(), key, members...).Err() |
|||
} |
|||
// NodeDeleteMember removes name from the node's sorted set.
func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	return nl.client.ZRem(context.Background(), key, name).Err()
}
|||
|
|||
// NodeDelete removes the node's entire sorted set key from redis.
func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	return nl.client.Del(context.Background(), key).Err()
}
|||
|
|||
// NodeInnerPosition returns how many members of the node sort strictly before
// name ("(" marks an exclusive lex bound).
func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
}
|||
|
|||
func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string { |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
slice := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{ |
|||
Min: "-", |
|||
Max: "+", |
|||
Offset: 0, |
|||
Count: 1, |
|||
}).Val() |
|||
if len(slice) > 0 { |
|||
s := slice[0] |
|||
return s |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (nl *ItemList) NodeScanIncluseiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool { |
|||
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer) |
|||
if startFrom == "" { |
|||
startFrom = "-" |
|||
} else { |
|||
startFrom = "[" + startFrom |
|||
} |
|||
names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{ |
|||
Min: startFrom, |
|||
Max: "+", |
|||
}).Val() |
|||
for _, n := range names { |
|||
if !visitNamesFn(n) { |
|||
return false |
|||
} |
|||
} |
|||
return true |
|||
} |
|||
|
|||
// NodeRangeBeforeExclusive returns the node's members strictly before stopAt;
// an empty stopAt means "all members".
func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	if stopAt == "" {
		stopAt = "+"
	} else {
		stopAt = "(" + stopAt // "(" = exclusive lex bound
	}
	return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
		Min: "-",
		Max: stopAt,
	}).Result()
}
|||
// NodeRangeAfterExclusive returns the node's members strictly after startFrom;
// an empty startFrom means "all members".
func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	if startFrom == "" {
		startFrom = "-"
	} else {
		startFrom = "(" + startFrom // "(" = exclusive lex bound
	}
	return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
		Min: startFrom,
		Max: "+",
	}).Result()
}
|||
|
|||
// NodeDeleteBeforeExclusive removes the node's members strictly before stopAt;
// an empty stopAt removes all members.
func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	if stopAt == "" {
		stopAt = "+"
	} else {
		stopAt = "(" + stopAt // "(" = exclusive lex bound
	}
	return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err()
}
|||
// NodeDeleteAfterExclusive removes the node's members strictly after startFrom;
// an empty startFrom removes all members.
func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error {
	key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
	if startFrom == "" {
		startFrom = "-"
	} else {
		startFrom = "(" + startFrom // "(" = exclusive lex bound
	}
	return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err()
}
|||
|
|||
func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error { |
|||
if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil { |
|||
return err |
|||
} else { |
|||
if len(names) > 0 { |
|||
return nl.NodeAddMember(&skiplist.SkipListElementReference{ |
|||
ElementPointer: id, |
|||
Key: lookupKey, |
|||
}, names...) |
|||
} |
|||
} |
|||
return nil |
|||
} |
@ -0,0 +1,75 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/util/skiplist" |
|||
"github.com/go-redis/redis/v8" |
|||
"github.com/golang/protobuf/proto" |
|||
) |
|||
|
|||
func LoadItemList(data []byte, prefix string, client redis.UniversalClient, store skiplist.ListStore, batchSize int) *ItemList { |
|||
|
|||
nl := &ItemList{ |
|||
skipList: skiplist.New(store), |
|||
batchSize: batchSize, |
|||
client: client, |
|||
prefix: prefix, |
|||
} |
|||
|
|||
if len(data) == 0 { |
|||
return nl |
|||
} |
|||
|
|||
message := &skiplist.SkipListProto{} |
|||
if err := proto.Unmarshal(data, message); err != nil { |
|||
glog.Errorf("loading skiplist: %v", err) |
|||
} |
|||
nl.skipList.MaxNewLevel = int(message.MaxNewLevel) |
|||
nl.skipList.MaxLevel = int(message.MaxLevel) |
|||
for i, ref := range message.StartLevels { |
|||
nl.skipList.StartLevels[i] = &skiplist.SkipListElementReference{ |
|||
ElementPointer: ref.ElementPointer, |
|||
Key: ref.Key, |
|||
} |
|||
} |
|||
for i, ref := range message.EndLevels { |
|||
nl.skipList.EndLevels[i] = &skiplist.SkipListElementReference{ |
|||
ElementPointer: ref.ElementPointer, |
|||
Key: ref.Key, |
|||
} |
|||
} |
|||
return nl |
|||
} |
|||
|
|||
// HasChanges reports whether the skip list was modified and needs re-serialization.
func (nl *ItemList) HasChanges() bool {
	return nl.skipList.HasChanges
}
|||
|
|||
func (nl *ItemList) ToBytes() []byte { |
|||
message := &skiplist.SkipListProto{} |
|||
message.MaxNewLevel = int32(nl.skipList.MaxNewLevel) |
|||
message.MaxLevel = int32(nl.skipList.MaxLevel) |
|||
for _, ref := range nl.skipList.StartLevels { |
|||
if ref == nil { |
|||
break |
|||
} |
|||
message.StartLevels = append(message.StartLevels, &skiplist.SkipListElementReference{ |
|||
ElementPointer: ref.ElementPointer, |
|||
Key: ref.Key, |
|||
}) |
|||
} |
|||
for _, ref := range nl.skipList.EndLevels { |
|||
if ref == nil { |
|||
break |
|||
} |
|||
message.EndLevels = append(message.EndLevels, &skiplist.SkipListElementReference{ |
|||
ElementPointer: ref.ElementPointer, |
|||
Key: ref.Key, |
|||
}) |
|||
} |
|||
data, err := proto.Marshal(message) |
|||
if err != nil { |
|||
glog.Errorf("marshal skiplist: %v", err) |
|||
} |
|||
return data |
|||
} |
@ -0,0 +1,138 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/go-redis/redis/v8" |
|||
) |
|||
|
|||
const maxNameBatchSizeLimit = 1000000 |
|||
|
|||
func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error { |
|||
|
|||
// lock and unlock
|
|||
mutex := redisStore.redsync.NewMutex(key + "lock") |
|||
if err := mutex.Lock(); err != nil { |
|||
return fmt.Errorf("lock %s: %v", key, err) |
|||
} |
|||
defer func() { |
|||
mutex.Unlock() |
|||
}() |
|||
|
|||
client := redisStore.Client |
|||
data, err := client.Get(ctx, key).Result() |
|||
if err != nil { |
|||
if err != redis.Nil { |
|||
return fmt.Errorf("read %s: %v", key, err) |
|||
} |
|||
} |
|||
store := newSkipListElementStore(key, client) |
|||
nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit) |
|||
|
|||
if err := nameList.WriteName(name); err != nil { |
|||
glog.Errorf("add %s %s: %v", key, name, err) |
|||
return err |
|||
} |
|||
|
|||
if !nameList.HasChanges() { |
|||
return nil |
|||
} |
|||
|
|||
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error { |
|||
|
|||
// lock and unlock
|
|||
mutex := redisStore.redsync.NewMutex(key + "lock") |
|||
if err := mutex.Lock(); err != nil { |
|||
return fmt.Errorf("lock %s: %v", key, err) |
|||
} |
|||
defer mutex.Unlock() |
|||
|
|||
client := redisStore.Client |
|||
data, err := client.Get(ctx, key).Result() |
|||
if err != nil { |
|||
if err != redis.Nil { |
|||
return fmt.Errorf("read %s: %v", key, err) |
|||
} |
|||
} |
|||
store := newSkipListElementStore(key, client) |
|||
nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit) |
|||
|
|||
if err := nameList.DeleteName(name); err != nil { |
|||
return err |
|||
} |
|||
if !nameList.HasChanges() { |
|||
return nil |
|||
} |
|||
|
|||
if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// removeChildren visits every name in the directory list under key with
// onDeleteFn, then deletes all skip list elements and their member sets.
// The whole operation is serialized by a redsync distributed lock.
func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error {

	// lock and unlock
	mutex := redisStore.redsync.NewMutex(key + "lock")
	if err := mutex.Lock(); err != nil {
		return fmt.Errorf("lock %s: %v", key, err)
	}
	defer mutex.Unlock()

	client := redisStore.Client
	data, err := client.Get(ctx, key).Result()
	if err != nil {
		// redis.Nil only means the list does not exist yet; proceed with empty data
		if err != redis.Nil {
			return fmt.Errorf("read %s: %v", key, err)
		}
	}
	store := newSkipListElementStore(key, client)
	nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)

	// a failing onDeleteFn stops the scan but is only logged here;
	// ListNames itself returns nil in that case
	if err = nameList.ListNames("", func(name string) bool {
		if err := onDeleteFn(name); err != nil {
			glog.Errorf("delete %s child %s: %v", key, name, err)
			return false
		}
		return true
	}); err != nil {
		return err
	}

	if err = nameList.RemoteAllListElement(); err != nil {
		return err
	}

	return nil

}
|||
|
|||
func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error { |
|||
client := redisStore.Client |
|||
data, err := client.Get(ctx, key).Result() |
|||
if err != nil { |
|||
if err != redis.Nil { |
|||
return fmt.Errorf("read %s: %v", key, err) |
|||
} |
|||
} |
|||
store := newSkipListElementStore(key, client) |
|||
nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit) |
|||
|
|||
if err = nameList.ListNames(startFileName, func(name string) bool { |
|||
return eachFn(name) |
|||
}); err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
|
|||
} |
@ -0,0 +1,210 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/go-redis/redis/v8" |
|||
"github.com/stvp/tempredis" |
|||
"strconv" |
|||
"testing" |
|||
"time" |
|||
) |
|||
|
|||
// names is out-of-order sample input (a cassandra bin/ directory listing)
// used by the name list tests below.
var names = []string{
	"cassandra.in.sh",
	"cassandra",
	"debug-cql.bat",
	"nodetool",
	"nodetool.bat",
	"source-conf.ps1",
	"sstableloader",
	"sstableloader.bat",
	"sstablescrub",
	"sstablescrub.bat",
	"sstableupgrade",
	"sstableupgrade.bat",
	"sstableutil",
	"sstableutil.bat",
	"sstableverify",
	"sstableverify.bat",
	"stop-server",
	"stop-server.bat",
	"stop-server.ps1",
	"cassandra.in.bat",
	"cqlsh.py",
	"cqlsh",
	"cassandra.ps1",
	"cqlsh.bat",
	"debug-cql",
	"cassandra.bat",
}
|||
|
|||
// yTestNameList inserts the sample names one at a time against a throwaway
// tempredis instance, printing the list after each insert and once after a
// final reload. The "y" prefix (not "Test") keeps `go test` from running it.
func yTestNameList(t *testing.T) {
	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := redis.NewClient(&redis.Options{
		Network: "unix",
		Addr:    server.Socket(),
	})

	store := newSkipListElementStore("/yyy/bin", client)
	var data []byte
	for _, name := range names {
		// reload from the previous round's serialized metadata each time
		nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
		nameList.WriteName(name)

		nameList.ListNames("", func(name string) bool {
			println(name)
			return true
		})

		if nameList.HasChanges() {
			data = nameList.ToBytes()
		}
		println()
	}

	nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
	nameList.ListNames("", func(name string) bool {
		println(name)
		return true
	})

}
|||
|
|||
// yBenchmarkNameList measures ItemList insert cost (load + WriteName +
// serialize per iteration) against tempredis. The "y" prefix (not "Benchmark")
// keeps `go test -bench` from running it.
func yBenchmarkNameList(b *testing.B) {

	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := redis.NewClient(&redis.Options{
		Network: "unix",
		Addr:    server.Socket(),
	})

	store := newSkipListElementStore("/yyy/bin", client)
	var data []byte
	for i := 0; i < b.N; i++ {
		nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)

		nameList.WriteName(strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx")

		if nameList.HasChanges() {
			data = nameList.ToBytes()
		}
	}
}
|||
|
|||
// BenchmarkRedis is the baseline: a plain ZAddNX into a single sorted set on
// tempredis, for comparison with the skip-list-backed ItemList benchmarks.
func BenchmarkRedis(b *testing.B) {

	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := redis.NewClient(&redis.Options{
		Network: "unix",
		Addr:    server.Socket(),
	})

	for i := 0; i < b.N; i++ {
		client.ZAddNX(context.Background(), "/yyy/bin", &redis.Z{Score: 0, Member: strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx"})
	}
}
|||
|
|||
// xTestNameListAdd compares bulk-insert timing of the ItemList (node batch
// size 100000) against plain ZAddNX, on a LOCAL redis at localhost:6379 —
// note it starts tempredis but then connects to localhost, and flushes that
// server. The "x" prefix keeps `go test` from running it.
func xTestNameListAdd(t *testing.T) {

	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	client.FlushAll(context.Background())

	N := 364800

	ts0 := time.Now()
	store := newSkipListElementStore("/y", client)
	var data []byte
	nameList := LoadItemList(data, "/y", client, store, 100000)
	for i := 0; i < N; i++ {
		nameList.WriteName(fmt.Sprintf("%8d", i))
	}

	ts1 := time.Now()

	// baseline: same volume via one flat sorted set
	for i := 0; i < N; i++ {
		client.ZAddNX(context.Background(), "/x", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
	}
	ts2 := time.Now()

	fmt.Printf("%v %v", ts1.Sub(ts0), ts2.Sub(ts1))

	/*
		keys := client.Keys(context.Background(), "/*m").Val()
		for _, k := range keys {
			println("key", k)
			for i, v := range client.ZRangeByLex(context.Background(), k, &redis.ZRangeBy{
				Min: "-",
				Max: "+",
			}).Val() {
				println(" ", i, v)
			}
		}
	*/
}
|||
|
|||
// xBenchmarkNameList is the same insert benchmark as yBenchmarkNameList but
// against a LOCAL redis at localhost:6379 (tempredis is started yet unused).
// The "x" prefix keeps `go test -bench` from running it.
func xBenchmarkNameList(b *testing.B) {

	server, err := tempredis.Start(tempredis.Config{})
	if err != nil {
		panic(err)
	}
	defer server.Term()

	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	store := newSkipListElementStore("/yyy/bin", client)
	var data []byte
	for i := 0; i < b.N; i++ {
		nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)

		nameList.WriteName(fmt.Sprintf("name %8d", i))

		if nameList.HasChanges() {
			data = nameList.ToBytes()
		}
	}
}
|||
|
|||
// xBenchmarkRedis is the flat ZAddNX baseline against a LOCAL redis at
// localhost:6379. The "x" prefix keeps `go test -bench` from running it.
func xBenchmarkRedis(b *testing.B) {

	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	for i := 0; i < b.N; i++ {
		client.ZAddNX(context.Background(), "/xxx/bin", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
	}
}
@ -0,0 +1,45 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/go-redis/redis/v8" |
|||
"github.com/go-redsync/redsync/v4" |
|||
"github.com/go-redsync/redsync/v4/redis/goredis/v8" |
|||
) |
|||
|
|||
// init registers the redis cluster store with the filer store registry.
func init() {
	filer.Stores = append(filer.Stores, &RedisCluster3Store{})
}
|||
|
|||
// RedisCluster3Store is the redis3 filer store backed by a redis cluster client.
type RedisCluster3Store struct {
	UniversalRedis3Store
}
|||
|
|||
// GetName returns the store identifier used in filer configuration.
func (store *RedisCluster3Store) GetName() string {
	return "redis_cluster3"
}
|||
|
|||
// Initialize reads the store's settings (addresses, password, read-only and
// latency-routing flags) from configuration, applying defaults, and connects.
func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) {

	configuration.SetDefault(prefix+"useReadOnly", false)
	configuration.SetDefault(prefix+"routeByLatency", false)

	return store.initialize(
		configuration.GetStringSlice(prefix+"addresses"),
		configuration.GetString(prefix+"password"),
		configuration.GetBool(prefix+"useReadOnly"),
		configuration.GetBool(prefix+"routeByLatency"),
	)
}
|||
|
|||
// initialize creates the cluster client and the redsync lock factory over it.
func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
	store.Client = redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:          addresses,
		Password:       password,
		ReadOnly:       readOnly,
		RouteByLatency: routeByLatency,
	})
	store.redsync = redsync.New(goredis.NewPool(store.Client))
	return
}
@ -0,0 +1,39 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/go-redis/redis/v8" |
|||
"github.com/go-redsync/redsync/v4" |
|||
"github.com/go-redsync/redsync/v4/redis/goredis/v8" |
|||
) |
|||
|
|||
// init registers the single-node redis3 store with the filer store registry.
func init() {
	filer.Stores = append(filer.Stores, &Redis3Store{})
}
|||
|
|||
// Redis3Store is the redis3 filer store backed by a single redis server.
type Redis3Store struct {
	UniversalRedis3Store
}
|||
|
|||
// GetName returns the store identifier used in filer configuration.
func (store *Redis3Store) GetName() string {
	return "redis3"
}
|||
|
|||
// Initialize reads address, password and database number from configuration
// and connects.
func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
	return store.initialize(
		configuration.GetString(prefix+"address"),
		configuration.GetString(prefix+"password"),
		configuration.GetInt(prefix+"database"),
	)
}
|||
|
|||
// initialize creates the redis client and the redsync lock factory over it.
func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) {
	store.Client = redis.NewClient(&redis.Options{
		Addr:     hostPort,
		Password: password,
		DB:       database,
	})
	store.redsync = redsync.New(goredis.NewPool(store.Client))
	return
}
@ -0,0 +1,62 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/util/skiplist" |
|||
"github.com/go-redis/redis/v8" |
|||
"github.com/golang/protobuf/proto" |
|||
) |
|||
|
|||
// SkipListElementStore persists skip list elements as protobuf blobs in redis,
// keyed by Prefix + element id.
type SkipListElementStore struct {
	Prefix string
	client redis.UniversalClient
}

// compile-time check that SkipListElementStore satisfies skiplist.ListStore
var _ = skiplist.ListStore(&SkipListElementStore{})
|||
|
|||
// newSkipListElementStore creates an element store scoped to prefix on client.
func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
	return &SkipListElementStore{
		Prefix: prefix,
		client: client,
	}
}
|||
|
|||
func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error { |
|||
key := fmt.Sprintf("%s%d", m.Prefix, id) |
|||
data, err := proto.Marshal(element) |
|||
if err != nil { |
|||
glog.Errorf("marshal %s: %v", key, err) |
|||
} |
|||
return m.client.Set(context.Background(), key, data, 0).Err() |
|||
} |
|||
|
|||
// DeleteElement removes the element stored under id.
func (m *SkipListElementStore) DeleteElement(id int64) error {
	key := fmt.Sprintf("%s%d", m.Prefix, id)
	return m.client.Del(context.Background(), key).Err()
}
|||
|
|||
func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) { |
|||
key := fmt.Sprintf("%s%d", m.Prefix, id) |
|||
data, err := m.client.Get(context.Background(), key).Result() |
|||
if err != nil { |
|||
if err == redis.Nil { |
|||
return nil, nil |
|||
} |
|||
return nil, err |
|||
} |
|||
t := &skiplist.SkipListElement{} |
|||
err = proto.Unmarshal([]byte(data), t) |
|||
if err == nil { |
|||
for i := 0; i < len(t.Next); i++ { |
|||
if t.Next[i].IsNil() { |
|||
t.Next[i] = nil |
|||
} |
|||
} |
|||
if t.Prev.IsNil() { |
|||
t.Prev = nil |
|||
} |
|||
} |
|||
return t, err |
|||
} |
@ -0,0 +1,179 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/go-redsync/redsync/v4" |
|||
"time" |
|||
|
|||
"github.com/go-redis/redis/v8" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
const (
	// DIR_LIST_MARKER is appended to a directory path to form its
	// children-list key, keeping it distinct from the entry key itself.
	DIR_LIST_MARKER = "\x00"
)
|||
|
|||
// UniversalRedis3Store implements the filer store operations shared by the
// single-node and cluster redis3 stores. redsync provides the distributed
// locks that serialize directory-list updates.
type UniversalRedis3Store struct {
	Client  redis.UniversalClient
	redsync *redsync.Redsync
}
|||
|
|||
// Transactions are not supported by this store; the three methods are no-ops
// that satisfy the filer store interface.

func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error {
	return nil
}
func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error {
	return nil
}
|||
|
|||
// InsertEntry stores the encoded entry under its full path (with the entry's
// TTL, if any) and registers its name in the parent directory's child list.
func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {

	value, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
	}

	// compress only chunk-heavy entries; small ones are not worth gzipping
	if len(entry.Chunks) > 50 {
		value = util.MaybeGzipData(value)
	}

	if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
	}

	dir, name := entry.FullPath.DirAndName()

	// name is "" for the root path, which has no parent list to update
	if name != "" {
		if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
			return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
		}
	}

	return nil
}
|||
|
|||
// UpdateEntry overwrites an existing entry; redis SET plus the NX-guarded
// child insert make InsertEntry safe to reuse here.
func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {

	return store.InsertEntry(ctx, entry)
}
|||
|
|||
func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { |
|||
|
|||
data, err := store.Client.Get(ctx, string(fullpath)).Result() |
|||
if err == redis.Nil { |
|||
return nil, filer_pb.ErrNotFound |
|||
} |
|||
|
|||
if err != nil { |
|||
return nil, fmt.Errorf("get %s : %v", fullpath, err) |
|||
} |
|||
|
|||
entry = &filer.Entry{ |
|||
FullPath: fullpath, |
|||
} |
|||
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) |
|||
if err != nil { |
|||
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) |
|||
} |
|||
|
|||
return entry, nil |
|||
} |
|||
|
|||
// DeleteEntry removes the entry at fullpath: its own child-list key (in case
// it is a directory), the entry value itself, and its name in the parent
// directory's child list.
func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {

	_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
	if err != nil {
		return fmt.Errorf("delete dir list %s : %v", fullpath, err)
	}

	_, err = store.Client.Del(ctx, string(fullpath)).Result()
	if err != nil {
		return fmt.Errorf("delete %s : %v", fullpath, err)
	}

	dir, name := fullpath.DirAndName()

	// name is "" for the root path, which has no parent list to update
	if name != "" {
		if err = removeChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
			return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
		}
	}

	return nil
}
|||
|
|||
// DeleteFolderChildren deletes every child entry of fullpath, and each
// child's own child-list key in case the child is itself a directory.
func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {

	return removeChildren(ctx, store, genDirectoryListKey(string(fullpath)), func(name string) error {
		path := util.NewFullPath(string(fullpath), name)
		_, err = store.Client.Del(ctx, string(path)).Result()
		if err != nil {
			return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
		}
		// not efficient, but need to remove if it is a directory
		store.Client.Del(ctx, genDirectoryListKey(string(path)))
		return nil
	})

}
|||
|
|||
// ListDirectoryPrefixedEntries is not supported; the generic filer layer
// falls back to ListDirectoryEntries plus client-side prefix filtering.
func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
|||
|
|||
// ListDirectoryEntries walks the sorted-set listing for dirPath and streams
// each child entry to eachEntryFunc, stopping after limit entries or when the
// callback returns false.
//
// Pagination: scanning starts at startFileName; when includeStartFile is false
// the member equal to startFileName is skipped. lastFileName is updated for
// every name visited — including names whose lookup failed — so callers can
// resume from the returned cursor.
//
// TTL: an entry whose TtlSec has elapsed is lazily purged here (both its data
// key and its listing member) and is not delivered to the callback.
func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {

	dirListKey := genDirectoryListKey(string(dirPath))
	counter := int64(0)

	// The callback returns true to continue scanning, false to stop.
	err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool {
		if startFileName != "" {
			if !includeStartFile && startFileName == fileName {
				// Exclusive cursor: skip the start entry itself.
				return true
			}
		}

		path := util.NewFullPath(string(dirPath), fileName)
		entry, err := store.FindEntry(ctx, path)
		// Advance the cursor even if the lookup below failed.
		lastFileName = fileName
		if err != nil {
			glog.V(0).Infof("list %s : %v", path, err)
			if err == filer_pb.ErrNotFound {
				// Dangling listing member with no backing entry; skip it.
				return true
			}
			// NOTE(review): other lookup errors also fall through to
			// "return true" below, i.e. the scan continues past them.
		} else {
			if entry.TtlSec > 0 {
				if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
					// Expired: best-effort purge of the entry and its
					// listing member; deletion errors are ignored.
					store.Client.Del(ctx, string(path)).Result()
					store.Client.ZRem(ctx, dirListKey, fileName).Result()
					return true
				}
			}
			counter++
			if !eachEntryFunc(entry) {
				// Consumer asked to stop early.
				return false
			}
			if counter >= limit {
				return false
			}
		}
		return true
	})

	return lastFileName, err
}
|||
|
|||
func genDirectoryListKey(dir string) (dirList string) { |
|||
return dir + DIR_LIST_MARKER |
|||
} |
|||
|
|||
func (store *UniversalRedis3Store) Shutdown() { |
|||
store.Client.Close() |
|||
} |
@ -0,0 +1,42 @@ |
|||
package redis3 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/go-redis/redis/v8" |
|||
) |
|||
|
|||
func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
_, err = store.Client.Set(ctx, string(key), value, 0).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv put: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
data, err := store.Client.Get(ctx, string(key)).Result() |
|||
|
|||
if err == redis.Nil { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return []byte(data), err |
|||
} |
|||
|
|||
func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
_, err = store.Client.Del(ctx, string(key)).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv delete: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue