Browse Source

Merge branch 'master' of https://github.com/fuyouyshengwu/seaweedfs

pull/1728/head
henry 4 years ago
parent
commit
97a94eddab
  1. 22
      .github/workflows/cleanup.yml
  2. 12
      .github/workflows/release.yml
  3. 14
      README.md
  4. 3
      docker/Dockerfile.go_build
  5. 3
      docker/Dockerfile.go_build_large
  6. 1
      docker/Dockerfile.local
  7. 13
      docker/local-dev-compose.yml
  8. 3
      go.mod
  9. 10
      go.sum
  10. 2
      k8s/seaweedfs/values.yaml
  11. 1
      weed/command/filer.go
  12. 3
      weed/command/mount_std.go
  13. 24
      weed/command/scaffold.go
  14. 1
      weed/command/server.go
  15. 25
      weed/command/shell.go
  16. 25
      weed/command/upload.go
  17. 103
      weed/filer/filer.go
  18. 2
      weed/filer/filer_buckets.go
  19. 2
      weed/filer/filer_delete_entry.go
  20. 4
      weed/filer/filer_notify.go
  21. 8
      weed/filer/filer_search.go
  22. 6
      weed/filer/leveldb/leveldb_store.go
  23. 33
      weed/filer/leveldb/leveldb_store_test.go
  24. 5
      weed/filer/leveldb2/leveldb2_store.go
  25. 6
      weed/filer/leveldb2/leveldb2_store_test.go
  26. 41
      weed/filer/rocksdb/README.md
  27. 293
      weed/filer/rocksdb/rocksdb_store.go
  28. 51
      weed/filer/rocksdb/rocksdb_store_kv.go
  29. 117
      weed/filer/rocksdb/rocksdb_store_test.go
  30. 3
      weed/operation/submit.go
  31. 31
      weed/s3api/auth_credentials.go
  32. 4
      weed/server/filer_grpc_server.go
  33. 2
      weed/server/filer_grpc_server_rename.go
  34. 2
      weed/server/filer_server_handlers_read_dir.go
  35. 5
      weed/server/filer_server_handlers_write_autochunk.go
  36. 7
      weed/server/filer_server_rocksdb.go
  37. 2
      weed/storage/needle/needle_parse_upload.go
  38. 5
      weed/topology/data_node.go
  39. 2
      weed/util/constants.go

22
.github/workflows/cleanup.yml

@ -0,0 +1,22 @@
name: Cleanup
on:
push:
branches: [ master ]
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Delete old release assets
uses: mknejp/delete-release-assets@v1
with:
token: ${{ github.token }}
tag: dev
fail-if-no-assets: false
assets: |
weed-*

12
.github/workflows/release.yml

@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goos: [linux, windows, darwin ]
goos: [linux, windows, darwin, freebsd ]
goarch: [amd64, arm]
exclude:
- goarch: arm
@ -24,14 +24,10 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Delete old release assets
uses: mknejp/delete-release-assets@v1
- name: Wait for the deletion
uses: jakejarvis/wait-action@master
with:
token: ${{ github.token }}
tag: dev
fail-if-no-assets: false
assets: |
weed-*
time: '30s'
- name: Set BUILD_TIME env
run: echo BUILD_TIME=$(date -u +%Y-%m-%d-%H-%M) >> ${GITHUB_ENV}

14
README.md

