Browse Source

Merge branch 'master' into master

pull/1742/head
Yoni Nakache 4 years ago
committed by GitHub
parent
commit
d1959de038
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      README.md
  2. 2
      docker/local-dev-compose.yml
  3. 10
      go.mod
  4. 54
      go.sum
  5. 3
      weed/command/command.go
  6. 6
      weed/command/filer.go
  7. 118
      weed/command/filer_cat.go
  8. 2
      weed/command/filer_copy.go
  9. 201
      weed/command/filer_meta_tail.go
  10. 3
      weed/command/filer_replication.go
  11. 2
      weed/command/mount.go
  12. 10
      weed/command/scaffold.go
  13. 9
      weed/command/server.go
  14. 113
      weed/command/watch.go
  15. 6
      weed/filer/configuration.go
  16. 2
      weed/filer/etcd/etcd_store.go
  17. 7
      weed/filer/filechunk_manifest.go
  18. 9
      weed/filer/filechunks.go
  19. 2
      weed/filer/filer_notify.go
  20. 3
      weed/filer/hbase/hbase_store_kv.go
  21. 2
      weed/filer/leveldb/leveldb_store.go
  22. 2
      weed/filer/leveldb2/leveldb2_store.go
  23. 361
      weed/filer/leveldb3/leveldb3_store.go
  24. 46
      weed/filer/leveldb3/leveldb3_store_kv.go
  25. 88
      weed/filer/leveldb3/leveldb3_store_test.go
  26. 6
      weed/filer/reader_at.go
  27. 6
      weed/filer/redis/redis_cluster_store.go
  28. 2
      weed/filer/redis/redis_store.go
  29. 22
      weed/filer/redis/universal_redis_store.go
  30. 8
      weed/filer/redis/universal_redis_store_kv.go
  31. 6
      weed/filer/redis2/redis_cluster_store.go
  32. 2
      weed/filer/redis2/redis_store.go
  33. 26
      weed/filer/redis2/universal_redis_store.go
  34. 8
      weed/filer/redis2/universal_redis_store_kv.go
  35. 99
      weed/filer/rocksdb/rocksdb_store.go
  36. 10
      weed/filer/rocksdb/rocksdb_store_kv.go
  37. 40
      weed/filer/rocksdb/rocksdb_ttl.go
  38. 8
      weed/filer/stream.go
  39. 26
      weed/messaging/broker/broker_grpc_server_subscribe.go
  40. 5
      weed/notification/configuration.go
  41. 3
      weed/replication/sink/filersink/filer_sink.go
  42. 2
      weed/s3api/s3api_bucket_handlers.go
  43. 2
      weed/s3api/s3api_objects_list_handlers.go
  44. 7
      weed/security/tls.go
  45. 52
      weed/server/filer_grpc_server_sub_meta.go
  46. 3
      weed/server/filer_server.go
  47. 10
      weed/server/filer_server_handlers_read.go
  48. 14
      weed/server/filer_server_handlers_write_autochunk.go
  49. 4
      weed/storage/backend/backend.go
  50. 2
      weed/util/chunk_cache/chunk_cache_in_memory.go
  51. 18
      weed/util/config.go
  52. 2
      weed/util/constants.go
  53. 24
      weed/util/log_buffer/log_buffer.go
  54. 8
      weed/util/log_buffer/log_read.go
  55. 10
      weed/wdclient/vid_map.go

4
README.md

@ -67,7 +67,7 @@ Table of Contents
* Download the latest binary from https://github.com/chrislusf/seaweedfs/releases and unzip a single binary file `weed` or `weed.exe`
* Run `weed server -dir=/some/data/dir -s3` to start one master, one volume server, one filer, and one S3 gateway.
Also, to increase capacity, just add more volume servers by `weed volume -dir="/some/data/dir2" -mserver="<master_host>:9333" -port=8081` locally or a different machine. That is it!
Also, to increase capacity, just add more volume servers by `weed volume -dir="/some/data/dir2" -mserver="<master_host>:9333" -port=8081` locally, or a different machine, or thoudsands of machines. That is it!
## Introduction ##
@ -84,7 +84,7 @@ There is only 40 bytes of disk storage overhead for each file's metadata. It is
SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf). Also, SeaweedFS implements erasure coding with ideas from [f4: Facebook’s Warm BLOB Storage System](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-muralidhar.pdf)
On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc.
On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, MemSql, TiDB, Etcd, CockroachDB, etc.
For any distributed key value stores, the large values can be offloaded to SeaweedFS. With the fast access speed and linearly scalable capacity, SeaweedFS can work as a distributed [Key-Large-Value store][KeyLargeValueStore].

2
docker/local-dev-compose.yml

@ -39,7 +39,7 @@ services:
cap_add:
- SYS_ADMIN
mem_limit: 4096m
command: '-v=4 mount -filer="filer:8888" -dirAutoCreate -dir=/mnt/seaweedfs -cacheCapacityMB=0 '
command: '-v=4 mount -filer="filer:8888" -dirAutoCreate -dir=/mnt/seaweedfs -cacheCapacityMB=100 -concurrentWriters=128'
depends_on:
- master
- volume

10
go.mod

