From ba3afd18037919861470f99a3554bd4c20c1d95c Mon Sep 17 00:00:00 2001 From: MaratKarimov Date: Sun, 30 Mar 2025 07:12:06 +0300 Subject: [PATCH] Tarantool filer store (#6669) Co-authored-by: Marat Karimov --- .github/workflows/binaries_release4.yml | 4 +- .github/workflows/container_release4.yml | 2 +- .github/workflows/container_release5.yml | 2 +- .github/workflows/go.yml | 4 +- Makefile | 4 +- docker/Dockerfile.tarantool.dev_env | 17 + docker/Makefile | 10 +- docker/compose/test-tarantool-filer.yml | 30 ++ docker/tarantool/app-scm-1.rockspec | 14 + docker/tarantool/config.yaml | 145 +++++++++ docker/tarantool/instances.yaml | 7 + docker/tarantool/router.lua | 77 +++++ docker/tarantool/storage.lua | 97 ++++++ go.mod | 4 + go.sum | 8 + weed/command/imports.go | 1 + weed/command/scaffold/filer.toml | 7 + weed/command/update_full.go | 4 +- weed/filer/store_test/test_suite.go | 17 +- weed/filer/tarantool/doc.go | 7 + weed/filer/tarantool/readme.md | 11 + weed/filer/tarantool/tarantool_store.go | 318 +++++++++++++++++++ weed/filer/tarantool/tarantool_store_kv.go | 95 ++++++ weed/filer/tarantool/tarantool_store_test.go | 24 ++ weed/server/filer_server.go | 1 + 25 files changed, 896 insertions(+), 14 deletions(-) create mode 100644 docker/Dockerfile.tarantool.dev_env create mode 100644 docker/compose/test-tarantool-filer.yml create mode 100644 docker/tarantool/app-scm-1.rockspec create mode 100644 docker/tarantool/config.yaml create mode 100644 docker/tarantool/instances.yaml create mode 100644 docker/tarantool/router.lua create mode 100644 docker/tarantool/storage.lua create mode 100644 weed/filer/tarantool/doc.go create mode 100644 weed/filer/tarantool/readme.md create mode 100644 weed/filer/tarantool/tarantool_store.go create mode 100644 weed/filer/tarantool/tarantool_store_kv.go create mode 100644 weed/filer/tarantool/tarantool_store_test.go diff --git a/.github/workflows/binaries_release4.yml b/.github/workflows/binaries_release4.yml index 447103ede..7ebbceff3 100644 --- a/.github/workflows/binaries_release4.yml +++ b/.github/workflows/binaries_release4.yml @@ -36,7 +36,7 @@ jobs: goos: ${{ matrix.goos }} goarch: ${{ matrix.goarch }} overwrite: true - build_flags: -tags elastic,gocdk,rclone,sqlite,tikv,ydb + build_flags: -tags elastic,gocdk,rclone,sqlite,tarantool,tikv,ydb pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 # build_flags: -tags 5BytesOffset # optional, default is ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} @@ -52,7 +52,7 @@ jobs: goarch: ${{ matrix.goarch }} overwrite: true pre_command: export CGO_ENABLED=0 && export GODEBUG=http2client=0 - build_flags: -tags 5BytesOffset,elastic,gocdk,rclone,sqlite,tikv,ydb + build_flags: -tags 5BytesOffset,elastic,gocdk,rclone,sqlite,tarantool,tikv,ydb ldflags: -s -w -extldflags -static -X github.com/seaweedfs/seaweedfs/weed/util.COMMIT=${{github.sha}} # Where to run `go build .` project_path: weed diff --git a/.github/workflows/container_release4.yml b/.github/workflows/container_release4.yml index 6119eee6d..bd7f04094 100644 --- a/.github/workflows/container_release4.yml +++ b/.github/workflows/container_release4.yml @@ -52,7 +52,7 @@ jobs: context: ./docker push: ${{ github.event_name != 'pull_request' }} file: ./docker/Dockerfile.go_build - build-args: TAGS=elastic,gocdk,rclone,sqlite,tikv,ydb + build-args: TAGS=elastic,gocdk,rclone,sqlite,tarantool,tikv,ydb platforms: linux/amd64 tags: ${{ steps.docker_meta.outputs.tags }} labels: ${{ steps.docker_meta.outputs.labels }} diff --git a/.github/workflows/container_release5.yml b/.github/workflows/container_release5.yml index 3c33cbe1f..0f8bc8705 100644 --- a/.github/workflows/container_release5.yml +++ b/.github/workflows/container_release5.yml @@ -52,7 +52,7 @@ jobs: context: ./docker push: ${{ github.event_name != 'pull_request' }} file: ./docker/Dockerfile.go_build - build-args: TAGS=5BytesOffset,elastic,gocdk,rclone,sqlite,tikv,ydb + build-args: TAGS=5BytesOffset,elastic,gocdk,rclone,sqlite,tarantool,tikv,ydb platforms: linux/amd64 tags: ${{ steps.docker_meta.outputs.tags }} labels: ${{ steps.docker_meta.outputs.labels }} diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b4de6707b..9888550be 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -34,7 +34,7 @@ jobs: cd weed; go get -v -t -d ./... - name: Build - run: cd weed; go build -tags "elastic gocdk sqlite ydb tikv rclone" -v . + run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v . - name: Test - run: cd weed; go test -tags "elastic gocdk sqlite ydb tikv rclone" -v ./... + run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./... diff --git a/Makefile b/Makefile index 17eceafd3..509f23e85 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ warp_install: go install github.com/minio/warp@v0.7.6 full_install: - cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone" + cd weed; go install -tags "elastic gocdk sqlite ydb tarantool tikv rclone" server: install weed -v 0 server -s3 -filer -filer.maxMB=64 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=true -s3.config=./docker/compose/s3.json -metricsPort=9324 @@ -34,4 +34,4 @@ benchmark_with_pprof: debug = 1 benchmark_with_pprof: benchmark test: - cd weed; go test -tags "elastic gocdk sqlite ydb tikv rclone" -v ./... + cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./... diff --git a/docker/Dockerfile.tarantool.dev_env b/docker/Dockerfile.tarantool.dev_env new file mode 100644 index 000000000..4ce0fc9af --- /dev/null +++ b/docker/Dockerfile.tarantool.dev_env @@ -0,0 +1,17 @@ +FROM tarantool/tarantool:3.3.1 AS builder + +# install dependencies +RUN apt update && \ + apt install -y git unzip cmake tt=2.7.0 + +# init tt dir structure, create dir for app, create symlink +RUN tt init && \ + mkdir app && \ + ln -sfn ${PWD}/app/ ${PWD}/instances.enabled/app + +# copy cluster configs +COPY tarantool /opt/tarantool/app + +# build app +RUN tt build app + diff --git a/docker/Makefile b/docker/Makefile index d4dd70b6b..a4f207c89 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -22,7 +22,7 @@ build: binary build_e2e: binary_race docker build --no-cache -t chrislusf/seaweedfs:e2e -f Dockerfile.e2e . -go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset +go_build: # make go_build tags=elastic,ydb,gocdk,hdfs,5BytesOffset,tarantool docker build --build-arg TAGS=$(tags) --no-cache -t chrislusf/seaweedfs:go_build -f Dockerfile.go_build . go_build_large_disk: @@ -37,6 +37,9 @@ build_rocksdb_local: build_rocksdb_dev_env build_rocksdb: docker build --no-cache -t chrislusf/seaweedfs:rocksdb -f Dockerfile.rocksdb_large . +build_tarantool_dev_env: + docker build --no-cache -t chrislusf/tarantool_dev_env -f Dockerfile.tarantool.dev_env . + s3tests_build: docker build --no-cache -t chrislusf/ceph-s3-tests:local -f Dockerfile.s3tests . @@ -106,9 +109,12 @@ test_etcd: build test_ydb: tags = ydb test_ydb: build - export docker compose -f compose/test-ydb-filer.yml -p seaweedfs up +test_tarantool: tags = tarantool +test_tarantool: build_tarantool_dev_env build + docker compose -f compose/test-tarantool-filer.yml -p seaweedfs up + clean: rm ./weed diff --git a/docker/compose/test-tarantool-filer.yml b/docker/compose/test-tarantool-filer.yml new file mode 100644 index 000000000..8f31bf855 --- /dev/null +++ b/docker/compose/test-tarantool-filer.yml @@ -0,0 +1,30 @@ +version: '3.9' + +services: + tarantool: + image: chrislusf/tarantool_dev_env + entrypoint: "tt start app -i" + environment: + APP_USER_PASSWORD: "app" + CLIENT_USER_PASSWORD: "client" + REPLICATOR_USER_PASSWORD: "replicator" + STORAGE_USER_PASSWORD: "storage" + network_mode: "host" + ports: + - "3303:3303" + + s3: + image: chrislusf/seaweedfs:local + command: "server -ip=127.0.0.1 -filer -master.volumeSizeLimitMB=16 -volume.max=0 -volume -volume.preStopSeconds=1 -s3 -s3.config=/etc/seaweedfs/s3.json -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false" + volumes: + - ./s3.json:/etc/seaweedfs/s3.json + environment: + WEED_LEVELDB2_ENABLED: "false" + WEED_TARANTOOL_ENABLED: "true" + WEED_TARANTOOL_ADDRESS: "127.0.0.1:3303" + WEED_TARANTOOL_USER: "client" + WEED_TARANTOOL_PASSWORD: "client" + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 + network_mode: "host" + depends_on: + - tarantool \ No newline at end of file diff --git a/docker/tarantool/app-scm-1.rockspec b/docker/tarantool/app-scm-1.rockspec new file mode 100644 index 000000000..79eb1ca38 --- /dev/null +++ b/docker/tarantool/app-scm-1.rockspec @@ -0,0 +1,14 @@ +package = 'app' +version = 'scm-1' +source = { + url = '/dev/null', +} +dependencies = { + 'crud == 1.5.2-1', + 'expirationd == 1.6.0-1', + 'metrics-export-role == 0.3.0-1', + 'vshard == 0.1.32-1' +} +build = { + type = 'none'; +} \ No newline at end of file diff --git a/docker/tarantool/config.yaml b/docker/tarantool/config.yaml new file mode 100644 index 000000000..00a693a2e --- /dev/null +++ b/docker/tarantool/config.yaml @@ -0,0 +1,145 @@ +config: + context: + app_user_password: + from: env + env: APP_USER_PASSWORD + client_user_password: + from: env + env: CLIENT_USER_PASSWORD + replicator_user_password: + from: env + env: REPLICATOR_USER_PASSWORD + storage_user_password: + from: env + env: STORAGE_USER_PASSWORD + +credentials: + roles: + crud-role: + privileges: + - permissions: [ "execute" ] + lua_call: [ "crud.delete", "crud.get", "crud.upsert" ] + users: + app: + password: '{{ context.app_user_password }}' + roles: [ public, crud-role ] + client: + password: '{{ context.client_user_password }}' + roles: [ super ] + replicator: + password: '{{ context.replicator_user_password }}' + roles: [ replication ] + storage: + password: '{{ context.storage_user_password }}' + roles: [ sharding ] + +iproto: + advertise: + peer: + login: replicator + sharding: + login: storage + +sharding: + bucket_count: 10000 + +metrics: + include: [ all ] + exclude: [ vinyl ] + labels: + alias: '{{ instance_name }}' + + +groups: + storages: + roles: + - roles.crud-storage + - roles.expirationd + - roles.metrics-export + roles_cfg: + roles.expirationd: + cfg: + metrics: true + filer_metadata_task: + space: filer_metadata + is_expired: filer_metadata.is_expired + options: + atomic_iteration: true + force: true + index: 'expire_at_idx' + iterator_type: GT + start_key: + - 0 + tuples_per_iteration: 10000 + app: + module: storage + sharding: + roles: [ storage ] + replication: + failover: election + database: + use_mvcc_engine: true + replicasets: + storage-001: + instances: + storage-001-a: + roles_cfg: + roles.metrics-export: + http: + - listen: '0.0.0.0:8081' + endpoints: + - path: /metrics/prometheus/ + format: prometheus + - path: /metrics/json + format: json + iproto: + listen: + - uri: 127.0.0.1:3301 + advertise: + client: 127.0.0.1:3301 + storage-001-b: + roles_cfg: + roles.metrics-export: + http: + - listen: '0.0.0.0:8082' + endpoints: + - path: /metrics/prometheus/ + format: prometheus + - path: /metrics/json + format: json + iproto: + listen: + - uri: 127.0.0.1:3302 + advertise: + client: 127.0.0.1:3302 + routers: + roles: + - roles.crud-router + - roles.metrics-export + roles_cfg: + roles.crud-router: + stats: true + stats_driver: metrics + stats_quantiles: true + app: + module: router + sharding: + roles: [ router ] + replicasets: + router-001: + instances: + router-001-a: + roles_cfg: + roles.metrics-export: + http: + - listen: '0.0.0.0:8083' + endpoints: + - path: /metrics/prometheus/ + format: prometheus + - path: /metrics/json + format: json + iproto: + listen: + - uri: 127.0.0.1:3303 + advertise: + client: 127.0.0.1:3303 \ No newline at end of file diff --git a/docker/tarantool/instances.yaml b/docker/tarantool/instances.yaml new file mode 100644 index 000000000..225b7382f --- /dev/null +++ b/docker/tarantool/instances.yaml @@ -0,0 +1,7 @@ +--- +storage-001-a: + +storage-001-b: + +router-001-a: + diff --git a/docker/tarantool/router.lua b/docker/tarantool/router.lua new file mode 100644 index 000000000..359a8c49b --- /dev/null +++ b/docker/tarantool/router.lua @@ -0,0 +1,77 @@ +local vshard = require('vshard') +local log = require('log') + +-- Bootstrap the vshard router. +while true do + local ok, err = vshard.router.bootstrap({ + if_not_bootstrapped = true, + }) + if ok then + break + end + log.info(('Router bootstrap error: %s'):format(err)) +end + +-- functions for filer_metadata space +local filer_metadata = { + delete_by_directory_idx = function(directory) + -- find all storages + local storages = require('vshard').router.routeall() + -- on each storage + for _, storage in pairs(storages) do + -- call local function + local result, err = storage:callrw('filer_metadata.delete_by_directory_idx', { directory }) + -- check for error + if err then + error("Failed to call function on storage: " .. tostring(err)) + end + end + -- return + return true + end, + find_by_directory_idx_and_name = function(dirPath, startFileName, includeStartFile, limit) + -- init results + local results = {} + -- find all storages + local storages = require('vshard').router.routeall() + -- on each storage + for _, storage in pairs(storages) do + -- call local function + local result, err = storage:callro('filer_metadata.find_by_directory_idx_and_name', { + dirPath, + startFileName, + includeStartFile, + limit + }) + -- check for error + if err then + error("Failed to call function on storage: " .. tostring(err)) + end + -- add to results + for _, tuple in ipairs(result) do + table.insert(results, tuple) + end + end + -- sort + table.sort(results, function(a, b) return a[3] < b[3] end) + -- apply limit + if #results > limit then + local limitedResults = {} + for i = 1, limit do + table.insert(limitedResults, results[i]) + end + results = limitedResults + end + -- return + return results + end, +} + +rawset(_G, 'filer_metadata', filer_metadata) + +-- register functions for filer_metadata space, set grants +for name, _ in pairs(filer_metadata) do + box.schema.func.create('filer_metadata.' .. name, { if_not_exists = true }) + box.schema.user.grant('app', 'execute', 'function', 'filer_metadata.' .. name, { if_not_exists = true }) + box.schema.user.grant('client', 'execute', 'function', 'filer_metadata.' .. name, { if_not_exists = true }) +end diff --git a/docker/tarantool/storage.lua b/docker/tarantool/storage.lua new file mode 100644 index 000000000..ff1ec0288 --- /dev/null +++ b/docker/tarantool/storage.lua @@ -0,0 +1,97 @@ +box.watch('box.status', function() + if box.info.ro then + return + end + + -- ==================================== + -- key_value space + -- ==================================== + box.schema.create_space('key_value', { + format = { + { name = 'key', type = 'string' }, + { name = 'bucket_id', type = 'unsigned' }, + { name = 'value', type = 'string' } + }, + if_not_exists = true + }) + + -- create key_value space indexes + box.space.key_value:create_index('id', {type = 'tree', parts = { 'key' }, unique = true, if_not_exists = true}) + box.space.key_value:create_index('bucket_id', { type = 'tree', parts = { 'bucket_id' }, unique = false, if_not_exists = true }) + + -- ==================================== + -- filer_metadata space + -- ==================================== + box.schema.create_space('filer_metadata', { + format = { + { name = 'directory', type = 'string' }, + { name = 'bucket_id', type = 'unsigned' }, + { name = 'name', type = 'string' }, + { name = 'expire_at', type = 'unsigned' }, + { name = 'data', type = 'string' } + }, + if_not_exists = true + }) + + -- create filer_metadata space indexes + box.space.filer_metadata:create_index('id', {type = 'tree', parts = { 'directory', 'name' }, unique = true, if_not_exists = true}) + box.space.filer_metadata:create_index('bucket_id', { type = 'tree', parts = { 'bucket_id' }, unique = false, if_not_exists = true }) + box.space.filer_metadata:create_index('directory_idx', { type = 'tree', parts = { 'directory' }, unique = false, if_not_exists = true }) + box.space.filer_metadata:create_index('name_idx', { type = 'tree', parts = { 'name' }, unique = false, if_not_exists = true }) + box.space.filer_metadata:create_index('expire_at_idx', { type = 'tree', parts = { 'expire_at' }, unique = false, if_not_exists = true}) +end) + +-- functions for filer_metadata space +local filer_metadata = { + delete_by_directory_idx = function(directory) + local space = box.space.filer_metadata + local index = space.index.directory_idx + -- for each finded directories + for _, tuple in index:pairs({ directory }, { iterator = 'EQ' }) do + space:delete({ tuple[1], tuple[3] }) + end + return true + end, + find_by_directory_idx_and_name = function(dirPath, startFileName, includeStartFile, limit) + local space = box.space.filer_metadata + local directory_idx = space.index.directory_idx + -- choose filter name function + local filter_filename_func + if includeStartFile then + filter_filename_func = function(value) return value >= startFileName end + else + filter_filename_func = function(value) return value > startFileName end + end + -- init results + local results = {} + -- for each finded directories + for _, tuple in directory_idx:pairs({ dirPath }, { iterator = 'EQ' }) do + -- filter by name + if filter_filename_func(tuple[3]) then + table.insert(results, tuple) + end + end + -- sort + table.sort(results, function(a, b) return a[3] < b[3] end) + -- apply limit + if #results > limit then + local limitedResults = {} + for i = 1, limit do + table.insert(limitedResults, results[i]) + end + results = limitedResults + end + -- return + return results + end, + is_expired = function(args, tuple) + return (tuple[4] > 0) and (require('fiber').time() > tuple[4]) + end +} + +-- register functions for filer_metadata space, set grants +rawset(_G, 'filer_metadata', filer_metadata) +for name, _ in pairs(filer_metadata) do + box.schema.func.create('filer_metadata.' .. name, { setuid = true, if_not_exists = true }) + box.schema.user.grant('storage', 'execute', 'function', 'filer_metadata.' .. name, { if_not_exists = true }) +end diff --git a/go.mod b/go.mod index 43a736029..f7787952f 100644 --- a/go.mod +++ b/go.mod @@ -326,6 +326,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/t3rm1n4l/go-mega v0.0.0-20241213150454-ec0027fb0002 // indirect + github.com/tarantool/go-iproto v1.1.0 // indirect + github.com/tarantool/go-tarantool/v2 v2.3.0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect github.com/tikv/pd/client v0.0.0-20230329114254-1948c247c2b1 // indirect github.com/tinylib/msgp v1.1.8 // indirect @@ -333,6 +335,8 @@ require ( github.com/tklauser/numcpus v0.7.0 // indirect github.com/twmb/murmur3 v1.1.3 // indirect github.com/unknwon/goconfig v1.0.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e // indirect github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 // indirect diff --git a/go.sum b/go.sum index 753b13d5d..b23aa7740 100644 --- a/go.sum +++ b/go.sum @@ -1579,6 +1579,10 @@ github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJ github.com/t3rm1n4l/go-mega v0.0.0-20241213150454-ec0027fb0002 h1:jevGbwKzMmHLgHAaDaMJLQX3jpXUWjUvnsrPeMgkM7o= github.com/t3rm1n4l/go-mega v0.0.0-20241213150454-ec0027fb0002/go.mod h1:0Mv/XWQoRWF7d7jkc4DufsAJQg8xyZ5NtCkY59wECQY= github.com/tailscale/depaware v0.0.0-20210622194025-720c4b409502/go.mod h1:p9lPsd+cx33L3H9nNoecRRxPssFKUwwI50I3pZ0yT+8= +github.com/tarantool/go-iproto v1.1.0 h1:HULVOIHsiehI+FnHfM7wMDntuzUddO09DKqu2WnFQ5A= +github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= +github.com/tarantool/go-tarantool/v2 v2.3.0 h1:oLEWqQ5rQGT05JdSPaKXNSJyqCXTN7oDWgS11WPlAgk= +github.com/tarantool/go-tarantool/v2 v2.3.0/go.mod h1:hKKeZeCP8Y8+U6ZFS32ot1jHV/n4WKVP4fjRAvQznMY= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= @@ -1618,6 +1622,10 @@ github.com/viant/ptrie v1.0.1 h1:3fFC8XqCSchf11sCSS5sbb8eGDNEP2g2Hj96lNdHlZY= github.com/viant/ptrie v1.0.1/go.mod h1:Y+mwwNCIUgFrCZcrG4/QChfi4ubvnNBsyrENBIgigu0= github.com/viant/toolbox v0.34.5 h1:szWNPiGHjo8Dd4v2a59saEhG31DRL2Xf3aJ0ZtTSuqc= github.com/viant/toolbox v0.34.5/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlVjOeLOBz+CPAI8dnbqNSVwUwRrkp7vQ= github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= diff --git a/weed/command/imports.go b/weed/command/imports.go index bcc9e173b..d3cefc703 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -31,6 +31,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3" _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite" + _ "github.com/seaweedfs/seaweedfs/weed/filer/tarantool" _ "github.com/seaweedfs/seaweedfs/weed/filer/tikv" _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb" ) diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index a85f4c257..ca6b99f38 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -337,3 +337,10 @@ cert_path="" key_path="" # The name list used to verify the cn name verify_cn="" + +[tarantool] +address = "localhost:3301" +user = "guest" +password = "" +timeout = "5s" +maxReconnects = 1000 diff --git a/weed/command/update_full.go b/weed/command/update_full.go index fcf4364d1..95ca5fc00 100644 --- a/weed/command/update_full.go +++ b/weed/command/update_full.go @@ -1,5 +1,5 @@ -//go:build elastic && gocdk && rclone && sqlite && tikv && ydb -// +build elastic,gocdk,rclone,sqlite,tikv,ydb +//go:build elastic && gocdk && rclone && sqlite && tarantool && tikv && ydb +// +build elastic,gocdk,rclone,sqlite,tarantool,tikv,ydb package command diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go index 1e4149589..fda694f26 100644 --- a/weed/filer/store_test/test_suite.go +++ b/weed/filer/store_test/test_suite.go @@ -29,16 +29,29 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) { }) assert.Nil(t, err, "list directory") assert.Equal(t, 3, counter, "directory list counter") - assert.Equal(t, "f00003", lastFileName, "directory list last file") + assert.Equal(t, "f00002", lastFileName, "directory list last file") lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool { counter++ return true }) assert.Nil(t, err, "list directory") assert.Equal(t, 1027, counter, "directory list counter") - assert.Equal(t, "f01027", lastFileName, "directory list last file") + assert.Equal(t, "f01026", lastFileName, "directory list last file") } + testKey := []byte("test_key") + testValue1 := []byte("test_value1") + testValue2 := []byte("test_value2") + + err := store.KvPut(ctx, testKey, testValue1) + assert.Nil(t, err, "KV put") + value, err := store.KvGet(ctx, testKey) + assert.Equal(t, value, testValue1, "KV get") + + err = store.KvPut(ctx, testKey, testValue2) + assert.Nil(t, err, "KV update") + value, err = store.KvGet(ctx, testKey) + assert.Equal(t, value, testValue2, "KV get after update") } func makeEntry(fullPath util.FullPath, isDirectory bool) *filer.Entry { diff --git a/weed/filer/tarantool/doc.go b/weed/filer/tarantool/doc.go new file mode 100644 index 000000000..3c448e8e1 --- /dev/null +++ b/weed/filer/tarantool/doc.go @@ -0,0 +1,7 @@ +/* +Package tarantool is for Tarantool filer store. + +The referenced "github.com/tarantool/go-tarantool/v2" library is too big when compiled. +So this is only compiled in "make full_install". +*/ +package tarantool diff --git a/weed/filer/tarantool/readme.md b/weed/filer/tarantool/readme.md new file mode 100644 index 000000000..b51241488 --- /dev/null +++ b/weed/filer/tarantool/readme.md @@ -0,0 +1,11 @@ +## Tarantool + +database: https://www.tarantool.io/ + +go driver: https://github.com/tarantool/go-tarantool/ + +To set up local env: +`make -C docker test_tarantool` + +Run tests: +`RUN_TARANTOOL_TESTS=1 go test -tags=tarantool ./weed/filer/tarantool` \ No newline at end of file diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go new file mode 100644 index 000000000..8d19db60d --- /dev/null +++ b/weed/filer/tarantool/tarantool_store.go @@ -0,0 +1,318 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "context" + "fmt" + "reflect" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + weed_util "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/crud" + "github.com/tarantool/go-tarantool/v2/pool" +) + +const ( + tarantoolSpaceName = "filer_metadata" +) + +func init() { + filer.Stores = append(filer.Stores, &TarantoolStore{}) +} + +type TarantoolStore struct { + pool *pool.ConnectionPool +} + +func (store *TarantoolStore) GetName() string { + return "tarantool" +} + +func (store *TarantoolStore) Initialize(configuration weed_util.Configuration, prefix string) error { + + configuration.SetDefault(prefix+"address", "localhost:3301") + configuration.SetDefault(prefix+"user", "guest") + configuration.SetDefault(prefix+"password", "") + configuration.SetDefault(prefix+"timeout", "5s") + configuration.SetDefault(prefix+"maxReconnects", "1000") + + address := configuration.GetString(prefix + "address") + user := configuration.GetString(prefix + "user") + password := configuration.GetString(prefix + "password") + + timeoutStr := configuration.GetString(prefix + "timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("parse tarantool store timeout: %v", err) + } + + maxReconnects := configuration.GetInt(prefix + "maxReconnects") + if maxReconnects < 0 { + return fmt.Errorf("maxReconnects is negative") + } + + addresses := strings.Split(address, ",") + + return store.initialize(addresses, user, password, timeout, uint(maxReconnects)) +} + +func (store *TarantoolStore) initialize(addresses []string, user string, password string, timeout time.Duration, maxReconnects uint) error { + + opts := tarantool.Opts{ + Timeout: timeout, + Reconnect: time.Second, + MaxReconnects: maxReconnects, + } + + poolInstances := makePoolInstances(addresses, user, password, opts) + poolOpts := pool.Opts{ + CheckTimeout: time.Second, + } + + ctx := context.Background() + p, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) + if err != nil { + return fmt.Errorf("Can't create connection pool: %v", err) + } + + _, err = p.Do(tarantool.NewPingRequest(), pool.ANY).Get() + if err != nil { + return err + } + + store.pool = p + + return nil +} + +func makePoolInstances(addresses []string, user string, password string, opts tarantool.Opts) []pool.Instance { + poolInstances := make([]pool.Instance, 0, len(addresses)) + for i, address := range addresses { + poolInstances = append(poolInstances, makePoolInstance(address, user, password, opts, i)) + } + return poolInstances +} + +func makePoolInstance(address string, user string, password string, opts tarantool.Opts, serial int) pool.Instance { + return pool.Instance{ + Name: fmt.Sprintf("instance%d", serial), + Dialer: tarantool.NetDialer{ + Address: address, + User: user, + Password: password, + }, + Opts: opts, + } +} + +func (store *TarantoolStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *TarantoolStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + + var ttl int64 + if entry.TtlSec > 0 { + ttl = time.Now().Unix() + int64(entry.TtlSec) + } else { + ttl = 0 + } + + var operations = []crud.Operation{ + { + Operator: crud.Insert, + Field: "data", + Value: string(meta), + }, + } + + req := crud.MakeUpsertRequest(tarantoolSpaceName). + Tuple([]interface{}{dir, nil, name, ttl, string(meta)}). + Operations(operations) + + ret := crud.Result{} + + if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + return nil +} + +func (store *TarantoolStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *TarantoolStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + + findEntryGetOpts := crud.GetOpts{ + Fields: crud.MakeOptTuple([]interface{}{"data"}), + Mode: crud.MakeOptString("read"), + PreferReplica: crud.MakeOptBool(true), + Balance: crud.MakeOptBool(true), + } + + req := crud.MakeGetRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(findEntryGetOpts) + + resp := crud.Result{} + + err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp) + if err != nil { + return nil, err + } + + results, ok := resp.Rows.([]interface{}) + if !ok || len(results) != 1 { + return nil, filer_pb.ErrNotFound + } + + rows, ok := results[0].([]interface{}) + if !ok || len(rows) != 1 { + return nil, filer_pb.ErrNotFound + } + + row, ok := rows[0].(string) + if !ok { + return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0]) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData([]byte(row))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *TarantoolStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + + delOpts := crud.DeleteOpts{ + Noreturn: crud.MakeOptBool(true), + } + + req := crud.MakeDeleteRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(delOpts) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + req := tarantool.NewCallRequest("filer_metadata.delete_by_directory_idx"). + Args([]interface{}{fullpath}) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + req := tarantool.NewCallRequest("filer_metadata.find_by_directory_idx_and_name"). + Args([]interface{}{string(dirPath), startFileName, includeStartFile, limit}) + + results, err := store.pool.Do(req, pool.PreferRO).Get() + if err != nil { + return + } + + if len(results) < 1 { + glog.Errorf("Can't find results, data is empty") + return + } + + rows, ok := results[0].([]interface{}) + if !ok { + glog.Errorf("Can't convert results[0] to list") + return + } + + for _, result := range rows { + row, ok := result.([]interface{}) + if !ok { + glog.Errorf("Can't convert result to list") + return + } + + if len(row) < 5 { + glog.Errorf("Length of result is less than needed: %v", len(row)) + return + } + + nameRaw := row[2] + name, ok := nameRaw.(string) + if !ok { + glog.Errorf("Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw) + return + } + + dataRaw := row[4] + data, ok := dataRaw.(string) + if !ok { + glog.Errorf("Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw) + return + } + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), name), + } + lastFileName = name + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + if !eachEntryFunc(entry) { + break + } + } + + return lastFileName, err +} + +func (store *TarantoolStore) Shutdown() { + store.pool.Close() +} diff --git a/weed/filer/tarantool/tarantool_store_kv.go b/weed/filer/tarantool/tarantool_store_kv.go new file mode 100644 index 000000000..e9f0f4dd0 --- /dev/null +++ b/weed/filer/tarantool/tarantool_store_kv.go @@ -0,0 +1,95 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "context" + "fmt" + "reflect" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/tarantool/go-tarantool/v2/crud" + "github.com/tarantool/go-tarantool/v2/pool" +) + +const ( + tarantoolKVSpaceName = "key_value" +) + +func (store *TarantoolStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + var operations = []crud.Operation{ + { + Operator: crud.Insert, + Field: "value", + Value: string(value), + }, + } + + req := crud.MakeUpsertRequest(tarantoolKVSpaceName). + Tuple([]interface{}{string(key), nil, string(value)}). + Operations(operations) + + ret := crud.Result{} + if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *TarantoolStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + getOpts := crud.GetOpts{ + Fields: crud.MakeOptTuple([]interface{}{"value"}), + Mode: crud.MakeOptString("read"), + PreferReplica: crud.MakeOptBool(true), + Balance: crud.MakeOptBool(true), + } + + req := crud.MakeGetRequest(tarantoolKVSpaceName). + Key(crud.Tuple([]interface{}{string(key)})). + Opts(getOpts) + + resp := crud.Result{} + + err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp) + if err != nil { + return nil, err + } + + results, ok := resp.Rows.([]interface{}) + if !ok || len(results) != 1 { + return nil, filer.ErrKvNotFound + } + + rows, ok := results[0].([]interface{}) + if !ok || len(rows) != 1 { + return nil, filer.ErrKvNotFound + } + + row, ok := rows[0].(string) + if !ok { + return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0]) + } + + return []byte(row), nil +} + +func (store *TarantoolStore) KvDelete(ctx context.Context, key []byte) (err error) { + + delOpts := crud.DeleteOpts{ + Noreturn: crud.MakeOptBool(true), + } + + req := crud.MakeDeleteRequest(tarantoolKVSpaceName). + Key(crud.Tuple([]interface{}{string(key)})). + Opts(delOpts) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/tarantool/tarantool_store_test.go b/weed/filer/tarantool/tarantool_store_test.go new file mode 100644 index 000000000..500289773 --- /dev/null +++ b/weed/filer/tarantool/tarantool_store_test.go @@ -0,0 +1,24 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer/store_test" +) + +func TestStore(t *testing.T) { + // run "make test_tarantool" under docker folder. + // to set up local env + if os.Getenv("RUN_TARANTOOL_TESTS") != "1" { + t.Skip("Tarantool tests are disabled. Set RUN_TARANTOOL_TESTS=1 to enable.") + } + store := &TarantoolStore{} + addresses := []string{"127.0.1:3303"} + store.initialize(addresses, "client", "client", 5*time.Second, 1000) + store_test.TestFilerStore(t, store) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 6855a4745..2449e91f9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -40,6 +40,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3" _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite" + _ "github.com/seaweedfs/seaweedfs/weed/filer/tarantool" _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/notification"