297 changed files with 10627 additions and 6073 deletions
  62  .github/workflows/binaries_dev.yml
  24  .github/workflows/binaries_release.yml
  22  .github/workflows/cleanup.yml
  63  .github/workflows/container_dev.yml
  65  .github/workflows/container_latest.yml
  17  .github/workflows/container_release.yml
   6  .github/workflows/go.yml
  66  .github/workflows/release.yml
  46  .travis.yml.disabled
 147  Makefile
   6  README.md
   4  docker/Dockerfile
 109  go.mod
 591  go.sum
   0  k8s/helm_charts2/.helmignore
   4  k8s/helm_charts2/Chart.yaml
   0  k8s/helm_charts2/README.md
   0  k8s/helm_charts2/dashboards/seaweedfs-grafana-dashboard.json
   0  k8s/helm_charts2/templates/_helpers.tpl
   0  k8s/helm_charts2/templates/ca-cert.yaml
   0  k8s/helm_charts2/templates/cert-clusterissuer.yaml
   0  k8s/helm_charts2/templates/client-cert.yaml
   0  k8s/helm_charts2/templates/cronjob.yaml
   0  k8s/helm_charts2/templates/filer-cert.yaml
   0  k8s/helm_charts2/templates/filer-service-client.yaml
   0  k8s/helm_charts2/templates/filer-service.yaml
   0  k8s/helm_charts2/templates/filer-servicemonitor.yaml
   0  k8s/helm_charts2/templates/filer-statefulset.yaml
   0  k8s/helm_charts2/templates/ingress.yaml
   0  k8s/helm_charts2/templates/master-cert.yaml
   0  k8s/helm_charts2/templates/master-service.yaml
   0  k8s/helm_charts2/templates/master-statefulset.yaml
   0  k8s/helm_charts2/templates/s3-deployment.yaml
   0  k8s/helm_charts2/templates/s3-service.yaml
   0  k8s/helm_charts2/templates/s3-servicemonitor.yaml
   0  k8s/helm_charts2/templates/seaweedfs-grafana-dashboard.yaml
   0  k8s/helm_charts2/templates/seaweedfs-s3-secret.yaml
   0  k8s/helm_charts2/templates/secret-seaweedfs-db.yaml
   0  k8s/helm_charts2/templates/security-configmap.yaml
   0  k8s/helm_charts2/templates/service-account.yaml
   0  k8s/helm_charts2/templates/volume-cert.yaml
   0  k8s/helm_charts2/templates/volume-service.yaml
   0  k8s/helm_charts2/templates/volume-servicemonitor.yaml
   0  k8s/helm_charts2/templates/volume-statefulset.yaml
   0  k8s/helm_charts2/values.yaml
   4  other/java/client/pom.xml
   4  other/java/client/pom.xml.deploy
   2  other/java/client/pom_debug.xml
  73  other/java/client/src/main/java/seaweedfs/client/FilerClient.java
   8  other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
   5  other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
  26  other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
   2  other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
   7  other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java
  24  other/java/client/src/main/proto/filer.proto
   4  other/java/examples/pom.xml
   4  other/java/hdfs2/dependency-reduced-pom.xml
   4  other/java/hdfs2/pom.xml
   8  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
   5  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
   4  other/java/hdfs3/dependency-reduced-pom.xml
   4  other/java/hdfs3/pom.xml
   8  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
   5  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
 644  other/metrics/grafana_seaweedfs.json
  19  unmaintained/diff_volume_servers/diff_volume_servers.go
  10  unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
  16  unmaintained/repeated_vacuum/repeated_vacuum.go
   3  unmaintained/volume_tailer/volume_tailer.go
   6  weed/Makefile
   5  weed/command/backup.go
   4  weed/command/benchmark.go
   1  weed/command/command.go
   3  weed/command/download.go
  31  weed/command/filer.go
   6  weed/command/filer_backup.go
   9  weed/command/filer_cat.go
  96  weed/command/filer_copy.go
   6  weed/command/filer_meta_backup.go
   4  weed/command/filer_meta_tail.go
 117  weed/command/filer_remote_gateway.go
 398  weed/command/filer_remote_gateway_buckets.go
 173  weed/command/filer_remote_sync.go
 222  weed/command/filer_remote_sync_dir.go
  20  weed/command/filer_sync.go
   1  weed/command/filer_sync_std.go
  19  weed/command/iam.go
   4  weed/command/imports.go
  33  weed/command/master.go
  25  weed/command/master_follower.go
   5  weed/command/mount_notsupported.go
  21  weed/command/mount_std.go
  19  weed/command/msg_broker.go
  17  weed/command/s3.go
  12  weed/command/scaffold/filer.toml
  17  weed/command/server.go
   9  weed/command/shell.go
   8  weed/command/upload.go
  26  weed/command/volume.go
  17  weed/command/webdav.go
.github/workflows/binaries_dev.yml
@@ -0,0 +1,62 @@
name: "go: build dev binaries"

on:
  push:
    branches: [ master ]

jobs:


  build-latest-docker-image:
    runs-on: [ubuntu-latest]

    steps:
      -
        name: Checkout
        uses: actions/checkout@v2
      -
        name: Docker meta
        id: docker_meta
        uses: docker/metadata-action@v3
        with:
          images: |
            chrislusf/seaweedfs
            ghcr.io/chrislusf/seaweedfs
          tags: |
            type=raw,value=latest
          labels: |
            org.opencontainers.image.title=seaweedfs
            org.opencontainers.image.vendor=Chris Lu
      -
        name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      -
        name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
        with:
          buildkitd-flags: "--debug"
      -
        name: Login to Docker Hub
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      -
        name: Login to GHCR
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: ${{ secrets.GHCR_USERNAME }}
          password: ${{ secrets.GHCR_TOKEN }}
      -
        name: Build
        uses: docker/build-push-action@v2
        with:
          context: ./docker
          push: ${{ github.event_name != 'pull_request' }}
          file: ./docker/Dockerfile
          platforms: linux/amd64, linux/arm, linux/arm64
          tags: ${{ steps.docker_meta.outputs.tags }}
          labels: ${{ steps.docker_meta.outputs.labels }}
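Note: the push step above corresponds roughly to the following manual buildx invocation (a sketch, assuming a configured buildx builder and logins to both registries):

    docker buildx build \
      --platform linux/amd64,linux/arm,linux/arm64 \
      --file ./docker/Dockerfile \
      --tag chrislusf/seaweedfs:latest \
      --tag ghcr.io/chrislusf/seaweedfs:latest \
      --push ./docker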
.github/workflows/cleanup.yml
@@ -1,22 +0,0 @@
name: Cleanup

on:
  push:
    branches: [ master ]

jobs:

  build:
    name: Build
    runs-on: ubuntu-latest

    steps:

    - name: Delete old release assets
      uses: mknejp/delete-release-assets@v1
      with:
        token: ${{ github.token }}
        tag: dev
        fail-if-no-assets: false
        assets: |
          weed-*
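This removed workflow only pruned stale binaries from the rolling `dev` release before each re-upload. A manual equivalent (a sketch, assuming an authenticated GitHub CLI; the `weed-` filter is illustrative):

    gh release view dev --json assets -q '.assets[].name' \
      | grep '^weed-' \
      | xargs -I{} gh release delete-asset dev {} -y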
.github/workflows/container_dev.yml
@@ -0,0 +1,63 @@
name: "docker: build dev containers"

on:
  push:
    branches: [ master ]
  workflow_dispatch: []

jobs:

  build-dev-containers:
    runs-on: [ubuntu-latest]

    steps:
      -
        name: Checkout
        uses: actions/checkout@v2
      -
        name: Docker meta
        id: docker_meta
        uses: docker/metadata-action@v3
        with:
          images: |
            chrislusf/seaweedfs
            ghcr.io/chrislusf/seaweedfs
          tags: |
            type=raw,value=dev
          labels: |
            org.opencontainers.image.title=seaweedfs
            org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast!
            org.opencontainers.image.vendor=Chris Lu
      -
        name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      -
        name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
        with:
          buildkitd-flags: "--debug"
      -
        name: Login to Docker Hub
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      -
        name: Login to GHCR
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v1
        with:
          registry: ghcr.io
          username: ${{ secrets.GHCR_USERNAME }}
          password: ${{ secrets.GHCR_TOKEN }}
      -
        name: Build
        uses: docker/build-push-action@v2
        with:
          context: ./docker
          push: ${{ github.event_name != 'pull_request' }}
          file: ./docker/Dockerfile.go_build
          platforms: linux/amd64, linux/arm, linux/arm64, linux/386
          tags: ${{ steps.docker_meta.outputs.tags }}
          labels: ${{ steps.docker_meta.outputs.labels }}
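Unlike the latest-image workflow above, this one also declares a `workflow_dispatch` trigger, so a dev-container build can be started by hand (a sketch, assuming the GitHub CLI and that the file lands as container_dev.yml):

    gh workflow run container_dev.yml --ref master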
.github/workflows/release.yml
@@ -1,66 +0,0 @@
name: Build Dev Binaries

on:
  push:
    branches: [ master ]

jobs:

  build:
    name: Build
    runs-on: ubuntu-latest
    strategy:
      matrix:
        goos: [linux, windows, darwin, freebsd ]
        goarch: [amd64, arm]
        exclude:
          - goarch: arm
            goos: darwin
          - goarch: arm
            goos: windows

    steps:

    - name: Check out code into the Go module directory
      uses: actions/checkout@v2

    - name: Wait for the deletion
      uses: jakejarvis/wait-action@master
      with:
        time: '30s'

    - name: Set BUILD_TIME env
      run: echo BUILD_TIME=$(date -u +%Y%m%d-%H%M) >> ${GITHUB_ENV}

    - name: Go Release Binaries
      uses: wangyoucao577/go-release-action@v1.17
      with:
        goversion: 1.17
        github_token: ${{ secrets.GITHUB_TOKEN }}
        goos: ${{ matrix.goos }}
        goarch: ${{ matrix.goarch }}
        release_tag: dev
        overwrite: true
        pre_command: export CGO_ENABLED=0
        build_flags: -tags 5BytesOffset # optional, default is
        ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
        # Where to run `go build .`
        project_path: weed
        binary_name: weed-large-disk
        asset_name: "weed-large-disk-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"

    - name: Go Release Binaries
      uses: wangyoucao577/go-release-action@v1.17
      with:
        goversion: 1.17
        github_token: ${{ secrets.GITHUB_TOKEN }}
        goos: ${{ matrix.goos }}
        goarch: ${{ matrix.goarch }}
        release_tag: dev
        overwrite: true
        pre_command: export CGO_ENABLED=0
        ldflags: -extldflags -static -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${{github.sha}}
        # Where to run `go build .`
        project_path: weed
        binary_name: weed
        asset_name: "weed-${{ env.BUILD_TIME }}-${{ matrix.goos }}-${{ matrix.goarch }}"
.travis.yml.disabled
@@ -1,46 +0,0 @@
sudo: false
language: go
go:
  - 1.17.x

before_install:
  - export PATH=/home/travis/gopath/bin:$PATH

install:
  - export CGO_ENABLED="0"
  - go env

script:
  - env GO111MODULE=on go test ./weed/...

before_deploy:
  - make release
deploy:
  provider: releases
  skip_cleanup: true
  api_key:
    secure: ERL986+ncQ8lwAJUYDrQ8s2/FxF/cyNIwJIFCqspnWxQgGNNyokET9HapmlPSxjpFRF0q6L2WCg9OY3mSVRq4oI6hg1igOQ12KlLyN71XSJ3c8w0Ay5ho48TQ9l3f3Iu97mntBCe9l0R9pnT8wj1VI8YJxloXwUMG2yeTjA9aBI=
  file:
    - build/linux_arm.tar.gz
    - build/linux_arm64.tar.gz
    - build/linux_386.tar.gz
    - build/linux_amd64.tar.gz
    - build/linux_amd64_large_disk.tar.gz
    - build/darwin_amd64.tar.gz
    - build/darwin_amd64_large_disk.tar.gz
    - build/windows_386.zip
    - build/windows_amd64.zip
    - build/windows_amd64_large_disk.zip
    - build/freebsd_arm.tar.gz
    - build/freebsd_amd64.tar.gz
    - build/freebsd_386.tar.gz
    - build/netbsd_arm.tar.gz
    - build/netbsd_amd64.tar.gz
    - build/netbsd_386.tar.gz
    - build/openbsd_arm.tar.gz
    - build/openbsd_amd64.tar.gz
    - build/openbsd_386.tar.gz
  on:
    tags: true
    repo: chrislusf/seaweedfs
    go: 1.17.x
Makefile
@@ -1,147 +0,0 @@
BINARY = weed/weed
package = github.com/chrislusf/seaweedfs/weed

GO_FLAGS = #-v
SOURCE_DIR = ./weed/

appname := weed

sources := $(wildcard *.go)

COMMIT ?= $(shell git rev-parse --short HEAD)
LDFLAGS ?= -X github.com/chrislusf/seaweedfs/weed/util.COMMIT=${COMMIT}

build = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -ldflags "-extldflags -static $(LDFLAGS)" -o build/$(appname)$(3) $(SOURCE_DIR)
tar = cd build && tar -cvzf $(1)_$(2).tar.gz $(appname)$(3) && rm $(appname)$(3)
zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3)

build_large = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -tags 5BytesOffset -ldflags "-extldflags -static $(LDFLAGS)" -o build/$(appname)$(3) $(SOURCE_DIR)
tar_large = cd build && tar -cvzf $(1)_$(2)_large_disk.tar.gz $(appname)$(3) && rm $(appname)$(3)
zip_large = cd build && zip $(1)_$(2)_large_disk.zip $(appname)$(3) && rm $(appname)$(3)

all: build

.PHONY : clean deps build linux release windows_build darwin_build linux_build bsd_build clean

clean:
	go clean -i $(GO_FLAGS) $(SOURCE_DIR)
	rm -f $(BINARY)
	rm -rf build/

deps:
	go get $(GO_FLAGS) -d $(SOURCE_DIR)
	rm -rf /home/travis/gopath/src/github.com/coreos/etcd/vendor/golang.org/x/net/trace
	rm -rf /home/travis/gopath/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace

build: deps
	go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o $(BINARY) $(SOURCE_DIR)

gccgo_build: deps
	go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -compiler=gccgo -tags gccgo,noasm -o $(BINARY) $(SOURCE_DIR)

install: deps
	go install $(GO_FLAGS) -ldflags "$(LDFLAGS)" $(SOURCE_DIR)

linux: deps
	mkdir -p linux
	GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o linux/$(BINARY) $(SOURCE_DIR)

release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_build 5_byte_arm64_build 5_byte_darwin_build 5_byte_windows_build

##### LINUX BUILDS #####
5_byte_linux_build:
	$(call build_large,linux,amd64,)
	$(call tar_large,linux,amd64)

5_byte_darwin_build:
	$(call build_large,darwin,amd64,)
	$(call tar_large,darwin,amd64)

5_byte_windows_build:
	$(call build_large,windows,amd64,.exe)
	$(call zip_large,windows,amd64,.exe)

5_byte_arm_build: $(sources)
	$(call build_large,linux,arm,)
	$(call tar_large,linux,arm)

5_byte_arm64_build: $(sources)
	$(call build_large,linux,arm64,)
	$(call tar_large,linux,arm64)

linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz

build/linux_386.tar.gz: $(sources)
	$(call build,linux,386,)
	$(call tar,linux,386)

build/linux_amd64.tar.gz: $(sources)
	$(call build,linux,amd64,)
	$(call tar,linux,amd64)

build/linux_arm.tar.gz: $(sources)
	$(call build,linux,arm,)
	$(call tar,linux,arm)

build/linux_arm64.tar.gz: $(sources)
	$(call build,linux,arm64,)
	$(call tar,linux,arm64)

##### DARWIN (MAC) BUILDS #####
darwin_build: build/darwin_amd64.tar.gz

build/darwin_amd64.tar.gz: $(sources)
	$(call build,darwin,amd64,)
	$(call tar,darwin,amd64)

##### WINDOWS BUILDS #####
windows_build: build/windows_386.zip build/windows_amd64.zip

build/windows_386.zip: $(sources)
	$(call build,windows,386,.exe)
	$(call zip,windows,386,.exe)

build/windows_amd64.zip: $(sources)
	$(call build,windows,amd64,.exe)
	$(call zip,windows,amd64,.exe)

##### BSD BUILDS #####
bsd_build: build/freebsd_arm.tar.gz build/freebsd_386.tar.gz build/freebsd_amd64.tar.gz \
	build/netbsd_arm.tar.gz build/netbsd_386.tar.gz build/netbsd_amd64.tar.gz \
	build/openbsd_arm.tar.gz build/openbsd_386.tar.gz build/openbsd_amd64.tar.gz

build/freebsd_386.tar.gz: $(sources)
	$(call build,freebsd,386,)
	$(call tar,freebsd,386)

build/freebsd_amd64.tar.gz: $(sources)
	$(call build,freebsd,amd64,)
	$(call tar,freebsd,amd64)

build/freebsd_arm.tar.gz: $(sources)
	$(call build,freebsd,arm,)
	$(call tar,freebsd,arm)

build/netbsd_386.tar.gz: $(sources)
	$(call build,netbsd,386,)
	$(call tar,netbsd,386)

build/netbsd_amd64.tar.gz: $(sources)
	$(call build,netbsd,amd64,)
	$(call tar,netbsd,amd64)

build/netbsd_arm.tar.gz: $(sources)
	$(call build,netbsd,arm,)
	$(call tar,netbsd,arm)

build/openbsd_386.tar.gz: $(sources)
	$(call build,openbsd,386,)
	$(call tar,openbsd,386)

build/openbsd_amd64.tar.gz: $(sources)
	$(call build,openbsd,amd64,)
	$(call tar,openbsd,amd64)

build/openbsd_arm.tar.gz: $(sources)
	$(call build,openbsd,arm,)
	$(call tar,openbsd,arm)
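For anyone building older tags, the removed targets were driven like this (taken directly from the Makefile above):

    make                # deps + native build into weed/weed
    make linux_build    # cross-compiles the linux 386/amd64/arm/arm64 tarballs into build/
    make release        # every platform, plus the 5BytesOffset large-disk variants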
 591  go.sum
File diff suppressed because it is too large.
k8s/helm_charts2/Chart.yaml
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.63"
-version: "2.63"
+appVersion: "2.68"
+version: "2.68"
 644  other/metrics/grafana_seaweedfs.json
File diff suppressed because it is too large.
weed/command/filer_remote_gateway.go
@@ -0,0 +1,117 @@
package command

import (
	"context"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
	"google.golang.org/grpc"
	"os"
	"time"
)

type RemoteGatewayOptions struct {
	filerAddress             *string
	grpcDialOption           grpc.DialOption
	readChunkFromFiler       *bool
	timeAgo                  *time.Duration
	createBucketAt           *string
	createBucketRandomSuffix *bool
	include                  *string
	exclude                  *string

	mappings    *remote_pb.RemoteStorageMapping
	remoteConfs map[string]*remote_pb.RemoteConf
	bucketsDir  string
}

var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})

func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
	return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		return fn(client)
	})
}
func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

var (
	remoteGatewayOptions RemoteGatewayOptions
)

func init() {
	cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle
	remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
	remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
	remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
	remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
	remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
	remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "patterns of new bucket names, e.g., s3*")
	remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "patterns of new bucket names, e.g., local*")
}

var cmdFilerRemoteGateway = &Command{
	UsageLine: "filer.remote.gateway",
	Short:     "resumable continuously write back bucket creation, deletion, and other local updates to remote object store",
	Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote object store

	filer.remote.gateway listens for filer local bucket update events.
	If any bucket is created, deleted, or updated, it will mirror the changes to the remote object store.

		weed filer.remote.gateway -createBucketAt=cloud1

`,
}

func runFilerRemoteGateway(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	remoteGatewayOptions.grpcDialOption = grpcDialOption

	filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress)

	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(
		filerAddress.ToHttpAddress(),
		filerAddress.ToGrpcAddress(),
		"/", // does not matter
		*remoteGatewayOptions.readChunkFromFiler,
	)

	remoteGatewayOptions.bucketsDir = "/buckets"
	// check buckets again
	remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
		resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		remoteGatewayOptions.bucketsDir = resp.DirBuckets
		return nil
	})

	// read filer remote storage mount mappings
	if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil {
		fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
		return true
	}

	// synchronize /buckets folder
	fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
	util.RetryForever("filer.remote.sync buckets", func() error {
		return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
	}, func(err error) bool {
		if err != nil {
			glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err)
		}
		return true
	})
	return true

}
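Putting the new flags together, a representative invocation of the command defined above (a sketch; `cloud1` stands for a remote storage previously configured through `weed shell`):

    weed filer.remote.gateway -filer=localhost:8888 -createBucketAt=cloud1 \
        -createBucketWithRandomSuffix=true -include='s3*'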
weed/command/filer_remote_gateway_buckets.go
@@ -0,0 +1,398 @@
package command

import (
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
	"math"
	"math/rand"
	"path/filepath"
	"strings"
	"time"
)

func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {

	// read filer remote storage mount mappings
	if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
		return fmt.Errorf("read mount info: %v", detectErr)
	}

	eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
	if err != nil {
		return err
	}

	processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
		lastTime := time.Unix(0, lastTsNs)
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
		return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
	})

	lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)

	return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
		option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {

	handleCreateBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}
		if entry.RemoteEntry != nil {
			// this directory is imported from "remote.mount.buckets" or "remote.mount"
			return nil
		}
		if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
			*option.createBucketAt = option.mappings.PrimaryBucketStorageName
			glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
		}
		if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
			for k := range option.mappings.Mappings {
				*option.createBucketAt = k
				glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
			}
		}
		if *option.createBucketAt == "" {
			return nil
		}
		remoteConf, found := option.remoteConfs[*option.createBucketAt]
		if !found {
			return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
		}

		client, err := remote_storage.GetRemoteStorage(remoteConf)
		if err != nil {
			return err
		}

		bucketName := strings.ToLower(entry.Name)
		if *option.include != "" {
			if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
				return nil
			}
		}
		if *option.exclude != "" {
			if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
				return nil
			}
		}
		if *option.createBucketRandomSuffix {
			// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
			if len(bucketName)+5 > 63 {
				bucketName = bucketName[:58]
			}
			bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
		}

		glog.V(0).Infof("create bucket %s", bucketName)
		if err := client.CreateBucket(bucketName); err != nil {
			return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
		remoteLocation := &remote_pb.RemoteStorageLocation{
			Name:   *option.createBucketAt,
			Bucket: bucketName,
			Path:   "/",
		}

		// need to add new mapping here before getting updates from metadata tailing
		option.mappings.Mappings[string(bucketPath)] = remoteLocation

		return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)

	}
	handleDeleteBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}

		client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
		if err != nil {
			return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
		}

		glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
		if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
			return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)

		return filer.DeleteMountMapping(option, string(bucketPath))
	}

	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry != nil {
			// update
			if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
				newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
				if readErr != nil {
					return fmt.Errorf("unmarshal mappings: %v", readErr)
				}
				option.mappings = newMappings
			}
			if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
				}
				option.remoteConfs[conf.Name] = conf
			}
		} else if message.OldEntry != nil {
			// deletion
			if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
				}
				delete(option.remoteConfs, conf.Name)
			}
		}

		return nil
	}

	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}

		if message.OldEntry == nil && message.NewEntry == nil {
			return nil
		}
		if message.OldEntry == nil && message.NewEntry != nil {
			if message.NewParentPath == option.bucketsDir {
				return handleCreateBucket(message.NewEntry)
			}
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if message.OldEntry != nil && message.NewEntry == nil {
			if resp.Directory == option.bucketsDir {
				return handleDeleteBucket(message.OldEntry)
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		if message.OldEntry != nil && message.NewEntry != nil {
			if resp.Directory == option.bucketsDir {
				if message.NewParentPath == option.bucketsDir {
					if message.OldEntry.Name == message.NewEntry.Name {
						return nil
					}
					if err := handleCreateBucket(message.NewEntry); err != nil {
						return err
					}
					if err := handleDeleteBucket(message.OldEntry); err != nil {
						return err
					}
				}
			}
			oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
			newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
			if oldOk && newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
					// update the same entry
					if message.NewEntry.IsDirectory {
						// update directory property
						return nil
					}
					if filer.IsSameData(message.OldEntry, message.NewEntry) {
						glog.V(2).Infof("update meta: %+v", resp)
						oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
						return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
					} else {
						newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
						reader := filer.NewFileReader(filerSource, message.NewEntry)
						glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
						remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
						if writeErr != nil {
							return writeErr
						}
						return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
					}
				}
			}

			// the following is entry rename
			if oldOk {
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
				if message.OldEntry.IsDirectory {
					return client.RemoveDirectory(oldDest)
				}
				glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
				if err := client.DeleteFile(oldDest); err != nil {
					return err
				}
			}
			if newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
				if err != nil {
					return err
				}
				newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
				if message.NewEntry.IsDirectory {
					return client.WriteDirectory(newDest, message.NewEntry)
				}
				reader := filer.NewFileReader(filerSource, message.NewEntry)
				glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
				remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
				if writeErr != nil {
					return writeErr
				}
				return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
			}
		}

		return nil
	}
	return eachEntryFunc, nil
}

func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
	bucket := util.FullPath(option.bucketsDir).Child(bucketName)

	var isMounted bool
	remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
	if !isMounted {
		return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
	}
	remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
	if !hasClient {
		return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
	}

	client, err = remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return nil, remoteStorageMountLocation, err
	}
	return client, remoteStorageMountLocation, nil
}

func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
	bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
	if !ok {
		return "", nil, nil, false
	}
	var isMounted bool
	remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
	if !isMounted {
		glog.Warningf("%s is not mounted", bucket)
		return "", nil, nil, false
	}
	var hasClient bool
	remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
	if !hasClient {
		glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
		return "", nil, nil, false
	}
	return bucket, remoteStorageMountLocation, remoteConf, true
}

func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
	if !strings.HasPrefix(dir, bucketsDir+"/") {
		return "", false
	}
	parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
	return util.FullPath(bucketsDir).Child(parts[0]), true
}

func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {

	if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
		return err
	} else {
		option.mappings = mappings
	}

	option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
	var lastConfName string
	err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
		if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
			return nil
		}
		conf := &remote_pb.RemoteConf{}
		if err := proto.Unmarshal(entry.Content, conf); err != nil {
			return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
		}
		option.remoteConfs[conf.Name] = conf
		lastConfName = conf.Name
		return nil
	}, "", false, math.MaxUint32)

	if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
		glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
		option.mappings.PrimaryBucketStorageName = lastConfName
	}

	return
}
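A note on the random-suffix arithmetic in handleCreateBucket above: S3 bucket names are capped at 63 characters and the appended suffix ("-" plus four digits) is 5 characters, so the name is first truncated to 58 characters, since 58 + 5 = 63.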
weed/command/filer_remote_sync_dir.go
@@ -0,0 +1,222 @@
package command

import (
	"context"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"
	"os"
	"strings"
	"time"
)

func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {

	// read filer remote storage mount mappings
	_, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
	if detectErr != nil {
		return fmt.Errorf("read mount info: %v", detectErr)
	}

	eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
	if err != nil {
		return err
	}

	processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
		lastTime := time.Unix(0, lastTsNs)
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
		return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs)
	})

	lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)

	return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
		mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}

func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
	client, err := remote_storage.GetRemoteStorage(remoteStorage)
	if err != nil {
		return nil, err
	}

	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry == nil {
			return nil
		}
		if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
			mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
			if readErr != nil {
				return fmt.Errorf("unmarshal mappings: %v", readErr)
			}
			if remoteLoc, found := mappings.Mappings[mountedDir]; found {
				if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
					glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
				}
			} else {
				glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
				os.Exit(0)
			}
		}
		if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
			conf := &remote_pb.RemoteConf{}
			if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
				return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
			}
			remoteStorage = conf
			if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
				client = newClient
			} else {
				return err
			}
		}

		return nil
	}

	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}

		if message.OldEntry == nil && message.NewEntry == nil {
			return nil
		}
		if message.OldEntry == nil && message.NewEntry != nil {
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if message.OldEntry != nil && message.NewEntry == nil {
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		if message.OldEntry != nil && message.NewEntry != nil {
			oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping updating: %+v", resp)
				return nil
			}
			if message.NewEntry.IsDirectory {
				return client.WriteDirectory(dest, message.NewEntry)
			}
			if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
				if filer.IsSameData(message.OldEntry, message.NewEntry) {
					glog.V(2).Infof("update meta: %+v", resp)
					return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
				}
			}
			glog.V(2).Infof("update: %+v", resp)
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
			if err := client.DeleteFile(oldDest); err != nil {
				return err
			}
			reader := filer.NewFileReader(filerSource, message.NewEntry)
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
			if writeErr != nil {
				return writeErr
			}
			return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
		}

		return nil
	}
	return eachEntryFunc, nil
}

func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
	// 1. specified by timeAgo
	// 2. last offset timestamp for this directory
	// 3. directory creation time
	var lastOffsetTs time.Time
	if timeAgo == 0 {
		mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
		if err != nil {
			glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
			return time.Now()
		}

		lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
		if mountedDirEntry != nil {
			if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
				lastOffsetTs = time.Unix(0, lastOffsetTsNs)
				glog.V(0).Infof("resume from %v", lastOffsetTs)
			} else {
				lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
			}
		} else {
			lastOffsetTs = time.Now()
		}
	} else {
		lastOffsetTs = time.Now().Add(-timeAgo)
	}
	return lastOffsetTs
}

func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
	source := string(sourcePath[len(mountDir):])
	dest := util.FullPath(remoteMountLocation.Path).Child(source)
	return &remote_pb.RemoteStorageLocation{
		Name:   remoteMountLocation.Name,
		Bucket: remoteMountLocation.Bucket,
		Path:   string(dest),
	}
}

func shouldSendToRemote(entry *filer_pb.Entry) bool {
	if entry.RemoteEntry == nil {
		return true
	}
	if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
		return true
	}
	return false
}

func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
	remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
	entry.RemoteEntry = remoteEntry
	return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
			Directory: dir,
			Entry:     entry,
		})
		return err
	})
}
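These helpers back the single-directory `weed filer.remote.sync` mode. A representative invocation (a sketch: the `-dir` flag is assumed to live in the companion filer_remote_sync.go, which this diff lists but does not display):

    weed filer.remote.sync -filer=localhost:8888 -dir=/buckets/some-mounted-dir -timeAgo=2h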
Some files were not shown because too many files changed in this diff