@ -8,7 +8,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/OneOfOne/xxhash v1.2.2
github.com/Shopify/sarama v1.23.1
github.com/aws/aws-sdk-go v1.33.5
github.com/aws/aws-sdk-go v1.34.30
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v1.0.4
@ -25,7 +25,7 @@ require (
github.com/fclairamb/ftpserverlib v0.8.0
github.com/frankban/quicktest v1.7.2 // indirect
github.com/go-errors/errors v1.1.1 // indirect
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-redis/redis/v8 v8.4.4
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect
@ -39,7 +39,7 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/json-iterator/go v1.1.10
github.com/karlseguin/ccache v2.0.3+incompatible
github.com/karlseguin/expect v1.0.1 // indirect
github.com/karlseguin/ccache/v2 v2.0.7
github.com/klauspost/compress v1.10.9 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/klauspost/crc32 v1.2.0
@ -51,8 +51,6 @@ require (
github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olivere/elastic/v7 v7.0.19
github.com/onsi/ginkgo v1.10.1 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/peterh/liner v1.1.0
github.com/pierrec/lz4 v2.2.7+incompatible // indirect
github.com/prometheus/client_golang v1.3.0
@ -83,7 +81,7 @@ require (
gocloud.dev/pubsub/natspubsub v0.16.0
gocloud.dev/pubsub/rabbitpubsub v0.16.0
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114

54
go.sum

@ -61,6 +61,8 @@ github.com/aws/aws-sdk-go v1.19.45/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U=
github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.30 h1:izATc/E0+HcT5YHmaQVjn7GHCoqaBxn0PGo6Zq5UNFA=
github.com/aws/aws-sdk-go v1.34.30/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
@ -84,8 +86,6 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chrislusf/raft v1.0.3 h1:11YrnzJtVa5z7m9lhY2p8VcPHoUlC1UswyoAo+U1m1k=
github.com/chrislusf/raft v1.0.3/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.4 h1:THhbsVik2hxdE0/VXX834f64Wn9RzgVPp+E+XCWZdKM=
github.com/chrislusf/raft v1.0.4/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
@ -116,6 +116,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
@ -159,6 +161,8 @@ github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3B
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg=
@ -172,8 +176,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc=
github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -259,6 +263,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-replayers/grpcreplay v0.1.0 h1:eNb1y9rZFmY4ax45uEEECSa8fsxGRU+8Bil52ASAwic=
github.com/google/go-replayers/grpcreplay v0.1.0/go.mod h1:8Ig2Idjpr6gifRd6pNVggX6TC1Zw6Jx74AKp7QNH2QE=
github.com/google/go-replayers/httpreplay v0.1.0 h1:AX7FUb4BjrrzNvblr/OlgwrmFiep6soj5K2QSDW7BGk=
@ -345,6 +351,10 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
@ -359,8 +369,11 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/karlseguin/ccache v2.0.3+incompatible h1:j68C9tWOROiOLWTS/kCGg9IcJG+ACqn5+0+t8Oh83UU=
github.com/karlseguin/ccache v2.0.3+incompatible/go.mod h1:CM9tNPzT6EdRh14+jiW8mEF9mkNZuuE51qmgGYUB93w=
github.com/karlseguin/ccache/v2 v2.0.7 h1:y5Pfi4eiyYCOD6LS/Kj+o6Nb4M5Ngpw9qFQs+v44ZYM=
github.com/karlseguin/ccache/v2 v2.0.7/go.mod h1:2BDThcfQMf/c0jnZowt16eW405XIqZPavt+HoYEtcxQ=
github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY=
github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@ -459,6 +472,8 @@ github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
@ -468,12 +483,15 @@ github.com/olivere/elastic/v7 v7.0.19/go.mod h1:4Jqt5xvjqpjCqgnTcHwl3j8TLs8mvoOK
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U=
github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
@ -676,6 +694,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw=
go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@ -715,6 +735,8 @@ golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc h1:c0o/qxkaO2LF5t6fQrT4b5
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067 h1:KYGJGHOQy8oSi1fDlSpcZF0+juKwk/hEMv5SiwHogR0=
@ -754,6 +776,9 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -792,9 +817,15 @@ golang.org/x/sys v0.0.0-20190620070143-6f217b454f45/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd h1:WgqgiQvkiZWz7XLhphjt2GI2GcGCTIZs9jqXMWmH+oc=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -802,6 +833,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
@ -934,6 +967,11 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

3
weed/command/command.go

@ -15,6 +15,8 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
cmdFilerCat,
cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
@ -25,7 +27,6 @@ var Commands = []*Command{
cmdScaffold,
cmdServer,
cmdShell,
cmdWatch,
cmdUpload,
cmdVersion,
cmdVolume,

6
weed/command/filer.go

@ -42,7 +42,7 @@ type FilerOptions struct {
cipher *bool
peers *string
metricsHttpPort *int
cacheToFilerLimit *int
saveToFilerLimit *int
defaultLevelDbDirectory *string
}
@ -64,7 +64,7 @@ func init() {
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer
@ -149,7 +149,7 @@ func (fo *FilerOptions) startFiler() {
Host: *fo.ip,
Port: uint32(*fo.port),
Cipher: *fo.cipher,
CacheToFilerLimit: int64(*fo.cacheToFilerLimit),
SaveToFilerLimit: *fo.saveToFilerLimit,
Filers: peers,
})
if nfs_err != nil {

118
weed/command/filer_cat.go

@ -0,0 +1,118 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"math"
"net/url"
"os"
"strings"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
filerCat FilerCatOptions
)
type FilerCatOptions struct {
grpcDialOption grpc.DialOption
filerAddress string
filerClient filer_pb.SeaweedFilerClient
output *string
}
func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
return func(fileId string) (targetUrls []string, err error) {
vid := filer.VolumeId(fileId)
resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return nil, err
}
locations := resp.LocationsMap[vid]
for _, loc := range locations.Locations {
targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
}
return
}
}
func init() {
cmdFilerCat.Run = runFilerCat // break init cycle
filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout")
}
var cmdFilerCat = &Command{
UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file",
Short: "copy one file to local",
Long: `read one file to stdout or write to a file
`,
}
func runFilerCat(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
if len(args) == 0 {
return false
}
filerSource := args[len(args)-1]
filerUrl, err := url.Parse(filerSource)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
urlPath := filerUrl.Path
if strings.HasSuffix(urlPath, "/") {
fmt.Printf("The last argument should be a file: %v\n", err)
return false
}
filerCat.filerAddress = filerUrl.Host
filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
dir, name := util.FullPath(urlPath).DirAndName()
writer := os.Stdout
if *filerCat.output != "" {
fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output)
f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
fmt.Printf("open file %s: %v\n", *filerCat.output, err)
return false
}
defer f.Close()
writer = f
}
pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
respLookupEntry, err := filer_pb.LookupEntry(client, request)
if err != nil {
return err
}
filerCat.filerClient = client
return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
})
return true
}

