Commit by hilimd, 3 years ago (committed by GitHub)
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

214 changed files with 9903 additions and 5172 deletions
62    .github/workflows/binaries_dev.yml
63    .github/workflows/binaries_release.yml
22    .github/workflows/cleanup.yml
63    .github/workflows/container_dev.yml
65    .github/workflows/container_latest.yml
17    .github/workflows/container_release.yml
6     .github/workflows/go.yml
66    .github/workflows/release.yml
46    .travis.yml
144   Makefile
11    README.md
6     docker/Dockerfile
43    docker/Dockerfile.gccgo_build
2     docker/Dockerfile.go_build
2     docker/Dockerfile.go_build_large
158   go.mod
617   go.sum
0     k8s/helm_charts2/.helmignore
4     k8s/helm_charts2/Chart.yaml
0     k8s/helm_charts2/README.md
0     k8s/helm_charts2/dashboards/seaweedfs-grafana-dashboard.json
0     k8s/helm_charts2/templates/_helpers.tpl
0     k8s/helm_charts2/templates/ca-cert.yaml
0     k8s/helm_charts2/templates/cert-clusterissuer.yaml
0     k8s/helm_charts2/templates/client-cert.yaml
0     k8s/helm_charts2/templates/cronjob.yaml
0     k8s/helm_charts2/templates/filer-cert.yaml
0     k8s/helm_charts2/templates/filer-service-client.yaml
0     k8s/helm_charts2/templates/filer-service.yaml
0     k8s/helm_charts2/templates/filer-servicemonitor.yaml
0     k8s/helm_charts2/templates/filer-statefulset.yaml
0     k8s/helm_charts2/templates/ingress.yaml
0     k8s/helm_charts2/templates/master-cert.yaml
0     k8s/helm_charts2/templates/master-service.yaml
0     k8s/helm_charts2/templates/master-statefulset.yaml
0     k8s/helm_charts2/templates/s3-deployment.yaml
0     k8s/helm_charts2/templates/s3-service.yaml
0     k8s/helm_charts2/templates/s3-servicemonitor.yaml
0     k8s/helm_charts2/templates/seaweedfs-grafana-dashboard.yaml
0     k8s/helm_charts2/templates/seaweedfs-s3-secret.yaml
0     k8s/helm_charts2/templates/secret-seaweedfs-db.yaml
0     k8s/helm_charts2/templates/security-configmap.yaml
0     k8s/helm_charts2/templates/service-account.yaml
0     k8s/helm_charts2/templates/volume-cert.yaml
0     k8s/helm_charts2/templates/volume-service.yaml
0     k8s/helm_charts2/templates/volume-servicemonitor.yaml
0     k8s/helm_charts2/templates/volume-statefulset.yaml
0     k8s/helm_charts2/values.yaml
91    other/java/client/src/main/java/seaweedfs/client/FilerClient.java
2     other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
5     other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
26    other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
7     other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
20    other/java/client/src/main/proto/filer.proto
3626  other/metrics/grafana_seaweedfs.json
16    unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
11    unmaintained/repeated_vacuum/repeated_vacuum.go
8     weed/Makefile
9     weed/command/filer.go
2     weed/command/filer_backup.go
5     weed/command/filer_cat.go
92    weed/command/filer_copy.go
2     weed/command/filer_meta_backup.go
2     weed/command/filer_meta_tail.go
254   weed/command/filer_remote_sync.go
387   weed/command/filer_remote_sync_buckets.go
221   weed/command/filer_remote_sync_dir.go
2     weed/command/filer_sync.go
1     weed/command/filer_sync_std.go
4     weed/command/imports.go
7     weed/command/master.go
5     weed/command/master_follower.go
5     weed/command/mount_notsupported.go
1     weed/command/mount_std.go
3     weed/command/msg_broker.go
12    weed/command/scaffold/filer.toml
2     weed/command/server.go
10    weed/command/volume.go
2     weed/filer/etcd/etcd_store.go
5     weed/filer/filechunk_manifest.go
11    weed/filer/filer_notify_append.go
7     weed/filer/leveldb/leveldb_store.go
7     weed/filer/leveldb2/leveldb2_store.go
14    weed/filer/leveldb3/leveldb3_store.go
23    weed/filer/read_remote.go
121   weed/filer/remote_mapping.go
110   weed/filer/remote_storage.go
6     weed/filer/remote_storage_test.go
1     weed/filer/sqlite/sqlite_store.go
1     weed/filer/sqlite/sqlite_store_unsupported.go
143   weed/filer/stream.go
5     weed/filer/tikv/tikv.go
389   weed/filer/tikv/tikv_store.go
50    weed/filer/tikv/tikv_store_kv.go
14    weed/filesys/meta_cache/meta_cache_subscribe.go
11    weed/filesys/wfs_write.go
2     weed/filesys/xattr.go
4     weed/ftpd/ftp_server.go
11    weed/messaging/broker/broker_append.go
23    weed/operation/assign_file_id.go
.github/workflows/binaries_dev.yml (new file)
@@ -0,0 +1,62 @@
name: "go: build dev binaries"

on:
  push:
    branches: [ master ]

jobs:

  build-latest-docker-image:
    runs-on: [ubuntu-latest]

    steps:
      -
        name: Checkout
        uses: actions/checkout@v2
      -
        name: Docker meta
        id: docker_meta
        uses: docker/metadata-action@v3
        with:
          images: |
            chrislusf/seaweedfs
            ghcr.io/chrislusf/seaweedfs
          tags: |
            type=raw,value=latest
          labels: |
            org.opencontainers.image.title=seaweedfs
            org.opencontainers.image.vendor=Chris Lu
      -
        name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      -
        name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
        with:
          buildkitd-flags: "--debug"
      -
        name: Login to Docker Hub
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      -
        name: Login to GHCR
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: ${{ secrets.GHCR_USERNAME }}
          password: ${{ secrets.GHCR_TOKEN }}
      -
        name: Build
        uses: docker/build-push-action@v2
        with:
          context: ./docker
          push: ${{ github.event_name != 'pull_request' }}
          file: ./docker/Dockerfile
          platforms: linux/amd64, linux/arm, linux/arm64
          tags: ${{ steps.docker_meta.outputs.tags }}
          labels: ${{ steps.docker_meta.outputs.labels }}
.github/workflows/binaries_release.yml (new file)
@@ -0,0 +1,63 @@
# This is a basic workflow to help you get started with Actions

name: "go: build versioned binaries"

on:
  push:
    tags:
      - '*'

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:

  build-release-binaries:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        goos: [linux, windows, darwin, freebsd]
        goarch: [amd64, arm, arm64]
        exclude:
          - goarch: arm
            goos: darwin
          - goarch: 386
            goos: darwin
          - goarch: arm
            goos: windows
          - goarch: arm64
            goos: windows

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2
      - name: Go Release Binaries Normal Volume Size
        uses: wangyoucao577/go-release-action@v1.20
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          goos: ${{ matrix.goos }}
          goarch: ${{ matrix.goarch }}
          overwrite: true
          pre_command: export CGO_ENABLED=0
          # build_flags: -tags 5BytesOffset # optional, default is
          ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
          # Where to run `go build .`
          project_path: weed
          binary_name: weed
          asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}"
      - name: Go Release Large Disk Binaries
        uses: wangyoucao577/go-release-action@v1.20
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          goos: ${{ matrix.goos }}
          goarch: ${{ matrix.goarch }}
          overwrite: true
          pre_command: export CGO_ENABLED=0
          build_flags: -tags 5BytesOffset # optional, default is
          ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
          # Where to run `go build .`
          project_path: weed
          binary_name: weed
          asset_name: "${{ matrix.goos }}_${{ matrix.goarch }}_large_disk"
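Note: the two go-release-action steps above differ only in the `-tags 5BytesOffset` build flag, which produces the `_large_disk` assets. A worked sketch of what the extra offset byte buys (my arithmetic and constant names, not code from this commit): needle offsets are stored in a fixed number of bytes and scaled by an 8-byte alignment unit, so the offset width caps the volume file size.

package main

import "fmt"

// maxVolumeSize computes the largest addressable volume file for a given
// offset width: 2^(8*offsetBytes) addressable units, each 8 bytes apart.
// The 8-byte alignment unit here is an assumption for illustration.
func maxVolumeSize(offsetBytes uint) uint64 {
	const alignmentBytes = 8
	return (uint64(1) << (8 * offsetBytes)) * alignmentBytes
}

func main() {
	fmt.Printf("4-byte offsets: %d GiB per volume\n", maxVolumeSize(4)>>30) // 32 GiB
	fmt.Printf("5-byte offsets: %d TiB per volume\n", maxVolumeSize(5)>>40) // 8 TiB
}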
.github/workflows/cleanup.yml (deleted)
@@ -1,22 +0,0 @@
name: Cleanup

on:
  push:
    branches: [ master ]

jobs:

  build:
    name: Build
    runs-on: ubuntu-latest

    steps:

      - name: Delete old release assets
        uses: mknejp/delete-release-assets@v1
        with:
          token: ${{ github.token }}
          tag: dev
          fail-if-no-assets: false
          assets: |
            weed-*
.github/workflows/container_dev.yml (new file)
@@ -0,0 +1,63 @@
name: "docker: build dev containers"

on:
  push:
    branches: [ master ]
  workflow_dispatch: []

jobs:

  build-dev-containers:
    runs-on: [ubuntu-latest]

    steps:
      -
        name: Checkout
        uses: actions/checkout@v2
      -
        name: Docker meta
        id: docker_meta
        uses: docker/metadata-action@v3
        with:
          images: |
            chrislusf/seaweedfs
            ghcr.io/chrislusf/seaweedfs
          tags: |
            type=raw,value=dev
          labels: |
            org.opencontainers.image.title=seaweedfs
            org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast!
            org.opencontainers.image.vendor=Chris Lu
      -
        name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      -
        name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
        with:
          buildkitd-flags: "--debug"
      -
        name: Login to Docker Hub
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      -
        name: Login to GHCR
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: ${{ secrets.GHCR_USERNAME }}
          password: ${{ secrets.GHCR_TOKEN }}
      -
        name: Build
        uses: docker/build-push-action@v2
        with:
          context: ./docker
          push: ${{ github.event_name != 'pull_request' }}
          file: ./docker/Dockerfile.go_build
          platforms: linux/amd64, linux/arm, linux/arm64, linux/386
          tags: ${{ steps.docker_meta.outputs.tags }}
          labels: ${{ steps.docker_meta.outputs.labels }}
.github/workflows/release.yml (deleted)
@@ -1,66 +0,0 @@
name: Release

on:
  push:
    branches: [ master ]

jobs:

  build:
    name: Build
    runs-on: ubuntu-latest
    strategy:
      matrix:
        goos: [linux, windows, darwin, freebsd ]
        goarch: [amd64, arm]
        exclude:
          - goarch: arm
            goos: darwin
          - goarch: arm
            goos: windows

    steps:

      - name: Check out code into the Go module directory
        uses: actions/checkout@v2

      - name: Wait for the deletion
        uses: jakejarvis/wait-action@master
        with:
          time: '30s'

      - name: Set BUILD_TIME env
        run: echo BUILD_TIME=$(date -u +%Y%m%d-%H%M) >> ${GITHUB_ENV}

      - name: Go Release Binaries
        uses: wangyoucao577/go-release-action@v1.17
        with:
          goversion: 1.16
          github_token: ${{ secrets.GITHUB_TOKEN }}
          goos: ${{ matrix.goos }}
          goarch: ${{ matrix.goarch }}
          release_tag: dev
          overwrite: true
          pre_command: export CGO_ENABLED=0
          build_flags: -tags 5BytesOffset # optional, default is
          ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
          # Where to run `go build .`
          project_path: weed
          binary_name: weed-large-disk
          asset_name: "weed-large-disk-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"

      - name: Go Release Binaries
        uses: wangyoucao577/go-release-action@v1.17
        with:
          goversion: 1.16
          github_token: ${{ secrets.GITHUB_TOKEN }}
          goos: ${{ matrix.goos }}
          goarch: ${{ matrix.goarch }}
          release_tag: dev
          overwrite: true
          pre_command: export CGO_ENABLED=0
          ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
          # Where to run `go build .`
          project_path: weed
          binary_name: weed
          asset_name: "weed-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"
.travis.yml (deleted)
@@ -1,46 +0,0 @@
sudo: false
language: go
go:
  - 1.16.x

before_install:
  - export PATH=/home/travis/gopath/bin:$PATH

install:
  - export CGO_ENABLED="0"
  - go env

script:
  - env GO111MODULE=on go test ./weed/...

before_deploy:
  - make release
deploy:
  provider: releases
  skip_cleanup: true
  api_key:
    secure: ERL986+ncQ8lwAJUYDrQ8s2/FxF/cyNIwJIFCqspnWxQgGNNyokET9HapmlPSxjpFRF0q6L2WCg9OY3mSVRq4oI6hg1igOQ12KlLyN71XSJ3c8w0Ay5ho48TQ9l3f3Iu97mntBCe9l0R9pnT8wj1VI8YJxloXwUMG2yeTjA9aBI=
  file:
    - build/linux_arm.tar.gz
    - build/linux_arm64.tar.gz
    - build/linux_386.tar.gz
    - build/linux_amd64.tar.gz
    - build/linux_amd64_large_disk.tar.gz
    - build/darwin_amd64.tar.gz
    - build/darwin_amd64_large_disk.tar.gz
    - build/windows_386.zip
    - build/windows_amd64.zip
    - build/windows_amd64_large_disk.zip
    - build/freebsd_arm.tar.gz
    - build/freebsd_amd64.tar.gz
    - build/freebsd_386.tar.gz
    - build/netbsd_arm.tar.gz
    - build/netbsd_amd64.tar.gz
    - build/netbsd_386.tar.gz
    - build/openbsd_arm.tar.gz
    - build/openbsd_amd64.tar.gz
    - build/openbsd_386.tar.gz
  on:
    tags: true
    repo: chrislusf/seaweedfs
    go: 1.16.x
Makefile (deleted)
@@ -1,144 +0,0 @@
BINARY = weed/weed
package = github.com/chrislusf/seaweedfs/weed

GO_FLAGS = #-v
SOURCE_DIR = ./weed/

appname := weed

sources := $(wildcard *.go)

COMMIT ?= $(shell git rev-parse --short HEAD)
LDFLAGS ?= -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${COMMIT}

build = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -ldflags "-extldflags -static $(LDFLAGS)" -o build/$(appname)$(3) $(SOURCE_DIR)
tar = cd build && tar -cvzf $(1)_$(2).tar.gz $(appname)$(3) && rm $(appname)$(3)
zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3)

build_large = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -tags 5BytesOffset -ldflags "-extldflags -static $(LDFLAGS)" -o build/$(appname)$(3) $(SOURCE_DIR)
tar_large = cd build && tar -cvzf $(1)_$(2)_large_disk.tar.gz $(appname)$(3) && rm $(appname)$(3)
zip_large = cd build && zip $(1)_$(2)_large_disk.zip $(appname)$(3) && rm $(appname)$(3)

all: build

.PHONY : clean deps build linux release windows_build darwin_build linux_build bsd_build clean

clean:
	go clean -i $(GO_FLAGS) $(SOURCE_DIR)
	rm -f $(BINARY)
	rm -rf build/

deps:
	go get $(GO_FLAGS) -d $(SOURCE_DIR)
	rm -rf /home/travis/gopath/src/github.com/coreos/etcd/vendor/golang.org/x/net/trace
	rm -rf /home/travis/gopath/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace

build: deps
	go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o $(BINARY) $(SOURCE_DIR)

install: deps
	go install $(GO_FLAGS) -ldflags "$(LDFLAGS)" $(SOURCE_DIR)

linux: deps
	mkdir -p linux
	GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o linux/$(BINARY) $(SOURCE_DIR)

release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_build 5_byte_arm64_build 5_byte_darwin_build 5_byte_windows_build

##### LINUX BUILDS #####
5_byte_linux_build:
	$(call build_large,linux,amd64,)
	$(call tar_large,linux,amd64)

5_byte_darwin_build:
	$(call build_large,darwin,amd64,)
	$(call tar_large,darwin,amd64)

5_byte_windows_build:
	$(call build_large,windows,amd64,.exe)
	$(call zip_large,windows,amd64,.exe)

5_byte_arm_build: $(sources)
	$(call build_large,linux,arm,)
	$(call tar_large,linux,arm)

5_byte_arm64_build: $(sources)
	$(call build_large,linux,arm64,)
	$(call tar_large,linux,arm64)

linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz

build/linux_386.tar.gz: $(sources)
	$(call build,linux,386,)
	$(call tar,linux,386)

build/linux_amd64.tar.gz: $(sources)
	$(call build,linux,amd64,)
	$(call tar,linux,amd64)

build/linux_arm.tar.gz: $(sources)
	$(call build,linux,arm,)
	$(call tar,linux,arm)

build/linux_arm64.tar.gz: $(sources)
	$(call build,linux,arm64,)
	$(call tar,linux,arm64)

##### DARWIN (MAC) BUILDS #####
darwin_build: build/darwin_amd64.tar.gz

build/darwin_amd64.tar.gz: $(sources)
	$(call build,darwin,amd64,)
	$(call tar,darwin,amd64)

##### WINDOWS BUILDS #####
windows_build: build/windows_386.zip build/windows_amd64.zip

build/windows_386.zip: $(sources)
	$(call build,windows,386,.exe)
	$(call zip,windows,386,.exe)

build/windows_amd64.zip: $(sources)
	$(call build,windows,amd64,.exe)
	$(call zip,windows,amd64,.exe)

##### BSD BUILDS #####
bsd_build: build/freebsd_arm.tar.gz build/freebsd_386.tar.gz build/freebsd_amd64.tar.gz \
	build/netbsd_arm.tar.gz build/netbsd_386.tar.gz build/netbsd_amd64.tar.gz \
	build/openbsd_arm.tar.gz build/openbsd_386.tar.gz build/openbsd_amd64.tar.gz

build/freebsd_386.tar.gz: $(sources)
	$(call build,freebsd,386,)
	$(call tar,freebsd,386)

build/freebsd_amd64.tar.gz: $(sources)
	$(call build,freebsd,amd64,)
	$(call tar,freebsd,amd64)

build/freebsd_arm.tar.gz: $(sources)
	$(call build,freebsd,arm,)
	$(call tar,freebsd,arm)

build/netbsd_386.tar.gz: $(sources)
	$(call build,netbsd,386,)
	$(call tar,netbsd,386)

build/netbsd_amd64.tar.gz: $(sources)
	$(call build,netbsd,amd64,)
	$(call tar,netbsd,amd64)

build/netbsd_arm.tar.gz: $(sources)
	$(call build,netbsd,arm,)
	$(call tar,netbsd,arm)

build/openbsd_386.tar.gz: $(sources)
	$(call build,openbsd,386,)
	$(call tar,openbsd,386)

build/openbsd_amd64.tar.gz: $(sources)
	$(call build,openbsd,amd64,)
	$(call tar,openbsd,amd64)

build/openbsd_arm.tar.gz: $(sources)
	$(call build,openbsd,arm,)
	$(call tar,openbsd,arm)
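Both this Makefile and the workflows above inject the commit hash through `-ldflags "-X github.com/chrislusf/seaweedfs/weed/util.COMMIT=<hash>"`. A minimal sketch of the receiving side (illustrative file, not part of this diff): the -X flag overwrites a package-level string variable at link time, so the binary can report the commit it was built from.

package util

// COMMIT is empty in source code; the linker fills it in via
// `go build -ldflags "-X <module>/weed/util.COMMIT=$(git rev-parse --short HEAD)"`.
// Only plain package-level string variables can be set this way.
var COMMIT string

// Version appends the injected commit hash, if any, to a base version string.
func Version(base string) string {
	if COMMIT == "" {
		return base
	}
	return base + " " + COMMIT
}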
docker/Dockerfile.gccgo_build (new file)
@@ -0,0 +1,43 @@
FROM gcc:11 as builder
RUN mkdir -p /go/src/github.com/chrislusf/
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs
ARG BRANCH=${BRANCH:-master}
RUN cd /go/src/github.com/chrislusf/seaweedfs && git checkout $BRANCH
RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \
  && apt-get update \
  && apt-get install -y golang-src \
  && export LDFLAGS="-X github.com/chrislusf/seaweedfs/weed/util.COMMIT=$(git rev-parse --short HEAD)" \
  && CGO_ENABLED=0 go install -ldflags "-extldflags -static ${LDFLAGS}" -compiler=gccgo -tags gccgo,noasm

FROM alpine AS final
LABEL author="Chris Lu"
COPY --from=builder /go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount

# volume server grpc port
EXPOSE 18080
# volume server http port
EXPOSE 8080
# filer server grpc port
EXPOSE 18888
# filer server http port
EXPOSE 8888
# master server shared grpc port
EXPOSE 19333
# master server shared http port
EXPOSE 9333
# s3 server http port
EXPOSE 8333
# webdav server http port
EXPOSE 7333

RUN mkdir -p /data/filerldb2

VOLUME /data

RUN chmod +x /entrypoint.sh

ENTRYPOINT ["/entrypoint.sh"]
go.sum (617 changes)
File diff suppressed because it is too large
k8s/helm_charts2/Chart.yaml
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.62"
-version: "2.62"
+appVersion: "2.67"
+version: "2.67"
other/metrics/grafana_seaweedfs.json (3626 changes)
File diff suppressed because it is too large
weed/command/filer_remote_sync_buckets.go (new file)
@@ -0,0 +1,387 @@
package command

import (
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
	"math"
	"math/rand"
	"strings"
	"time"
)

func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {

	// read filer remote storage mount mappings
	if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
		return fmt.Errorf("read mount info: %v", detectErr)
	}

	eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
	if err != nil {
		return err
	}

	processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
		lastTime := time.Unix(0, lastTsNs)
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
		return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
	})

	lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)

	return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
		option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {

	handleCreateBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}
		if entry.RemoteEntry != nil {
			// this directory is imported from "remote.mount.buckets" or "remote.mount"
			return nil
		}
		if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
			*option.createBucketAt = option.mappings.PrimaryBucketStorageName
			glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
		}
		if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
			for k := range option.mappings.Mappings {
				*option.createBucketAt = k
				glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
			}
		}
		if *option.createBucketAt == "" {
			return nil
		}
		remoteConf, found := option.remoteConfs[*option.createBucketAt]
		if !found {
			return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
		}

		client, err := remote_storage.GetRemoteStorage(remoteConf)
		if err != nil {
			return err
		}

		bucketName := strings.ToLower(entry.Name)
		if *option.createBucketRandomSuffix {
			// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
			if len(bucketName)+5 > 63 {
				bucketName = bucketName[:58]
			}
			bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000)
		}

		glog.V(0).Infof("create bucket %s", bucketName)
		if err := client.CreateBucket(bucketName); err != nil {
			return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
		remoteLocation := &remote_pb.RemoteStorageLocation{
			Name:   *option.createBucketAt,
			Bucket: bucketName,
			Path:   "/",
		}

		// need to add the new mapping here before receiving updates from metadata tailing
		option.mappings.Mappings[string(bucketPath)] = remoteLocation

		return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)

	}
	handleDeleteBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}

		client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
		if err != nil {
			return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
		}

		glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
		if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
			return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)

		return filer.DeleteMountMapping(option, string(bucketPath))
	}

	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry != nil {
			// update
			if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
				newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
				if readErr != nil {
					return fmt.Errorf("unmarshal mappings: %v", readErr)
				}
				option.mappings = newMappings
			}
			if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
				}
				option.remoteConfs[conf.Name] = conf
			}
		} else if message.OldEntry != nil {
			// deletion
			if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
				}
				delete(option.remoteConfs, conf.Name)
			}
		}

		return nil
	}

	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}

		if message.OldEntry == nil && message.NewEntry == nil {
			return nil
		}
		if message.OldEntry == nil && message.NewEntry != nil {
			if message.NewParentPath == option.bucketsDir {
				return handleCreateBucket(message.NewEntry)
			}
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if message.OldEntry != nil && message.NewEntry == nil {
			if resp.Directory == option.bucketsDir {
				return handleDeleteBucket(message.OldEntry)
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		if message.OldEntry != nil && message.NewEntry != nil {
			if resp.Directory == option.bucketsDir {
				if message.NewParentPath == option.bucketsDir {
					if message.OldEntry.Name == message.NewEntry.Name {
						return nil
					}
					if err := handleCreateBucket(message.NewEntry); err != nil {
						return err
					}
					if err := handleDeleteBucket(message.OldEntry); err != nil {
						return err
					}
				}
			}
			oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
			newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
			if oldOk && newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
					// update the same entry
					if message.NewEntry.IsDirectory {
						// update directory property
						return nil
					}
					if filer.IsSameData(message.OldEntry, message.NewEntry) {
						glog.V(2).Infof("update meta: %+v", resp)
						oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
						return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
					} else {
						newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
						reader := filer.NewFileReader(filerSource, message.NewEntry)
						glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
						remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
						if writeErr != nil {
							return writeErr
						}
						return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
					}
				}
			}

			// the following handles entry rename
			if oldOk {
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
				if message.OldEntry.IsDirectory {
					return client.RemoveDirectory(oldDest)
				}
				glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
				if err := client.DeleteFile(oldDest); err != nil {
					return err
				}
			}
			if newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
				if err != nil {
					return err
				}
				newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
				if message.NewEntry.IsDirectory {
					return client.WriteDirectory(newDest, message.NewEntry)
				}
				reader := filer.NewFileReader(filerSource, message.NewEntry)
				glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
				remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
				if writeErr != nil {
					return writeErr
				}
				return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
			}
		}

		return nil
	}
	return eachEntryFunc, nil
}

func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
	bucket := util.FullPath(option.bucketsDir).Child(bucketName)

	var isMounted bool
	remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
	if !isMounted {
		return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
	}
	remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
	if !hasClient {
		return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
	}

	client, err = remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return nil, remoteStorageMountLocation, err
	}
	return client, remoteStorageMountLocation, nil
}

func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
	bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
	if !ok {
		return "", nil, nil, false
	}
	var isMounted bool
	remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
	if !isMounted {
		glog.Warningf("%s is not mounted", bucket)
		return "", nil, nil, false
	}
	var hasClient bool
	remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
	if !hasClient {
		glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
		return "", nil, nil, false
	}
	return bucket, remoteStorageMountLocation, remoteConf, true
}

func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
	if !strings.HasPrefix(dir, bucketsDir+"/") {
		return "", false
	}
	parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
	return util.FullPath(bucketsDir).Child(parts[0]), true
}

func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {

	if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
		return err
	} else {
		option.mappings = mappings
	}

	option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
	var lastConfName string
	err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
		if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
			return nil
		}
		conf := &remote_pb.RemoteConf{}
		if err := proto.Unmarshal(entry.Content, conf); err != nil {
			return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
		}
		option.remoteConfs[conf.Name] = conf
		lastConfName = conf.Name
		return nil
	}, "", false, math.MaxUint32)

	if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
		glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
		option.mappings.PrimaryBucketStorageName = lastConfName
	}

	return
}
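In handleCreateBucket above, the lowercased directory name is truncated to 58 characters before a 5-character random suffix is appended, keeping the result within the 63-character S3 bucket-name limit. A standalone sketch of that rule (zero-padding the suffix is my own choice here, so the generated name never contains spaces):

package main

import (
	"fmt"
	"math/rand"
	"strings"
)

// randomizedBucketName lowercases the name, truncates it so the 5-character
// "-NNNN" suffix still fits in 63 characters, then appends a random,
// zero-padded suffix.
func randomizedBucketName(name string) string {
	name = strings.ToLower(name)
	if len(name)+5 > 63 {
		name = name[:58]
	}
	return fmt.Sprintf("%s-%04d", name, rand.Uint32()%10000)
}

func main() {
	fmt.Println(randomizedBucketName("My-Very-Long-Bucket-Name-That-Would-Exceed-The-63-Char-AWS-Limit"))
}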
weed/command/filer_remote_sync_dir.go (new file)
@@ -0,0 +1,221 @@
package command

import (
	"context"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
	"os"
	"strings"
	"time"
)

func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {

	// read filer remote storage mount mappings
	_, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
	if detectErr != nil {
		return fmt.Errorf("read mount info: %v", detectErr)
	}

	eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
	if err != nil {
		return err
	}

	processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
		lastTime := time.Unix(0, lastTsNs)
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
		return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
	})

	lastOffsetTs := collectLastSyncOffset(option, mountedDir)

	return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
		mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
	client, err := remote_storage.GetRemoteStorage(remoteStorage)
	if err != nil {
		return nil, err
	}

	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry == nil {
			return nil
		}
		if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
			mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
			if readErr != nil {
				return fmt.Errorf("unmarshal mappings: %v", readErr)
			}
			if remoteLoc, found := mappings.Mappings[mountedDir]; found {
				if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
					glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
				}
			} else {
				glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
				os.Exit(0)
			}
		}
		if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
			conf := &remote_pb.RemoteConf{}
			if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
				return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
			}
			remoteStorage = conf
			if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
				client = newClient
			} else {
				return err
			}
		}

		return nil
	}

	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}

		if message.OldEntry == nil && message.NewEntry == nil {
			return nil
		}
		if message.OldEntry == nil && message.NewEntry != nil {
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if message.OldEntry != nil && message.NewEntry == nil {
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		if message.OldEntry != nil && message.NewEntry != nil {
			oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping updating: %+v", resp)
				return nil
			}
			if message.NewEntry.IsDirectory {
				return client.WriteDirectory(dest, message.NewEntry)
			}
			if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
				if filer.IsSameData(message.OldEntry, message.NewEntry) {
					glog.V(2).Infof("update meta: %+v", resp)
					return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
				}
			}
			glog.V(2).Infof("update: %+v", resp)
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
			if err := client.DeleteFile(oldDest); err != nil {
				return err
			}
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}

		return nil
	}
	return eachEntryFunc, nil
}

func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
	// 1. specified by timeAgo
	// 2. last offset timestamp for this directory
	// 3. directory creation time
	var lastOffsetTs time.Time
	if *option.timeAgo == 0 {
		mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
		if err != nil {
			glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
			return time.Now()
		}

		lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
		if mountedDirEntry != nil {
			if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
				lastOffsetTs = time.Unix(0, lastOffsetTsNs)
				glog.V(0).Infof("resume from %v", lastOffsetTs)
			} else {
				lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
			}
		} else {
			lastOffsetTs = time.Now()
		}
	} else {
		lastOffsetTs = time.Now().Add(-*option.timeAgo)
	}
	return lastOffsetTs
}

func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
	source := string(sourcePath[len(mountDir):])
	dest := util.FullPath(remoteMountLocation.Path).Child(source)
	return &remote_pb.RemoteStorageLocation{
		Name:   remoteMountLocation.Name,
		Bucket: remoteMountLocation.Bucket,
		Path:   string(dest),
	}
}

func shouldSendToRemote(entry *filer_pb.Entry) bool {
	if entry.RemoteEntry == nil {
		return true
	}
	if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
		return true
	}
	return false
}

func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
	remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
	entry.RemoteEntry = remoteEntry
	return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
			Directory: dir,
			Entry:     entry,
		})
		return err
	})
}
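collectLastSyncOffset above resolves the resume point with a three-step precedence: an explicit -timeAgo flag wins; otherwise the stored sync offset is used if it is newer than the mounted directory's creation time; otherwise syncing starts at the creation time. A distilled sketch of that decision (simplified stand-in signature, not the real filer API):

package main

import (
	"fmt"
	"time"
)

// resumePoint mirrors the precedence in collectLastSyncOffset:
// 1. explicit look-back duration, 2. saved offset if newer than the
// directory, 3. the directory's creation time.
func resumePoint(timeAgo time.Duration, savedOffset, dirCreated time.Time, haveOffset bool) time.Time {
	if timeAgo != 0 {
		return time.Now().Add(-timeAgo)
	}
	if haveOffset && savedOffset.After(dirCreated) {
		return savedOffset // resume where the last run stopped
	}
	return dirCreated // first run: replay from the directory's beginning
}

func main() {
	created := time.Now().Add(-48 * time.Hour)
	saved := time.Now().Add(-2 * time.Hour)
	fmt.Println(resumePoint(0, saved, created, true)) // picks the saved offset
}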
weed/filer/remote_mapping.go (new file)
@@ -0,0 +1,121 @@
package filer

import (
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"
)

func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
	var oldContent []byte
	if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
		return readErr
	}); readErr != nil {
		return nil, readErr
	}

	mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
	if readErr != nil {
		return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
	}

	return
}

func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) {

	// read current mapping
	var oldContent, newContent []byte
	err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
		return err
	})
	if err != nil {
		if err != filer_pb.ErrNotFound {
			return fmt.Errorf("read existing mapping: %v", err)
		}
	}

	// add the new mapping
	newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
	if err != nil {
		return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
	}

	// save back
	err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
	})
	if err != nil {
		return fmt.Errorf("save mapping: %v", err)
	}

	return nil
}

func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) {

	// read current mapping
	var oldContent, newContent []byte
	err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
		return err
	})
	if err != nil {
		if err != filer_pb.ErrNotFound {
			return fmt.Errorf("read existing mapping: %v", err)
		}
	}

	// remove the mapping
	newContent, err = removeRemoteStorageMapping(oldContent, dir)
	if err != nil {
		return fmt.Errorf("delete mount %s: %v", dir, err)
	}

	// save back
	err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
	})
	if err != nil {
		return fmt.Errorf("save mapping: %v", err)
	}

	return nil
}

func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
	mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
	if unmarshalErr != nil {
		// skip
	}

	// set the new mapping
	mappings.Mappings[dir] = storageLocation

	if newContent, err = proto.Marshal(mappings); err != nil {
		return oldContent, fmt.Errorf("marshal mappings: %v", err)
	}

	return
}

func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
	mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
	if unmarshalErr != nil {
		return nil, unmarshalErr
	}

	// remove the mapping
	delete(mappings.Mappings, dir)

	if newContent, err = proto.Marshal(mappings); err != nil {
		return oldContent, fmt.Errorf("marshal mappings: %v", err)
	}

	return
}
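InsertMountMapping and DeleteMountMapping above share one read-modify-write cycle against a single protobuf file stored under the filer's /etc directory, treating not-found on the first read as an empty mapping. A stand-in sketch of the same pattern with a JSON map in place of the RemoteStorageMapping protobuf (hypothetical helper, not this package's API):

package main

import (
	"encoding/json"
	"fmt"
)

// upsertMapping reads the old serialized mapping (empty means none yet),
// sets or overwrites one directory -> location entry, and re-serializes.
func upsertMapping(old []byte, dir, location string) ([]byte, error) {
	mappings := map[string]string{}
	if len(old) > 0 {
		if err := json.Unmarshal(old, &mappings); err != nil {
			return nil, fmt.Errorf("unmarshal mappings: %v", err)
		}
	}
	mappings[dir] = location
	return json.Marshal(mappings)
}

func main() {
	content, _ := upsertMapping(nil, "/buckets/b1", "cloud1:b1/")
	content, _ = upsertMapping(content, "/buckets/b2", "cloud1:b2/")
	fmt.Println(string(content))
}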
weed/filer/remote_storage_test.go
@@ -1,20 +1,20 @@
 package filer

 import (
-	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
 	"github.com/stretchr/testify/assert"
 	"testing"
 )

 func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
-	conf := &filer_pb.RemoteConf{
+	conf := &remote_pb.RemoteConf{
 		Name: "s7",
 		Type: "s3",
 	}
 	rs := NewFilerRemoteStorage()
 	rs.storageNameToConf[conf.Name] = conf

-	rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
+	rs.mapDirectoryToRemoteStorage("/a/b/c", &remote_pb.RemoteStorageLocation{
 		Name:   "s7",
 		Bucket: "some",
 		Path:   "/dir",
weed/filer/tikv/tikv.go (new file)
@@ -0,0 +1,5 @@
package tikv

/*
 * This empty file lets `go build` work without the tikv build tag.
 */
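This otherwise empty file is what keeps `go build ./...` working when the tikv tag is absent: it carries the package clause unconditionally, while the real store below only joins the build under `//go:build tikv`. A minimal sketch of the idiom (illustrative file pair, mirroring this diff):

// tikv.go — always compiled; nothing but the package clause.
package tikv

// tikv_store.go — compiled only with `go build -tags tikv`:
//
//	//go:build tikv
//	// +build tikv
//
//	package tikv
//	... the real TikvStore implementation ...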
weed/filer/tikv/tikv_store.go (new file)
@@ -0,0 +1,389 @@
//go:build tikv
// +build tikv

package tikv

import (
	"bytes"
	"context"
	"crypto/sha1"
	"fmt"
	"io"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv"
)

var (
	_ filer.FilerStore = ((*TikvStore)(nil))
)

func init() {
	filer.Stores = append(filer.Stores, &TikvStore{})
}

type TikvStore struct {
	client                 *tikv.KVStore
	deleteRangeConcurrency int
}

// Basic APIs
func (store *TikvStore) GetName() string {
	return "tikv"
}

func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
	pdAddrs := []string{}
	pdAddrsStr := config.GetString(prefix + "pdaddrs")
	for _, item := range strings.Split(pdAddrsStr, ",") {
		pdAddrs = append(pdAddrs, strings.TrimSpace(item))
	}
	drc := config.GetInt(prefix + "deleterange_concurrency")
	if drc <= 0 {
		drc = 1
	}
	store.deleteRangeConcurrency = drc
	return store.initialize(pdAddrs)
}

func (store *TikvStore) initialize(pdAddrs []string) error {
	client, err := tikv.NewTxnClient(pdAddrs)
	store.client = client
	return err
}

func (store *TikvStore) Shutdown() {
	err := store.client.Close()
	if err != nil {
		glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
	}
}

// ~ Basic APIs

// Entry APIs
func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	dir, name := entry.DirAndName()
	key := generateKey(dir, name)

	value, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
	}
	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Set(key, value)
	})
	if err != nil {
		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
	}
	return nil
}

func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
	return store.InsertEntry(ctx, entry)
}

func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
	dir, name := path.DirAndName()
	key := generateKey(dir, name)

	txn, err := store.getTxn(ctx)
	if err != nil {
		return nil, err
	}
	var value []byte = nil
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		val, err := txn.Get(context.TODO(), key)
		if err == nil {
			value = val
		}
		return err
	})

	if isNotExists(err) || value == nil {
		return nil, filer_pb.ErrNotFound
	}

	if err != nil {
		return nil, fmt.Errorf("get %s : %v", path, err)
	}

	entry := &filer.Entry{
		FullPath: path,
	}
	err = entry.DecodeAttributesAndChunks(value)
	if err != nil {
		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
	}
	return entry, nil
}

func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
	dir, name := path.DirAndName()
	key := generateKey(dir, name)

	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}

	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Delete(key)
	})
	if err != nil {
		return fmt.Errorf("delete %s : %v", path, err)
	}
	return nil
}

// ~ Entry APIs

// Directory APIs
func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
	directoryPrefix := genDirectoryKeyPrefix(path, "")

	txn, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	var (
		startKey []byte = nil
		endKey   []byte = nil
	)
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		iter, err := txn.Iter(directoryPrefix, nil)
		if err != nil {
			return err
		}
		defer iter.Close()
		for iter.Valid() {
			key := iter.Key()
			endKey = key
			if !bytes.HasPrefix(key, directoryPrefix) {
				break
			}
			if startKey == nil {
				startKey = key
			}

			err = iter.Next()
			if err != nil {
				return err
			}
		}
		// Only one key matched; just delete it.
		if startKey != nil && bytes.Equal(startKey, endKey) {
			return txn.Delete(startKey)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("delete %s : %v", path, err)
	}

	if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
		// startKey and endKey exist and differ, so use a range delete
		_, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
		if err != nil {
			return fmt.Errorf("delete %s : %v", path, err)
		}
	}
	return err
}

func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}

func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	lastFileName := ""
	directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
	lastFileStart := directoryPrefix
	if startFileName != "" {
		lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
	}

	txn, err := store.getTxn(ctx)
	if err != nil {
		return lastFileName, err
	}
	err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
		iter, err := txn.Iter(lastFileStart, nil)
		if err != nil {
			return err
		}
		defer iter.Close()
		i := int64(0)
		first := true
		for iter.Valid() {
			if first {
				first = false
				if !includeStartFile {
					if iter.Valid() {
						// Check whether the first item is lastFileStart
						if bytes.Equal(iter.Key(), lastFileStart) {
							// It is lastFileStart and the start file is excluded, so skip it.
							err = iter.Next()
							if err != nil {
								return err
							}
							continue
						}
					}
				}
			}
			// Check the limit
			if limit > 0 {
				i++
				if i > limit {
					break
				}
			}
			// Validate the key prefix
			key := iter.Key()
			if !bytes.HasPrefix(key, directoryPrefix) {
				break
			}
			value := iter.Value()

			// Start processing
			fileName := getNameFromKey(key)
			if fileName != "" {
				// Got a file name, so generate the Entry
				entry := &filer.Entry{
					FullPath: util.NewFullPath(string(dirPath), fileName),
				}
				// Update lastFileName
				lastFileName = fileName
				// Decode the value; on error, return it.
				if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
					glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
					return decodeErr
				}
				// Run the per-entry callback; if it returns false, stop iterating.
				if !eachEntryFunc(entry) {
					break
				}
			}
			// End processing

			err = iter.Next()
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
	}
	return lastFileName, nil
}

// ~ Directory APIs

// Transaction Related APIs
func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	tx, err := store.client.Begin()
	if err != nil {
		return ctx, err
	}
	return context.WithValue(ctx, "tx", tx), nil
}

func (store *TikvStore) CommitTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return tx.Commit(context.Background())
	}
	return nil
}

func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return tx.Rollback()
	}
	return nil
}

// ~ Transaction Related APIs

// Transaction Wrapper
type TxnWrapper struct {
	*txnkv.KVTxn
	inContext bool
}

func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
	err := f(w.KVTxn)
	if !w.inContext {
		if err != nil {
			w.KVTxn.Rollback()
			return err
		}
		w.KVTxn.Commit(context.Background())
		return nil
	}
	return err
}

func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
	if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
		return &TxnWrapper{tx, true}, nil
	}
	txn, err := store.client.Begin()
	if err != nil {
		return nil, err
	}
	return &TxnWrapper{txn, false}, nil
}

// ~ Transaction Wrapper

// Encoding Functions
func hashToBytes(dir string) []byte {
	h := sha1.New()
	io.WriteString(h, dir)
	b := h.Sum(nil)
	return b
}

func generateKey(dirPath, fileName string) []byte {
	key := hashToBytes(dirPath)
	key = append(key, []byte(fileName)...)
	return key
}

func getNameFromKey(key []byte) string {
	return string(key[sha1.Size:])
}

func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
	keyPrefix = hashToBytes(string(fullpath))
	if len(startFileName) > 0 {
		keyPrefix = append(keyPrefix, []byte(startFileName)...)
	}
	return keyPrefix
}

func isNotExists(err error) bool {
	if err == nil {
		return false
	}
	if err.Error() == "not exist" {
		return true
	}
	return false
}

// ~ Encoding Functions
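Every entry key in this store is sha1(directory) + fileName, so all children of a directory share the same 20-byte prefix: point lookups hash the parent, and directory listings become prefix scans. A worked example using the same helper shapes as above:

package main

import (
	"crypto/sha1"
	"fmt"
)

// generateKey prefixes the raw file name with the 20-byte SHA-1 of its
// directory, the same shape used by tikv_store.go above.
func generateKey(dir, name string) []byte {
	h := sha1.Sum([]byte(dir))
	return append(h[:], name...)
}

// getNameFromKey strips the fixed-size hash prefix to recover the name.
func getNameFromKey(key []byte) string {
	return string(key[sha1.Size:])
}

func main() {
	key := generateKey("/buckets/photos", "cat.jpg")
	fmt.Printf("prefix=%x name=%s\n", key[:sha1.Size], getNameFromKey(key))
}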
weed/filer/tikv/tikv_store_kv.go (new file)
@@ -0,0 +1,50 @@
//go:build tikv
// +build tikv

package tikv

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/tikv/client-go/v2/txnkv"
)

func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
	tw, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Set(key, value)
	})
}

func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
	tw, err := store.getTxn(ctx)
	if err != nil {
		return nil, err
	}
	var data []byte = nil
	err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
		val, err := txn.Get(context.TODO(), key)
		if err == nil {
			data = val
		}
		return err
	})
	if isNotExists(err) {
		return data, filer.ErrKvNotFound
	}
	return data, err
}

func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
	tw, err := store.getTxn(ctx)
	if err != nil {
		return err
	}
	return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
		return txn.Delete(key)
	})
}
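These KV helpers reuse the TxnWrapper from tikv_store.go: a transaction pulled out of the context belongs to its creator, so only a locally opened transaction is committed or rolled back around the callback. A generic sketch of that ownership rule (stand-in Txn type, not the TiKV client):

package main

import (
	"context"
	"errors"
	"fmt"
)

// Txn is a stand-in transaction that just reports how it ended.
type Txn struct{ name string }

func (t *Txn) Commit() error   { fmt.Println(t.name, "committed"); return nil }
func (t *Txn) Rollback() error { fmt.Println(t.name, "rolled back"); return nil }

type txnWrapper struct {
	*Txn
	inContext bool // true when the caller owns the transaction lifecycle
}

// runInTxn runs f inside the transaction; it only commits or rolls back
// when the wrapper opened the transaction itself.
func (w *txnWrapper) runInTxn(f func(*Txn) error) error {
	err := f(w.Txn)
	if w.inContext {
		return err
	}
	if err != nil {
		w.Txn.Rollback()
		return err
	}
	return w.Txn.Commit()
}

func getTxn(ctx context.Context) *txnWrapper {
	if tx, ok := ctx.Value("tx").(*Txn); ok {
		return &txnWrapper{tx, true}
	}
	return &txnWrapper{&Txn{name: "local"}, false}
}

func main() {
	_ = getTxn(context.Background()).runInTxn(func(t *Txn) error { return nil })
	_ = getTxn(context.Background()).runInTxn(func(t *Txn) error { return errors.New("boom") })
}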
Some files were not shown because too many files changed in this diff