@ -86,6 +86,8 @@ SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.
On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc.
For any distributed key value stores, the large values can be offloaded to SeaweedFS. With the fast access speed and linearly scalable capacity, SeaweedFS can work as a distributed [Key-Large-Value store][KeyLargeValueStore].
[Back to TOC](#table-of-contents)
## Additional Features ##
@ -117,7 +119,10 @@ On top of the object store, optional [Filer] can support directories and POSIX a
* [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices.
* [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data.
* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
## Kubernetes ##
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
* [SeaweedFS Operator](https://github.com/seaweedfs/seaweedfs-operator)
[Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
[SuperLargeFiles]: https://github.com/chrislusf/seaweedfs/wiki/Data-Structure-for-Large-Files
@ -134,6 +139,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a
[SeaweedFsCsiDriver]: https://github.com/seaweedfs/seaweedfs-csi-driver
[ActiveActiveAsyncReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Active-Active-cross-cluster-continuous-synchronization
[FilerStoreReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Store-Replication
[KeyLargeValueStore]: https://github.com/chrislusf/seaweedfs/wiki/Filer-as-a-Key-Large-Value-Store
[Back to TOC](#table-of-contents)
@ -335,7 +341,7 @@ Usually hot data are fresh and warm data are old. SeaweedFS puts the newly creat
With the O(1) access time, the network latency cost is kept at minimum.
If the hot~warm data is split as 20~80, with 20 servers, you can achieve storage capacity of 100 servers. That's a cost saving of 80%! Or you can repurpose the 80 servers to store new data also, and get 5X storage throughput.
If the hot/warm data is split as 20/80, with 20 servers, you can achieve storage capacity of 100 servers. That's a cost saving of 80%! Or you can repurpose the 80 servers to store new data also, and get 5X storage throughput.
[Back to TOC](#table-of-contents)
@ -403,7 +409,7 @@ SeaweedFS has a centralized master group to look up free volumes, while Ceph use
Same as SeaweedFS, Ceph is also based on the object store RADOS. Ceph is rather complicated with mixed reviews.
Ceph uses CRUSH hashing to automatically manage the data placement. SeaweedFS places data by assigned volumes.
Ceph uses CRUSH hashing to automatically manage the data placement, which is efficient to locate the data. But the data has to be placed according to the CRUSH algorithm. Any wrong configuration would cause data loss. SeaweedFS places data by assigning them to any writable volumes. If writes to one volume failed, just pick another volume to write. Adding more volumes are also as simple as it can be.
SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read.
@ -436,9 +442,7 @@ MinIO has specific requirements on storage layout. It is not flexible to adjust
## Dev Plan ##
* More tools and documentation, on how to manage and scale the system. For example, how to move volumes, automatically balancing data, how to grow volumes, how to check system status, etc.
* Integrate with Kubernetes. build [SeaweedFS Operator](https://github.com/seaweedfs/seaweedfs-operator).
* Add ftp server.
* More tools and documentation, on how to manage and scale the system.
* Read and write stream data.
* Support structured data.

3
docker/Dockerfile.go_build

@ -1,5 +1,5 @@
FROM frolvlad/alpine-glibc as builder
RUN apk add git go g++
RUN apk add git go g++ fuse
RUN mkdir -p /go/src/github.com/chrislusf/
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs
ARG BRANCH=${BRANCH:-master}
@ -14,6 +14,7 @@ COPY --from=builder /root/go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount
# volume server gprc port
EXPOSE 18080

3
docker/Dockerfile.go_build_large

@ -1,5 +1,5 @@
FROM frolvlad/alpine-glibc as builder
RUN apk add git go g++
RUN apk add git go g++ fuse
RUN mkdir -p /go/src/github.com/chrislusf/
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs
ARG BRANCH=${BRANCH:-master}
@ -14,6 +14,7 @@ COPY --from=builder /root/go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount
# volume server gprc port
EXPOSE 18080

1
docker/Dockerfile.local

@ -4,6 +4,7 @@ COPY ./weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY ./filer.toml /etc/seaweedfs/filer.toml
COPY ./entrypoint.sh /entrypoint.sh
RUN apk add fuse # for weed mount
# volume server grpc port
EXPOSE 18080

13
docker/local-dev-compose.yml

@ -12,7 +12,7 @@ services:
ports:
- 8080:8080
- 18080:18080
command: "volume -mserver=master:9333 -port=8080 -ip=volume"
command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
depends_on:
- master
filer:
@ -33,3 +33,14 @@ services:
- master
- volume
- filer
mount:
image: chrislusf/seaweedfs:local
privileged: true
cap_add:
- SYS_ADMIN
mem_limit: 4096m
command: '-v=4 mount -filer="filer:8888" -dirAutoCreate -dir=/mnt/seaweedfs -cacheCapacityMB=0 '
depends_on:
- master
- volume
- filer

3
go.mod

@ -11,7 +11,7 @@ require (
github.com/aws/aws-sdk-go v1.33.5
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v1.0.3
github.com/chrislusf/raft v1.0.4
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2
@ -66,6 +66,7 @@ require (
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v1.0.0
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
github.com/tidwall/gjson v1.3.2
github.com/tidwall/match v1.0.1
github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365

10
go.sum

@ -86,6 +86,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chrislusf/raft v1.0.3 h1:11YrnzJtVa5z7m9lhY2p8VcPHoUlC1UswyoAo+U1m1k=
github.com/chrislusf/raft v1.0.3/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.4 h1:THhbsVik2hxdE0/VXX834f64Wn9RzgVPp+E+XCWZdKM=
github.com/chrislusf/raft v1.0.4/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@ -225,6 +227,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.3 h1:GV+pQPG/EUUbkh47niozDcADz6go/dUwhVzdUQHIVRw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -542,6 +545,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@ -551,8 +555,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seaweedfs/fuse v1.0.7 h1:tESMXhI3gXzN+dlWsCUrkIZDiWA4dZX18rQMoqmvazw=
github.com/seaweedfs/fuse v1.0.7/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s=
github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
@ -618,6 +620,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok=
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8=
github.com/tidwall/gjson v1.3.2 h1:+7p3qQFaH3fOMXAJSrdZwGKcOO/lYdGS0HqGhPqDdTI=
github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
@ -941,7 +945,9 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o=
modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg=
modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
imageTag: "2.17"
imageTag: "2.19"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

1
weed/command/filer.go

@ -74,6 +74,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
}
var cmdFiler = &Command{

3
weed/command/mount_std.go

@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return true
}
util.LoadConfiguration("security", false)
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
util.LoadConfiguration("security", false)
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")

24
weed/command/scaffold.go

@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = SECURITY_TOML_EXAMPLE
case "master":
content = MASTER_TOML_EXAMPLE
case "shell":
content = SHELL_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@ -85,7 +87,13 @@ buckets_folder = "/buckets"
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
enabled = true
dir = "." # directory to store level db files
dir = "./filerldb2" # directory to store level db files
[rocksdb]
# local on disk, similar to leveldb
# since it is using a C wrapper, you need to install rocksdb and build it by yourself
enabled = false
dir = "./filerrdb" # directory to store rocksdb files
[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
@ -459,5 +467,19 @@ copy_other = 1 # create n x 1 = n actual volumes
# if you are doing your own replication or periodic sync of volumes.
treat_replication_as_minimums = false
`
SHELL_TOML_EXAMPLE = `
[cluster]
default = "c1"
[cluster.c1]
master = "localhost:9333" # comma-separated master servers
filer = "localhost:8888" # filer host and port
[cluster.c2]
master = ""
filer = ""
`
)

1
weed/command/server.go

@ -113,6 +113,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")

25
weed/command/shell.go

@ -11,12 +11,14 @@ import (
var (
shellOptions shell.ShellOptions
shellInitialFiler *string
shellCluster *string
)
func init() {
cmdShell.Run = runShell // break init cycle
shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333")
shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888")
shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml")
}
var cmdShell = &Command{
@ -24,6 +26,8 @@ var cmdShell = &Command{
Short: "run interactive administrative commands",
Long: `run interactive administrative commands.
Generate shell.toml via "weed scaffold -config=shell"
`,
}
@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
if *shellOptions.Masters == "" && *shellInitialFiler == "" {
util.LoadConfiguration("shell", false)
v := util.GetViper()
cluster := v.GetString("cluster.default")
if *shellCluster != "" {
cluster = *shellCluster
}
if cluster == "" {
*shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888"
} else {
*shellOptions.Masters = v.GetString("cluster." + cluster + ".master")
*shellInitialFiler = v.GetString("cluster." + cluster + ".filer")
}
}
fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
if err != nil {

25
weed/command/upload.go

@ -1,8 +1,12 @@
package command
import (
"context"
"encoding/json"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"google.golang.org/grpc"
"os"
"path/filepath"
@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
if err != nil {
fmt.Printf("upload: %v", err)
return false
}
if *upload.replication == "" {
*upload.replication = defaultCollection
}
if len(args) == 0 {
if *upload.dir == "" {
return false
@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool {
}
return true
}
func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) {
err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
}
replication = resp.DefaultReplication
return nil
})
return
}

103
weed/filer/filer.go

@ -134,14 +134,57 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
}
*/
if oldEntry == nil {
dirParts := strings.Split(string(entry.FullPath), "/")
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
return err
}
// fmt.Printf("directory parts: %+v\n", dirParts)
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
if o_excl {
glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
}
glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
var lastDirectoryEntry *Entry
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
for i := 1; i < len(dirParts); i++ {
dirPath := "/" + util.Join(dirParts[:i]...)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
return nil
}
func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
if level == 0 {
return nil
}
dirPath := "/" + util.Join(dirParts[:level]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// check the store directly
@ -151,6 +194,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
// no such existing directory
if dirEntry == nil {
// ensure parent directory
if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
return err
}
// create the directory
now := time.Now()
@ -186,53 +234,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return fmt.Errorf("%s is a file", dirPath)
}
// remember the direct parent directory entry
if i == len(dirParts)-1 {
lastDirectoryEntry = dirEntry
}
}
if lastDirectoryEntry == nil {
glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
return fmt.Errorf("parent folder not found: %v", entry.FullPath)
}
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
}
*/
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
if oldEntry == nil {
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
if o_excl {
glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
}
glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
return nil
}

2
weed/filer/filer_buckets.go

@ -29,7 +29,7 @@ func (f *Filer) LoadBuckets() {
limit := math.MaxInt32
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)

2
weed/filer/filer_delete_entry.go

@ -68,7 +68,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName := ""
includeLastFile := false
for {
entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
if err != nil {
glog.Errorf("list folder %s: %v", entry.FullPath, err)
return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)

4
weed/filer/filer_notify.go

@ -113,13 +113,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "")
if listDayErr != nil {
return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}

8
weed/filer/filer_search.go

@ -19,12 +19,16 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
return "", restPattern
}
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) {
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string, namePattern string) (entries []*Entry, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
prefix, restNamePattern := splitPattern(namePattern)
prefixInNamePattern, restNamePattern := splitPattern(namePattern)
if prefixInNamePattern != "" {
prefix = prefixInNamePattern
}
var missedCount int
var lastFileName string

6
weed/filer/leveldb/leveldb_store.go

@ -170,8 +170,12 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
lastFileStart := directoryPrefix
if startFileName != "" {
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
}
iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {

33
weed/filer/leveldb/leveldb_store_test.go

@ -2,9 +2,11 @@ package leveldb
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
@ -49,14 +51,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
@ -86,3 +88,28 @@ func TestEmptyRoot(t *testing.T) {
}
}
func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
ctx := context.Background()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
entry := &filer.Entry{
FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
Attr: filer.Attr{
Crtime: time.Now(),
Mtime: time.Now(),
Mode: os.FileMode(0644),
},
}
store.InsertEntry(ctx, entry)
}
}

5
weed/filer/leveldb2/leveldb2_store.go

@ -179,7 +179,10 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount)
lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
lastFileStart := directoryPrefix
if startFileName != "" {
lastFileStart, _ = genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
}
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {

6
weed/filer/leveldb2/leveldb2_store_test.go

@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return

41
weed/filer/rocksdb/README.md

@ -0,0 +1,41 @@
# Prepare the compilation environment on linux
- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
- sudo apt-get update -qq
- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq
- export CXX="g++-6" CC="gcc-6"
- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb
- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb
- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb
- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb
# Prepare the compilation environment on mac os
```
brew install snappy
```
# install rocksdb:
```
export ROCKSDB_HOME=/Users/chris/dev/rocksdb
git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME
pushd $ROCKSDB_HOME
make clean
make install-static
popd
```
# install gorocksdb
```
export CGO_CFLAGS="-I$ROCKSDB_HOME/include"
export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
go get github.com/tecbot/gorocksdb
```
# compile with rocksdb
```
cd ~/go/src/github.com/chrislusf/seaweedfs/weed
go install -tags rocksdb
```

293
weed/filer/rocksdb/rocksdb_store.go

@ -0,0 +1,293 @@
// +build rocksdb
package rocksdb
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/tecbot/gorocksdb"
"io"
)
func init() {
filer.Stores = append(filer.Stores, &RocksDBStore{})
}
type RocksDBStore struct {
path string
db *gorocksdb.DB
}
func (store *RocksDBStore) GetName() string {
return "rocksdb"
}
func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
dir := configuration.GetString(prefix + "dir")
return store.initialize(dir)
}
func (store *RocksDBStore) initialize(dir string) (err error) {
glog.Infof("filer store rocksdb dir: %s", dir)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
options := gorocksdb.NewDefaultOptions()
options.SetCreateIfMissing(true)
store.db, err = gorocksdb.OpenDb(options, dir)
return
}
func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.DirAndName()
key := genKey(dir, name)
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Put(wo, key, value)
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
// println("saved", entry.FullPath, "chunks", len(entry.Chunks))
return nil
}
func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
ro := gorocksdb.NewDefaultReadOptions()
data, err := store.db.GetBytes(ro, key)
if data == nil {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
// println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
return entry, nil
}
func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Delete(wo, key)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return nil
}
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
batch := new(gorocksdb.WriteBatch)
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
batch.Delete(key)
return true
})
if err != nil {
return fmt.Errorf("delete list %s : %v", fullpath, err)
}
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Write(wo, batch)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return nil
}
func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
if len(lastKey) == 0 {
iter.Seek(prefix)
} else {
iter.Seek(lastKey)
if !includeLastKey {
k := iter.Key()
v := iter.Value()
key := k.Data()
defer k.Free()
defer v.Free()
if !bytes.HasPrefix(key, prefix) {
return nil
}
if bytes.Equal(key, lastKey) {
iter.Next()
}
}
}
i := 0
for ; iter.Valid(); iter.Next() {
if limit > 0 {
i++
if i > limit {
break
}
}
k := iter.Key()
v := iter.Value()
key := k.Data()
value := v.Data()
if !bytes.HasPrefix(key, prefix) {
k.Free()
v.Free()
break
}
ret := fn(key, value)
k.Free()
v.Free()
if !ret {
break
}
}
if err := iter.Err(); err != nil {
return fmt.Errorf("prefix scan iterator: %v", err)
}
return nil
}
func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
}
func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
lastFileStart := directoryPrefix
if startFileName != "" {
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
}
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
fileName := getNameFromKey(key)
if fileName == "" {
return true
}
limit--
if limit < 0 {
return false
}
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
return false
}
entries = append(entries, entry)
return true
})
if err != nil {
return entries, fmt.Errorf("prefix list %s : %v", fullpath, err)
}
return entries, err
}
func genKey(dirPath, fileName string) (key []byte) {
key = hashToBytes(dirPath)
key = append(key, []byte(fileName)...)
return key
}
func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = hashToBytes(string(fullpath))
if len(startFileName) > 0 {
keyPrefix = append(keyPrefix, []byte(startFileName)...)
}
return keyPrefix
}
func getNameFromKey(key []byte) string {
return string(key[md5.Size:])
}
// hash directory, and use last byte for partitioning
func hashToBytes(dir string) []byte {
h := md5.New()
io.WriteString(h, dir)
b := h.Sum(nil)
return b
}
func (store *RocksDBStore) Shutdown() {
store.db.Close()
}

51
weed/filer/rocksdb/rocksdb_store_kv.go

@ -0,0 +1,51 @@
// +build rocksdb
package rocksdb
import (
"context"
"fmt"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Put(wo, key, value)
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
ro := gorocksdb.NewDefaultReadOptions()
value, err = store.db.GetBytes(ro, key)
if value == nil {
return nil, filer.ErrKvNotFound
}
if err != nil {
return nil, fmt.Errorf("kv get: %v", err)
}
return
}
func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Delete(wo, key)
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}

117
weed/filer/rocksdb/rocksdb_store_test.go

@ -0,0 +1,117 @@
// +build rocksdb
package rocksdb
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
entry1 := &filer.Entry{
FullPath: fullpath,
Attr: filer.Attr{
Mode: 0440,
Uid: 1234,
Gid: 5678,
},
}
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
entry, err := testFiler.FindEntry(ctx, fullpath)
if err != nil {
t.Errorf("find entry: %v", err)
return
}
if entry.FullPath != entry1.FullPath {
t.Errorf("find wrong entry: %v", entry.FullPath)
return
}
// checking one upper directory
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
}
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
ctx := context.Background()
// checking one upper directory
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
}
if len(entries) != 0 {
t.Errorf("list entries count: %v", len(entries))
return
}
}
func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
defer os.RemoveAll(dir)
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
ctx := context.Background()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
entry := &filer.Entry{
FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
Attr: filer.Attr{
Crtime: time.Now(),
Mtime: time.Now(),
Mode: os.FileMode(0644),
},
}
store.InsertEntry(ctx, entry)
}
}

3
weed/operation/submit.go

@ -32,7 +32,7 @@ type FilePart struct {
type SubmitResult struct {
FileName string `json:"fileName,omitempty"`
FileUrl string `json:"fileUrl,omitempty"`
FileUrl string `json:"url,omitempty"`
Fid string `json:"fid,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
@ -69,6 +69,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
file.Ttl = ttl
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()

31
weed/s3api/auth_credentials.go

@ -156,7 +156,36 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
// check whether the request has valid access keys
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) {
identity, s3Err := iam.authUser(r)
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
case authTypeSigned, authTypePresigned:
glog.V(3).Infof("v4 auth type")
identity, s3Err = iam.reqSignatureV4Verify(r)
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
identity, found = iam.lookupAnonymous()
if !found {
return identity, s3err.ErrAccessDenied
}
default:
return identity, s3err.ErrNotImplemented
}
if s3Err != s3err.ErrNone {
return identity, s3Err
}

4
weed/server/filer_grpc_server.go

@ -61,7 +61,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix, "")
if err != nil {
return err
@ -326,7 +326,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil {
if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
}
return resp, nil

2
weed/server/filer_grpc_server_rename.go

@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "")
if err != nil {
return err
}

2
weed/server/filer_server_handlers_read_dir.go

@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
namePattern := r.FormValue("namePattern")
entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern)
entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "", namePattern)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)

5
weed/server/filer_server_handlers_write_autochunk.go

@ -111,7 +111,10 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
contentType := ""
contentType := r.Header.Get("Content-Type")
if contentType == "application/octet-stream" {
contentType = ""
}
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
if err != nil {

7
weed/server/filer_server_rocksdb.go

@ -0,0 +1,7 @@
// +build rocksdb
package weed_server
import (
_ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb"
)

2
weed/storage/needle/needle_parse_upload.go

@ -193,9 +193,9 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
mtype = contentType
}
}
pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
// pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
}
return
}

5
weed/topology/data_node.go

@ -151,7 +151,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro
}
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
rack := dn.Parent()
dcNode := rack.Parent()
dcValue := dcNode.GetValue()
return dcValue.(*DataCenter)
}
func (dn *DataNode) GetRack() *Rack {

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 17)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 19)
COMMIT = ""
)

Loading…
Cancel
Save