2
weed/command/filer_copy.go

@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool {
}
urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}

201
weed/command/filer_meta_tail.go

@ -0,0 +1,201 @@
package command
import (
"context"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
"io"
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
}
var cmdFilerMetaTail = &Command{
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
`,
}
var (
tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
)
func runFilerMetaTail(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var filterFunc func(dir, fname string) bool
if *tailPattern != "" {
if strings.Contains(*tailPattern, "/") {
println("watch path pattern", *tailPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
} else {
println("watch file pattern", *tailPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*tailPattern, fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
}
}
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
if filterFunc == nil {
return true
}
if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
return false
}
if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
return true
}
if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
return true
}
return false
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
return nil
}
if *esServers != "" {
var err error
eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
if err != nil {
fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
return false
}
}
tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "tail",
PathPrefix: *tailTarget,
SinceNs: time.Now().Add(-*tailStart).UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if !shouldPrint(resp) {
continue
}
if err = eachEntryFunc(resp); err != nil {
return err
}
}
})
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
return true
}
type EsDocument struct {
Dir string `json:"dir,omitempty"`
Name string `json:"name,omitempty"`
IsDirectory bool `json:"isDir,omitempty"`
Size uint64 `json:"size,omitempty"`
Uid uint32 `json:"uid,omitempty"`
Gid uint32 `json:"gid,omitempty"`
UserName string `json:"userName,omitempty"`
Collection string `json:"collection,omitempty"`
Crtime int64 `json:"crtime,omitempty"`
Mtime int64 `json:"mtime,omitempty"`
Mime string `json:"mime,omitempty"`
}
func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
entry := event.NewEntry
dir, name := event.NewParentPath, entry.Name
id := util.Md5String([]byte(util.NewFullPath(dir, name)))
esEntry := &EsDocument{
Dir: dir,
Name: name,
IsDirectory: entry.IsDirectory,
Size: entry.Attributes.FileSize,
Uid: entry.Attributes.Uid,
Gid: entry.Attributes.Gid,
UserName: entry.Attributes.UserName,
Collection: entry.Attributes.Collection,
Crtime: entry.Attributes.Crtime,
Mtime: entry.Attributes.Mtime,
Mime: entry.Attributes.Mime,
}
return esEntry, id
}
func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
options := []elastic.ClientOptionFunc{}
options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
options = append(options, elastic.SetSniff(false))
options = append(options, elastic.SetHealthcheck(false))
client, err := elastic.NewClient(options...)
if err != nil {
return nil, err
}
return func(resp *filer_pb.SubscribeMetadataResponse) error {
event := resp.EventNotification
if event.OldEntry != nil &&
(event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
// delete or not update the same file
dir, name := resp.Directory, event.OldEntry.Name
id := util.Md5String([]byte(util.NewFullPath(dir, name)))
println("delete", id)
_, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
return err
}
if event.NewEntry != nil {
// add a new file or update the same file
esEntry, id := toEsEntry(event)
value, err := jsoniter.Marshal(esEntry)
if err != nil {
return err
}
println(string(value))
_, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
return err
}
return nil
}, nil
}

3
weed/command/filer_replication.go

@ -14,7 +14,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
)
func init() {
@ -123,7 +122,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
func validateOneEnabledInput(config *viper.Viper) {
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {

2
weed/command/mount.go

@ -43,7 +43,7 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")

10
weed/command/scaffold.go

@ -89,6 +89,12 @@ buckets_folder = "/buckets"
enabled = true
dir = "./filerldb2" # directory to store level db files
[leveldb3]
# similar to leveldb2.
# each bucket has its own meta store.
enabled = false
dir = "./filerldb3" # directory to store level db files
[rocksdb]
# local on disk, similar to leveldb
# since it is using a C wrapper, you need to install rocksdb and build it by yourself
@ -174,9 +180,9 @@ addresses = [
]
password = ""
# allows reads from slave servers or the master, but all writes still go to the master
readOnly = true
readOnly = false
# automatically use the closest Redis server for reads
routeByLatency = true
routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []

9
weed/command/server.go

@ -61,6 +61,7 @@ var (
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server")
isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
@ -94,7 +95,7 @@ func init() {
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@ -224,7 +225,11 @@ func runServer(cmd *Command, args []string) bool {
}
startMaster(masterOptions, serverWhiteList)
if *isStartingMasterServer {
go startMaster(masterOptions, serverWhiteList)
}
select {}
return true
}

113
weed/command/watch.go

@ -1,113 +0,0 @@
package command
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
cmdWatch.Run = runWatch // break init cycle
}
var cmdWatch = &Command{
UsageLine: "watch [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
`,
}
var (
watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
)
func runWatch(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var filterFunc func(dir, fname string) bool
if *watchPattern != "" {
if strings.Contains(*watchPattern, "/") {
println("watch path pattern", *watchPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
} else {
println("watch file pattern", *watchPattern)
filterFunc = func(dir, fname string) bool {
matched, err := filepath.Match(*watchPattern, fname)
if err != nil {
fmt.Printf("error: %v", err)
}
return matched
}
}
}
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
if filterFunc == nil {
return true
}
if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
return false
}
if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
return true
}
if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
return true
}
return false
}
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "watch",
PathPrefix: *watchTarget,
SinceNs: time.Now().Add(-*watchStart).UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if !shouldPrint(resp) {
continue
}
fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
}
})
if watchErr != nil {
fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
}
return true
}

6
weed/filer/configuration.go

@ -2,7 +2,7 @@ package filer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/util"
"os"
"reflect"
"strings"
@ -12,7 +12,7 @@ var (
Stores []FilerStore
)
func (f *Filer) LoadConfiguration(config *viper.Viper) {
func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
validateOneEnabledStore(config)
@ -79,7 +79,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
}
func validateOneEnabledStore(config *viper.Viper) {
func validateOneEnabledStore(config *util.ViperProxy) {
enabledStore := ""
for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") {

2
weed/filer/etcd/etcd_store.go

@ -101,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
resp, err := store.client.Get(ctx, string(key))
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
if len(resp.Kvs) == 0 {

7
weed/filer/filechunk_manifest.go

@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"time"
@ -38,7 +39,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
@ -63,7 +64,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return
}
func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
@ -84,7 +85,7 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
}
// TODO fetch from cache for weed mount?
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)

9
weed/filer/filechunks.go

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"sort"
"sync"
@ -52,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
}
func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@ -71,7 +72,7 @@ func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_
return
}
func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
if aErr != nil {
@ -116,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@ -222,7 +223,7 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)

2
weed/filer/filer_notify.go

@ -170,7 +170,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func
return lastTsNs, err
}
if logEntry.TsNs <= ns {
return lastTsNs, nil
continue
}
// println("each log: ", logEntry.TsNs)
if err := eachLogEntryFn(logEntry); err != nil {

3
weed/filer/hbase/hbase_store_kv.go

@ -7,9 +7,10 @@ import (
"time"
)
const(
const (
COLUMN_NAME = "a"
)
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return store.doPut(ctx, store.cfKv, key, value, 0)
}

2
weed/filer/leveldb/leveldb_store.go

@ -107,7 +107,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{

2
weed/filer/leveldb2/leveldb2_store.go

@ -115,7 +115,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{

361
weed/filer/leveldb3/leveldb3_store.go

@ -0,0 +1,361 @@
package leveldb
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"github.com/syndtr/goleveldb/leveldb"
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"io"
"os"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
const (
DEFAULT = "_main"
)
func init() {
filer.Stores = append(filer.Stores, &LevelDB3Store{})
}
type LevelDB3Store struct {
dir string
dbs map[string]*leveldb.DB
dbsLock sync.RWMutex
}
func (store *LevelDB3Store) GetName() string {
return "leveldb3"
}
func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
dir := configuration.GetString(prefix + "dir")
return store.initialize(dir)
}
func (store *LevelDB3Store) initialize(dir string) (err error) {
glog.Infof("filer store leveldb3 dir: %s", dir)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
store.dir = dir
db, loadDbErr := store.loadDB(DEFAULT)
if loadDbErr != nil {
return loadDbErr
}
store.dbs = make(map[string]*leveldb.DB)
store.dbs[DEFAULT] = db
return
}
func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
opts := &opt.Options{
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
}
if name != DEFAULT {
opts = &opt.Options{
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
}
}
dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
os.MkdirAll(dbFolder, 0755)
db, dbErr := leveldb.OpenFile(dbFolder, opts)
if leveldb_errors.IsCorrupted(dbErr) {
db, dbErr = leveldb.RecoverFile(dbFolder, opts)
}
if dbErr != nil {
glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
return nil, dbErr
}
return db, nil
}
func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
store.dbsLock.RLock()
defaultDB := store.dbs[DEFAULT]
if !strings.HasPrefix(string(fullpath), "/buckets/") {
store.dbsLock.RUnlock()
return defaultDB, DEFAULT, fullpath, nil
}
// detect bucket
bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
t := strings.Index(bucketAndObjectKey, "/")
if t < 0 && !isForChildren {
store.dbsLock.RUnlock()
return defaultDB, DEFAULT, fullpath, nil
}
bucket := bucketAndObjectKey
shortPath := weed_util.FullPath("/")
if t > 0 {
bucket = bucketAndObjectKey[:t]
shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
}
if db, found := store.dbs[bucket]; found {
store.dbsLock.RUnlock()
return db, bucket, shortPath, nil
}
store.dbsLock.RUnlock()
// upgrade to write lock
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
// double check after getting the write lock
if db, found := store.dbs[bucket]; found {
return db, bucket, shortPath, nil
}
// create db
db, err := store.loadDB(bucket)
if err != nil {
return nil, bucket, shortPath, err
}
store.dbs[bucket] = db
return db, bucket, shortPath, nil
}
func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
db, _, shortPath, err := store.findDB(entry.FullPath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
}
dir, name := shortPath.DirAndName()
key := genKey(dir, name)
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > 50 {
value = weed_util.MaybeGzipData(value)
}
err = db.Put(key, value, nil)
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
// println("saved", entry.FullPath, "chunks", len(entry.Chunks))
return nil
}
func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
db, _, shortPath, err := store.findDB(fullpath, false)
if err != nil {
return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
}
dir, name := shortPath.DirAndName()
key := genKey(dir, name)
data, err := db.Get(key, nil)
if err == leveldb.ErrNotFound {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
// println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
return entry, nil
}
func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
db, _, shortPath, err := store.findDB(fullpath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
dir, name := shortPath.DirAndName()
key := genKey(dir, name)
err = db.Delete(key, nil)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return nil
}
func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
db, bucket, shortPath, err := store.findDB(fullpath, true)
if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
if bucket != DEFAULT && shortPath == "/" {
db.Close()
if bucket != "" { // just to make sure
os.RemoveAll(store.dir + "/" + bucket)
}
return nil
}
directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
batch := new(leveldb.Batch)
iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {
break
}
fileName := getNameFromKey(key)
if fileName == "" {
continue
}
batch.Delete(append(directoryPrefix, []byte(fileName)...))
}
iter.Release()
err = db.Write(batch, nil)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
return nil
}
func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
}
func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
db, _, shortPath, err := store.findDB(fullpath, true)
if err != nil {
return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
}
directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
lastFileStart := directoryPrefix
if startFileName != "" {
lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
}
iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {
break
}
fileName := getNameFromKey(key)
if fileName == "" {
continue
}
if fileName == startFileName && !inclusive {
continue
}
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
entries = append(entries, entry)
}
iter.Release()
return entries, err
}
func genKey(dirPath, fileName string) (key []byte) {
key = hashToBytes(dirPath)
key = append(key, []byte(fileName)...)
return key
}
func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = hashToBytes(string(fullpath))
if len(startFileName) > 0 {
keyPrefix = append(keyPrefix, []byte(startFileName)...)
}
return keyPrefix
}
func getNameFromKey(key []byte) string {
return string(key[md5.Size:])
}
// hash directory
func hashToBytes(dir string) []byte {
h := md5.New()
io.WriteString(h, dir)
b := h.Sum(nil)
return b
}
func (store *LevelDB3Store) Shutdown() {
for _, db := range store.dbs {
db.Close()
}
}

46
weed/filer/leveldb3/leveldb3_store_kv.go

@ -0,0 +1,46 @@
package leveldb
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/syndtr/goleveldb/leveldb"
)
func (store *LevelDB3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
err = store.dbs[DEFAULT].Put(key, value, nil)
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *LevelDB3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
value, err = store.dbs[DEFAULT].Get(key, nil)
if err == leveldb.ErrNotFound {
return nil, filer.ErrKvNotFound
}
if err != nil {
return nil, fmt.Errorf("kv get: %v", err)
}
return
}
func (store *LevelDB3Store) KvDelete(ctx context.Context, key []byte) (err error) {
err = store.dbs[DEFAULT].Delete(key, nil)
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}

88
weed/filer/leveldb3/leveldb3_store_test.go

@ -0,0 +1,88 @@
package leveldb
import (
"context"
"io/ioutil"
"os"
"testing"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
entry1 := &filer.Entry{
FullPath: fullpath,
Attr: filer.Attr{
Mode: 0440,
Uid: 1234,
Gid: 5678,
},
}
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
entry, err := testFiler.FindEntry(ctx, fullpath)
if err != nil {
t.Errorf("find entry: %v", err)
return
}
if entry.FullPath != entry1.FullPath {
t.Errorf("find wrong entry: %v", entry.FullPath)
return
}
// checking one upper directory
entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
}
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
ctx := context.Background()
// checking one upper directory
entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
}
if len(entries) != 0 {
t.Errorf("list entries count: %v", len(entries))
return
}
}

6
weed/filer/reader_at.go

@ -18,7 +18,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
lookupFileId LookupFileIdFunctionType
lookupFileId wdclient.LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
@ -31,9 +31,7 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex

6
weed/filer/redis/redis_cluster_store.go

@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func init() {
@ -20,8 +20,8 @@ func (store *RedisClusterStore) GetName() string {
func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
configuration.SetDefault(prefix+"useReadOnly", true)
configuration.SetDefault(prefix+"routeByLatency", true)
configuration.SetDefault(prefix+"useReadOnly", false)
configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),

2
weed/filer/redis/redis_store.go

@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func init() {

22
weed/filer/redis/universal_redis_store.go

@ -7,7 +7,7 @@ import (
"strings"
"time"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -44,7 +44,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
value = util.MaybeGzipData(value)
}
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
_, err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@ -52,7 +52,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
dir, name := entry.FullPath.DirAndName()
if name != "" {
_, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
_, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
@ -68,7 +68,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@ -90,7 +90,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
_, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@ -98,7 +98,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
dir, name := fullpath.DirAndName()
if name != "" {
_, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
_, err = store.Client.SRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@ -109,14 +109,14 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete folder %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
_, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@ -133,7 +133,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
limit int) (entries []*filer.Entry, err error) {
dirListKey := genDirectoryListKey(string(fullpath))
members, err := store.Client.SMembers(dirListKey).Result()
members, err := store.Client.SMembers(ctx, dirListKey).Result()
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
@ -174,8 +174,8 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(string(path)).Result()
store.Client.SRem(dirListKey, fileName).Result()
store.Client.Del(ctx, string(path)).Result()
store.Client.SRem(ctx, dirListKey, fileName).Result()
continue
}
}

8
weed/filer/redis/universal_redis_store_kv.go

@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(string(key), value, 0).Result()
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@ -21,7 +21,7 @@ func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value [
func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(string(key)).Result()
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@ -32,7 +32,7 @@ func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedisStore) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(string(key)).Result()
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)

6
weed/filer/redis2/redis_cluster_store.go

@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func init() {
@ -20,8 +20,8 @@ func (store *RedisCluster2Store) GetName() string {
func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
configuration.SetDefault(prefix+"useReadOnly", true)
configuration.SetDefault(prefix+"routeByLatency", true)
configuration.SetDefault(prefix+"useReadOnly", false)
configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),

2
weed/filer/redis2/redis_store.go

@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func init() {

26
weed/filer/redis2/universal_redis_store.go

@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -56,7 +56,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@ -66,7 +66,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
@ -81,7 +81,7 @@ func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@ -103,12 +103,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(genDirectoryListKey(string(fullpath))).Result()
_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
_, err = store.Client.Del(string(fullpath)).Result()
_, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@ -118,7 +118,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
return nil
}
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
_, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
@ -133,14 +133,14 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
members, err := store.Client.ZRange(ctx, genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
_, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
@ -159,12 +159,12 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, ful
dirListKey := genDirectoryListKey(string(fullpath))
start := int64(0)
if startFileName != "" {
start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result()
if !inclusive {
start++
}
}
members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
@ -178,8 +178,8 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, ful
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(string(path)).Result()
store.Client.ZRem(dirListKey, fileName).Result()
store.Client.Del(ctx, string(path)).Result()
store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
}
}

8
weed/filer/redis2/universal_redis_store_kv.go

@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
)
func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(string(key), value, 0).Result()
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@ -21,7 +21,7 @@ func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value
func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(string(key)).Result()
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@ -32,7 +32,7 @@ func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedis2Store) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(string(key)).Result()
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)

99
weed/filer/rocksdb/rocksdb_store.go

@ -7,21 +7,42 @@ import (
"context"
"crypto/md5"
"fmt"
"io"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/tecbot/gorocksdb"
"io"
)
func init() {
filer.Stores = append(filer.Stores, &RocksDBStore{})
}
type options struct {
opt *gorocksdb.Options
ro *gorocksdb.ReadOptions
wo *gorocksdb.WriteOptions
}
func (opt *options) init() {
opt.opt = gorocksdb.NewDefaultOptions()
opt.ro = gorocksdb.NewDefaultReadOptions()
opt.wo = gorocksdb.NewDefaultWriteOptions()
}
func (opt *options) close() {
opt.opt.Destroy()
opt.ro.Destroy()
opt.wo.Destroy()
}
type RocksDBStore struct {
path string
db *gorocksdb.DB
options
}
func (store *RocksDBStore) GetName() string {
@ -38,10 +59,15 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
store.options.init()
store.opt.SetCreateIfMissing(true)
// reduce write amplification
// also avoid expired data stored in highest level never get compacted
store.opt.SetLevelCompactionDynamicLevelBytes(true)
store.opt.SetCompactionFilter(NewTTLFilter())
// store.opt.SetMaxBackgroundCompactions(2)
options := gorocksdb.NewDefaultOptions()
options.SetCreateIfMissing(true)
store.db, err = gorocksdb.OpenDb(options, dir)
store.db, err = gorocksdb.OpenDb(store.opt, dir)
return
}
@ -65,8 +91,7 @@ func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Put(wo, key, value)
err = store.db.Put(store.wo, key, value)
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@ -85,21 +110,21 @@ func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
ro := gorocksdb.NewDefaultReadOptions()
data, err := store.db.GetBytes(ro, key)
data, err := store.db.Get(store.ro, key)
if data == nil {
return nil, filer_pb.ErrNotFound
}
defer data.Free()
if err != nil {
return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
err = entry.DecodeAttributesAndChunks(data.Data())
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@ -113,8 +138,7 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Delete(wo, key)
err = store.db.Delete(store.wo, key)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@ -125,10 +149,13 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
batch := new(gorocksdb.WriteBatch)
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy()
ro.SetFillCache(false)
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
@ -139,8 +166,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return fmt.Errorf("delete list %s : %v", fullpath, err)
}
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Write(wo, batch)
err = store.db.Write(store.wo, batch)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@ -155,22 +181,12 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
iter.Seek(prefix)
} else {
iter.Seek(lastKey)
if !includeLastKey {
k := iter.Key()
v := iter.Value()
key := k.Data()
defer k.Free()
defer v.Free()
if !bytes.HasPrefix(key, prefix) {
return nil
}
if bytes.Equal(key, lastKey) {
iter.Next()
if iter.Valid() {
if bytes.Equal(iter.Key().Data(), lastKey) {
iter.Next()
}
}
}
}
@ -184,21 +200,13 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
}
}
k := iter.Key()
v := iter.Value()
key := k.Data()
value := v.Data()
key := iter.Key().Data()
if !bytes.HasPrefix(key, prefix) {
k.Free()
v.Free()
break
}
ret := fn(key, value)
k.Free()
v.Free()
ret := fn(key, iter.Value().Data())
if !ret {
break
@ -226,7 +234,9 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
}
ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy()
ro.SetFillCache(false)
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
@ -234,16 +244,12 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
if fileName == "" {
return true
}
limit--
if limit < 0 {
return false
}
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
return false
@ -290,4 +296,5 @@ func hashToBytes(dir string) []byte {
func (store *RocksDBStore) Shutdown() {
store.db.Close()
store.options.close()
}

10
weed/filer/rocksdb/rocksdb_store_kv.go

@ -5,15 +5,13 @@ package rocksdb
import (
"context"
"fmt"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Put(wo, key, value)
err = store.db.Put(store.wo, key, value)
if err != nil {
return fmt.Errorf("kv put: %v", err)
@ -24,8 +22,7 @@ func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte)
func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
ro := gorocksdb.NewDefaultReadOptions()
value, err = store.db.GetBytes(ro, key)
value, err = store.db.GetBytes(store.ro, key)
if value == nil {
return nil, filer.ErrKvNotFound
@ -40,8 +37,7 @@ func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte,
func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
wo := gorocksdb.NewDefaultWriteOptions()
err = store.db.Delete(wo, key)
err = store.db.Delete(store.wo, key)
if err != nil {
return fmt.Errorf("kv delete: %v", err)

40
weed/filer/rocksdb/rocksdb_ttl.go

@ -0,0 +1,40 @@
//+build rocksdb
package rocksdb
import (
"time"
"github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
type TTLFilter struct {
skipLevel0 bool
}
func NewTTLFilter() gorocksdb.CompactionFilter {
return &TTLFilter{
skipLevel0: true,
}
}
func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
// decode could be slow, causing write stall
// level >0 sst can run compaction in parallel
if !t.skipLevel0 || level > 0 {
entry := filer.Entry{}
if err := entry.DecodeAttributesAndChunks(val); err == nil {
if entry.TtlSec > 0 &&
entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return true, nil
}
}
}
return false, val
}
func (t *TTLFilter) Name() string {
return "TTLFilter"
}

8
weed/filer/stream.go

@ -13,16 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@ -86,7 +86,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
lookupFileId LookupFileIdFunctionType
lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})

26
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -101,20 +101,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return nil
}
if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
if err != io.EOF {
// println("stopping from persisted logs", err.Error())
return err
}
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
for {
if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
if err != io.EOF {
// println("stopping from persisted logs", err.Error())
return err
}
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@ -122,6 +123,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return isConnected
}, eachLogEntryFn)
if err != nil {
if err == log_buffer.ResumeFromDiskError {
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {

5
weed/notification/configuration.go

@ -4,7 +4,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"github.com/spf13/viper"
)
type MessageQueue interface {
@ -21,7 +20,7 @@ var (
Queue MessageQueue
)
func LoadConfiguration(config *viper.Viper, prefix string) {
func LoadConfiguration(config *util.ViperProxy, prefix string) {
if config == nil {
return
@ -43,7 +42,7 @@ func LoadConfiguration(config *viper.Viper, prefix string) {
}
func validateOneEnabledQueue(config *viper.Viper) {
func validateOneEnabledQueue(config *util.ViperProxy) {
enabledQueue := ""
for _, queue := range MessageQueues {
if config.GetBool(queue.GetName() + ".enabled") {

3
weed/replication/sink/filersink/filer_sink.go

@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@ -206,7 +207,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
if aErr != nil {
return nil, nil, aErr

2
weed/s3api/s3api_bucket_handlers.go

@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
if identity!=nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{

2
weed/s3api/s3api_objects_list_handlers.go

@ -71,7 +71,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
ContinuationToken: continuationToken,
Delimiter: response.Delimiter,
IsTruncated: response.IsTruncated,
KeyCount: len(response.Contents),
KeyCount: len(response.Contents) + len(response.CommonPrefixes),
MaxKeys: response.MaxKeys,
NextContinuationToken: response.NextMarker,
Prefix: response.Prefix,

7
weed/security/tls.go

@ -3,17 +3,16 @@ package security
import (
"crypto/tls"
"crypto/x509"
"github.com/chrislusf/seaweedfs/weed/util"
"io/ioutil"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption {
func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption {
if config == nil {
return nil
}
@ -40,7 +39,7 @@ func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption {
return grpc.Creds(ta)
}
func LoadClientTLS(config *viper.Viper, component string) grpc.DialOption {
func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
if config == nil {
return grpc.WithInsecure()
}

52
weed/server/filer_grpc_server_sub_meta.go

@ -29,16 +29,20 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
var processedTsNs int64
var err error
for {
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
@ -46,6 +50,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return true
}, eachLogEntryFn)
if err != nil {
if err == log_buffer.ResumeFromDiskError {
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
@ -73,19 +80,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
// println("reading from persisted logs ...")
processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
var processedTsNs int64
var err error
// println("reading from in memory logs ...")
for {
// println("reading from persisted logs ...")
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
// glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
// println("reading from in memory logs ...")
lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
@ -93,6 +104,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
return true
}, eachLogEntryFn)
if err != nil {
if err == log_buffer.ResumeFromDiskError {
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {

3
weed/server/filer_server.go

@ -26,6 +26,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
@ -56,7 +57,7 @@ type FilerOption struct {
Port uint32
recursiveDelete bool
Cipher bool
CacheToFilerLimit int64
SaveToFilerLimit int
Filers []string
}

10
weed/server/filer_server_handlers_read.go

@ -100,6 +100,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.Header().Set(k, string(v))
}
//Seaweed custom header are not visible to Vue or javascript
seaweedHeaders := []string{}
for header, _ := range w.Header() {
if strings.HasPrefix(header, "Seaweed-") {
seaweedHeaders = append(seaweedHeaders, header)
}
}
seaweedHeaders = append(seaweedHeaders, "Content-Disposition")
w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
//set tag count
if r.Method == "GET" {
tagCount := 0

14
weed/server/filer_server_handlers_write_autochunk.go

@ -207,7 +207,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
chunkOffset := int64(0)
var smallContent, content []byte
var smallContent []byte
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
@ -216,6 +216,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
if err != nil {
return nil, nil, 0, err, nil
}
if chunkOffset == 0 {
if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
smallContent = data
chunkOffset += int64(len(data))
break
}
}
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@ -242,8 +249,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return nil, nil, 0, uploadErr, nil
}
content = data
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
break
@ -263,9 +268,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
}
if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
smallContent = content
}
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

4
weed/storage/backend/backend.go

@ -1,6 +1,7 @@
package backend
import (
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"os"
"strings"
@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/spf13/viper"
)
type BackendStorageFile interface {
@ -45,7 +45,7 @@ var (
)
// used by master to load remote storage configurations
func LoadConfiguration(config *viper.Viper) {
func LoadConfiguration(config *util.ViperProxy) {
StorageBackendPrefix := "storage.backend"

2
weed/util/chunk_cache/chunk_cache_in_memory.go

@ -3,7 +3,7 @@ package chunk_cache
import (
"time"
"github.com/karlseguin/ccache"
"github.com/karlseguin/ccache/v2"
)
// a global cache for recently accessed file chunks

18
weed/util/config.go

@ -2,6 +2,7 @@ package util
import (
"strings"
"sync"
"github.com/spf13/viper"
@ -46,9 +47,20 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
return true
}
func GetViper() *viper.Viper {
v := &viper.Viper{}
*v = *viper.GetViper()
type ViperProxy struct {
*viper.Viper
sync.Mutex
}
func (vp *ViperProxy) SetDefault(key string, value interface{}) {
vp.Lock()
defer vp.Unlock()
vp.Viper.SetDefault(key, value)
}
func GetViper() *ViperProxy {
v := &ViperProxy{}
v.Viper = viper.GetViper()
v.AutomaticEnv()
v.SetEnvPrefix("weed")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 19)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 20)
COMMIT = ""
)

24
weed/util/log_buffer/log_buffer.go

@ -28,6 +28,7 @@ type LogBuffer struct {
pos int
startTime time.Time
stopTime time.Time
lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
@ -129,6 +130,7 @@ func (m *LogBuffer) loopFlush() {
// fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
d.releaseMemory()
m.lastFlushTime = d.stopTime
}
}
}
@ -174,10 +176,14 @@ func (d *dataToFlush) releaseMemory() {
bufferPool.Put(d.data)
}
func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) {
m.RLock()
defer m.RUnlock()
if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
return nil, ResumeFromDiskError
}
/*
fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
for i, prevBuf := range m.prevBuffers.buffers {
@ -186,11 +192,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
*/
if lastReadTime.Equal(m.stopTime) {
return nil
return nil, nil
}
if lastReadTime.After(m.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
return nil
return nil, nil
}
if lastReadTime.Before(m.startTime) {
// println("checking ", lastReadTime.UnixNano())
@ -198,19 +204,19 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
if buf.startTime.After(lastReadTime) {
if i == 0 {
// println("return the earliest in memory", buf.startTime.UnixNano())
return copiedBytes(buf.buf[:buf.size])
return copiedBytes(buf.buf[:buf.size]), nil
}
// println("return the", i, "th in memory", buf.startTime.UnixNano())
return copiedBytes(buf.buf[:buf.size])
return copiedBytes(buf.buf[:buf.size]), nil
}
if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
pos := buf.locateByTs(lastReadTime)
// fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
return copiedBytes(buf.buf[pos:buf.size])
return copiedBytes(buf.buf[pos:buf.size]), nil
}
}
// println("return the current buf", lastReadTime.UnixNano())
return copiedBytes(m.buf[:m.pos])
return copiedBytes(m.buf[:m.pos]), nil
}
lastTs := lastReadTime.UnixNano()
@ -243,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
if prevT <= lastTs {
// fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
return copiedBytes(m.buf[pos:m.pos])
return copiedBytes(m.buf[pos:m.pos]), nil
}
h = mid
}
@ -251,7 +257,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
// FIXME: this could be that the buffer has been flushed already
return nil
return nil, nil
}
func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {

8
weed/util/log_buffer/log_read.go

@ -13,7 +13,8 @@ import (
)
var (
ResumeError = fmt.Errorf("resume")
ResumeError = fmt.Errorf("resume")
ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
func (logBuffer *LogBuffer) LoopProcessLogData(
@ -34,7 +35,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
if err == ResumeFromDiskError {
return lastReadTime, ResumeFromDiskError
}
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
if bytesBuf == nil {
if waitForDataFn() {

10
weed/wdclient/vid_map.go

@ -15,6 +15,12 @@ const (
maxCursorIndex = 4096
)
type HasLookupFileIdFunction interface {
GetLookupFileIdFunction() LookupFileIdFunctionType
}
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
@ -67,6 +73,10 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
return vc.LookupFileId
}
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {

Loading…
Cancel
Save