Browse Source

Merge pull request #78 from chrislusf/master

sync
pull/2394/head
hilimd 4 years ago
committed by GitHub
parent
commit
ff26e64648
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      README.md
  2. 2
      docker/compose/local-s3tests-compose.yml
  3. 9
      go.mod
  4. 43
      go.sum
  5. 4
      k8s/seaweedfs/Chart.yaml
  6. 2
      k8s/seaweedfs/values.yaml
  7. 2
      other/java/client/pom.xml
  8. 2
      other/java/client/pom.xml.deploy
  9. 2
      other/java/client/pom_debug.xml
  10. 2
      other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
  11. 8
      other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
  12. 4
      other/java/examples/pom.xml
  13. 2
      other/java/hdfs2/dependency-reduced-pom.xml
  14. 2
      other/java/hdfs2/pom.xml
  15. 2
      other/java/hdfs3/dependency-reduced-pom.xml
  16. 2
      other/java/hdfs3/pom.xml
  17. 2
      unmaintained/load_test/load_test_leveldb/load_test_leveldb.go
  18. 108
      unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
  19. 1
      weed/command/command.go
  20. 165
      weed/command/fuse.go
  21. 2
      weed/command/mount.go
  22. 20
      weed/command/mount_std.go
  23. 11
      weed/command/scaffold.go
  24. 1
      weed/filer/meta_aggregator.go
  25. 78
      weed/filer/sqlite/sqlite_store.go
  26. 9
      weed/filer/sqlite/sqlite_store_unsupported.go
  27. 10
      weed/filer/stream.go
  28. 11
      weed/filesys/dirty_pages_temp_file.go
  29. 37
      weed/filesys/wfs.go
  30. 39
      weed/filesys/wfs_filer_client.go
  31. 2
      weed/filesys/wfs_write.go
  32. 59
      weed/pb/grpc_client_server.go
  33. 3
      weed/replication/sink/azuresink/azure_sink.go
  34. 5
      weed/replication/sink/b2sink/b2_sink.go
  35. 3
      weed/replication/sink/gcssink/gcs_sink.go
  36. 30
      weed/replication/sink/localsink/local_sink.go
  37. 3
      weed/replication/sink/s3sink/s3_sink.go
  38. 1
      weed/replication/source/filer_source.go
  39. 33
      weed/s3api/s3api_object_handlers.go
  40. 12
      weed/s3api/s3err/s3api_errors.go
  41. 8
      weed/server/common.go
  42. 3
      weed/server/filer_server.go
  43. 21
      weed/server/filer_server_handlers_read.go
  44. 2
      weed/server/filer_server_handlers_write_autochunk.go
  45. 7
      weed/server/volume_server_handlers_read.go
  46. 2
      weed/server/volume_server_ui/templates.go
  47. 2
      weed/shell/command_fs_meta_save.go
  48. 3
      weed/shell/command_s3_bucket_list.go
  49. 4
      weed/static/javascript/jquery-2.1.3.min.js
  50. 2
      weed/static/javascript/jquery-3.6.0.min.js
  51. 6
      weed/topology/data_node.go
  52. 3
      weed/util/config.go
  53. 2
      weed/util/constants.go
  54. 1
      weed/wdclient/masterclient.go

11
README.md

@ -6,7 +6,7 @@
[![Build Status](https://travis-ci.org/chrislusf/seaweedfs.svg?branch=master)](https://travis-ci.org/chrislusf/seaweedfs)
[![GoDoc](https://godoc.org/github.com/chrislusf/seaweedfs/weed?status.svg)](https://godoc.org/github.com/chrislusf/seaweedfs/weed)
[![Wiki](https://img.shields.io/badge/docs-wiki-blue.svg)](https://github.com/chrislusf/seaweedfs/wiki)
[![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs/)
[![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs/)
[![SeaweedFS on Maven Central](https://img.shields.io/maven-central/v/com.github.chrislusf/seaweedfs-client)](https://search.maven.org/search?q=g:com.github.chrislusf)
@ -44,7 +44,8 @@ Your support will be really appreciated by me and other supporters!
- [SeaweedFS Mailing List](https://groups.google.com/d/forum/seaweedfs)
- [Wiki Documentation](https://github.com/chrislusf/seaweedfs/wiki)
- [SeaweedFS White Paper](https://github.com/chrislusf/seaweedfs/wiki/SeaweedFS_Architecture.pdf)
- [SeaweedFS Introduction Slides](https://www.slideshare.net/chrislusf/seaweedfs-introduction)
- [SeaweedFS Introduction Slides 2021.5](https://docs.google.com/presentation/d/1DcxKWlINc-HNCjhYeERkpGXXm6nTCES8mi2W5G0Z4Ts/edit?usp=sharing)
- [SeaweedFS Introduction Slides 2019.3](https://www.slideshare.net/chrislusf/seaweedfs-introduction)
Table of Contents
=================
@ -108,7 +109,7 @@ Also, SeaweedFS implements erasure coding with ideas from
On top of the object store, optional [Filer] can support directories and POSIX attributes.
Filer is a separate linearly-scalable stateless server with customizable metadata stores,
e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, MemSql, TiDB, Etcd, CockroachDB, etc.
e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, RocksDB, Sqlite, MemSql, TiDB, Etcd, CockroachDB, etc.
For any distributed key value stores, the large values can be offloaded to SeaweedFS.
With the fast access speed and linearly scalable capacity,
@ -399,7 +400,7 @@ The architectures are mostly the same. SeaweedFS aims to store and read files fa
* SeaweedFS optimizes for small files, ensuring O(1) disk seek operation, and can also handle large files.
* SeaweedFS statically assigns a volume id for a file. Locating file content becomes just a lookup of the volume id, which can be easily cached.
* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized.
* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, Sqlite, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized.
* SeaweedFS Volume server also communicates directly with clients via HTTP, supporting range queries, direct uploads, etc.
| System | File Metadata | File Content Read| POSIX | REST API | Optimized for large number of small files |
@ -441,7 +442,7 @@ Ceph uses CRUSH hashing to automatically manage the data placement, which is eff
SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read.
SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage.
SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Sqlite, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage.
| SeaweedFS | comparable to Ceph | advantage |
| ------------- | ------------- | ---------------- |

2
docker/compose/local-s3tests-compose.yml

@ -38,7 +38,7 @@ services:
S3TEST_CONF: "s3tests.conf"
NOSETESTS_OPTIONS: "--verbose --logging-level=ERROR --with-xunit --failure-detail s3tests_boto3.functional.test_s3"
NOSETESTS_ATTR: "!tagging,!fails_on_aws,!encryption,!bucket-policy,!versioning,!fails_on_rgw,!bucket-policy,!fails_with_subdomain,!policy_status,!object-lock,!lifecycle,!cors,!user-policy"
NOSETESTS_EXCLUDE: "(bucket_list_delimiter_basic|bucket_listv2_delimiter_basic|bucket_listv2_encoding_basic|bucket_list_encoding_basic|bucket_list_delimiter_prefix|bucket_listv2_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_alt|bucket_listv2_delimiter_alt|bucket_list_delimiter_prefix_underscore|bucket_list_delimiter_percentage|bucket_listv2_delimiter_percentage|bucket_list_delimiter_whitespace|bucket_listv2_delimiter_whitespace|bucket_list_delimiter_dot|bucket_listv2_delimiter_dot|bucket_list_delimiter_unreadable|bucket_listv2_delimiter_unreadable|bucket_listv2_fetchowner_defaultempty|bucket_listv2_fetchowner_empty|bucket_list_prefix_delimiter_alt|bucket_listv2_prefix_delimiter_alt|bucket_list_prefix_delimiter_prefix_not_exist|bucket_listv2_prefix_delimiter_prefix_not_exist|bucket_list_prefix_delimiter_delimiter_not_exist|bucket_listv2_prefix_delimiter_delimiter_not_exist|bucket_list_prefix_delimiter_prefix_delimiter_not_exist|bucket_listv2_prefix_delimiter_prefix_delimiter_not_exist|bucket_list_maxkeys_none|bucket_listv2_maxkeys_none|bucket_list_maxkeys_invalid|bucket_listv2_continuationtoken_empty|bucket_list_return_data|bucket_list_objects_anonymous|bucket_listv2_objects_anonymous|bucket_notexist|bucketv2_notexist|bucket_delete_nonempty|bucket_concurrent_set_canned_acl|object_write_to_nonexist_bucket|object_requestid_matches_header_on_error|object_write_cache_control|object_write_expires|object_set_get_metadata_none_to_good|object_set_get_metadata_none_to_empty|object_set_get_metadata_overwrite_to_empty|post_object_anonymous_request|post_object_authenticated_request|post_object_authenticated_no_content_type|post_object_authenticated_request_bad_access_key|post_object_set_success_code|post_object_set_invalid_success_code|post_object_upload_larger_than_chunk|post_object_set_key_from_filename|post_object_ignored_header|post_object_case_insensitive_condition_fields|post_object_escaped_field_values|post_object_success_redirect_action|post_object_invalid_signature|post_object_invalid_access_key|post_object_missing_policy_condition|post_object_user_specified_header|post_object_request_missing_policy_specified_field|post_object_expired_policy|post_object_invalid_request_field_value|get_object_ifmatch_failed|get_object_ifunmodifiedsince_good|put_object_ifmatch_failed|object_raw_get_bucket_gone|object_delete_key_bucket_gone|object_raw_get_bucket_acl|object_raw_get_object_acl|object_raw_response_headers|object_raw_authenticated_bucket_gone|object_raw_get_x_amz_expires_out_max_range|object_raw_get_x_amz_expires_out_positive_range|object_anon_put_write_access|object_raw_put_authenticated_expired|bucket_create_exists|bucket_create_naming_bad_short_one|bucket_create_naming_bad_short_two|bucket_get_location|bucket_acl_default|bucket_acl_canned|bucket_acl_canned_publicreadwrite|bucket_acl_canned_authenticatedread|object_acl_default|object_acl_canned_during_create|object_acl_canned|object_acl_canned_publicreadwrite|object_acl_canned_authenticatedread|object_acl_canned_bucketownerread|object_acl_canned_bucketownerfullcontrol|object_acl_full_control_verify_attributes|bucket_acl_canned_private_to_private|bucket_acl_grant_nonexist_user|bucket_acl_no_grants|bucket_acl_grant_email_not_exist|bucket_acl_revoke_all|bucket_recreate_not_overriding|object_copy_verify_contenttype|object_copy_to_itself_with_metadata|object_copy_not_owned_bucket|object_copy_not_owned_object_bucket|object_copy_retaining_metadata|object_copy_replacing_metadata|multipart_upload_empty|multipart_copy_invalid_range|multipart_copy_special_names|multipart_upload_resend_part|multipart_upload_size_too_small|abort_multipart_upload_not_found|multipart_upload_missing_part|multipart_upload_incorrect_etag|100_continue|ranged_request_invalid_range|ranged_request_empty_object|access_bucket)"
NOSETESTS_EXCLUDE: "(bucket_list_delimiter_basic|bucket_listv2_delimiter_basic|bucket_listv2_encoding_basic|bucket_list_encoding_basic|bucket_list_delimiter_prefix|bucket_listv2_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_prefix_ends_with_delimiter|bucket_list_delimiter_alt|bucket_listv2_delimiter_alt|bucket_list_delimiter_prefix_underscore|bucket_list_delimiter_percentage|bucket_listv2_delimiter_percentage|bucket_list_delimiter_whitespace|bucket_listv2_delimiter_whitespace|bucket_list_delimiter_dot|bucket_listv2_delimiter_dot|bucket_list_delimiter_unreadable|bucket_listv2_delimiter_unreadable|bucket_listv2_fetchowner_defaultempty|bucket_listv2_fetchowner_empty|bucket_list_prefix_delimiter_alt|bucket_listv2_prefix_delimiter_alt|bucket_list_prefix_delimiter_prefix_not_exist|bucket_listv2_prefix_delimiter_prefix_not_exist|bucket_list_prefix_delimiter_delimiter_not_exist|bucket_listv2_prefix_delimiter_delimiter_not_exist|bucket_list_prefix_delimiter_prefix_delimiter_not_exist|bucket_listv2_prefix_delimiter_prefix_delimiter_not_exist|bucket_list_maxkeys_none|bucket_listv2_maxkeys_none|bucket_list_maxkeys_invalid|bucket_listv2_continuationtoken_empty|bucket_list_return_data|bucket_list_objects_anonymous|bucket_listv2_objects_anonymous|bucket_notexist|bucketv2_notexist|bucket_delete_nonempty|bucket_concurrent_set_canned_acl|object_write_to_nonexist_bucket|object_requestid_matches_header_on_error|object_set_get_metadata_none_to_good|object_set_get_metadata_none_to_empty|object_set_get_metadata_overwrite_to_empty|post_object_anonymous_request|post_object_authenticated_request|post_object_authenticated_no_content_type|post_object_authenticated_request_bad_access_key|post_object_set_success_code|post_object_set_invalid_success_code|post_object_upload_larger_than_chunk|post_object_set_key_from_filename|post_object_ignored_header|post_object_case_insensitive_condition_fields|post_object_escaped_field_values|post_object_success_redirect_action|post_object_invalid_signature|post_object_invalid_access_key|post_object_missing_policy_condition|post_object_user_specified_header|post_object_request_missing_policy_specified_field|post_object_expired_policy|post_object_invalid_request_field_value|get_object_ifunmodifiedsince_good|put_object_ifmatch_failed|object_raw_get_bucket_gone|object_delete_key_bucket_gone|object_raw_get_bucket_acl|object_raw_get_object_acl|object_raw_response_headers|object_raw_authenticated_bucket_gone|object_raw_get_x_amz_expires_out_max_range|object_raw_get_x_amz_expires_out_positive_range|object_anon_put_write_access|object_raw_put_authenticated_expired|bucket_create_exists|bucket_create_naming_bad_short_one|bucket_create_naming_bad_short_two|bucket_get_location|bucket_acl_default|bucket_acl_canned|bucket_acl_canned_publicreadwrite|bucket_acl_canned_authenticatedread|object_acl_default|object_acl_canned_during_create|object_acl_canned|object_acl_canned_publicreadwrite|object_acl_canned_authenticatedread|object_acl_canned_bucketownerread|object_acl_canned_bucketownerfullcontrol|object_acl_full_control_verify_attributes|bucket_acl_canned_private_to_private|bucket_acl_grant_nonexist_user|bucket_acl_no_grants|bucket_acl_grant_email_not_exist|bucket_acl_revoke_all|bucket_recreate_not_overriding|object_copy_verify_contenttype|object_copy_to_itself_with_metadata|object_copy_not_owned_bucket|object_copy_not_owned_object_bucket|object_copy_retaining_metadata|object_copy_replacing_metadata|multipart_upload_empty|multipart_copy_invalid_range|multipart_copy_special_names|multipart_upload_resend_part|multipart_upload_size_too_small|abort_multipart_upload_not_found|multipart_upload_missing_part|multipart_upload_incorrect_etag|100_continue|ranged_request_invalid_range|ranged_request_empty_object|access_bucket)"
depends_on:
- master
- volume

9
go.mod

@ -39,7 +39,7 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jinzhu/copier v0.2.8
@ -59,6 +59,7 @@ require (
github.com/olivere/elastic/v7 v7.0.19
github.com/peterh/liner v1.1.0
github.com/pierrec/lz4 v2.2.7+incompatible // indirect
github.com/pquerna/cachecontrol v0.1.0
github.com/prometheus/client_golang v1.3.0
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
github.com/seaweedfs/fuse v1.1.6
@ -88,9 +89,8 @@ require (
gocloud.dev/pubsub/rabbitpubsub v0.20.0
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 // indirect
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd
golang.org/x/tools v0.0.0-20200608174601-1b747fd94509
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78
google.golang.org/api v0.26.0
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.24.0
@ -98,6 +98,7 @@ require (
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
modernc.org/sqlite v1.10.7
)
// replace github.com/seaweedfs/fuse => /Users/chris/go/src/github.com/seaweedfs/fuse

43
go.sum

@ -350,6 +350,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-replayers/grpcreplay v0.1.0 h1:eNb1y9rZFmY4ax45uEEECSa8fsxGRU+8Bil52ASAwic=
@ -476,6 +477,8 @@ github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLM
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
@ -533,11 +536,14 @@ github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3 h1:a+kO+98RDGEfo6asOGMmpodZq4FNtnGP54yps8BzLR4=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@ -639,6 +645,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/pquerna/cachecontrol v0.1.0 h1:yJMy84ti9h/+OEWa752kBTKv4XC30OtVVHYv/8cTqKc=
github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQnrHV5K9mBcUI=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
@ -810,6 +818,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
@ -954,6 +963,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -975,6 +985,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200930132711-30421366ff76 h1:JnxiSYT3Nm0BT2a8CyvYyM6cnrWpidecD1UuSYbhKm0=
golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -1007,6 +1019,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1026,6 +1039,9 @@ golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd h1:WgqgiQvkiZWz7XLhphjt2GI2GcGCTIZs9jqXMWmH+oc=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201126233918-771906719818/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -1095,12 +1111,16 @@ golang.org/x/tools v0.0.0-20200601175630-2caf76543d99/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200606014950-c42cb6316fb6/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200608174601-1b747fd94509 h1:MI14dOfl3OG6Zd32w3ugsrvcUO810fDZdWakTq39dH4=
golang.org/x/tools v0.0.0-20200608174601-1b747fd94509/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78 h1:M8tBwCtWD/cZV9DZpFYRUgaymAYAr+aIUTWzDaM3uPs=
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.5.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
@ -1252,10 +1272,33 @@ honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o=
modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg=
modernc.org/cc/v3 v3.32.4/go.mod h1:0R6jl1aZlIl2avnYfbfHBS1QB6/f+16mihBObaBC878=
modernc.org/cc/v3 v3.33.5 h1:gfsIOmcv80EelyQyOHn/Xhlzex8xunhQxWiJRMYmPrI=
modernc.org/cc/v3 v3.33.5/go.mod h1:0R6jl1aZlIl2avnYfbfHBS1QB6/f+16mihBObaBC878=
modernc.org/ccgo/v3 v3.9.2/go.mod h1:gnJpy6NIVqkETT+L5zPsQFj7L2kkhfPMzOghRNv/CFo=
modernc.org/ccgo/v3 v3.9.4 h1:mt2+HyTZKxva27O6T4C9//0xiNQ/MornL3i8itM5cCs=
modernc.org/ccgo/v3 v3.9.4/go.mod h1:19XAY9uOrYnDhOgfHwCABasBvK69jgC4I8+rizbk3Bc=
modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM=
modernc.org/libc v1.7.13-0.20210308123627-12f642a52bb8/go.mod h1:U1eq8YWr/Kc1RWCMFUWEdkTg8OTcfLw2kY8EDwl039w=
modernc.org/libc v1.9.5 h1:zv111ldxmP7DJ5mOIqzRbza7ZDl3kh4ncKfASB2jIYY=
modernc.org/libc v1.9.5/go.mod h1:U1eq8YWr/Kc1RWCMFUWEdkTg8OTcfLw2kY8EDwl039w=
modernc.org/mathutil v1.1.1 h1:FeylZSVX8S+58VsyJlkEj2bcpdytmp9MmDKZkKx8OIE=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.0.4 h1:utMBrFcpnQDdNsmM6asmyH/FM9TqLPS7XF7otpJmrwM=
modernc.org/memory v1.0.4/go.mod h1:nV2OApxradM3/OVbs2/0OsP6nPfakXpi50C7dcoHXlc=
modernc.org/opt v0.1.1 h1:/0RX92k9vwVeDXj+Xn23DKp2VJubL7k8qNffND6qn3A=
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sqlite v1.10.7 h1:B4ITfAx3HxSxOOKZqKhw4vnrhM+kTY1HoJf2L7PQBCQ=
modernc.org/sqlite v1.10.7/go.mod h1:GXpJIZPNgRGqG0inyYDW18j9YpBpFUBn/weGI63hLLs=
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/tcl v1.5.2/go.mod h1:pmJYOLgpiys3oI4AeAafkcUfE+TKKilminxNyU/+Zlo=
modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk=
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
modernc.org/z v1.0.1-0.20210308123920-1f282aa71362/go.mod h1:8/SRk5C/HgiQWCgXdfpb+1RvhORdkz5sw72d3jjtyqA=
modernc.org/z v1.0.1/go.mod h1:8/SRk5C/HgiQWCgXdfpb+1RvhORdkz5sw72d3jjtyqA=
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=

4
k8s/seaweedfs/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "2.48"
version: 2.48
appVersion: "2.49"
version: 2.49

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
# imageTag: "2.48" - started using {.Chart.appVersion}
# imageTag: "2.49" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

2
other/java/client/pom.xml

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>

2
other/java/client/pom.xml.deploy

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>

2
other/java/client/pom_debug.xml

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>

2
other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

@ -39,7 +39,7 @@ public class SeaweedOutputStream extends OutputStream {
}
public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) {
this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000");
this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, replication);
}
public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,

8
other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java

@ -1,5 +1,6 @@
package seaweedfs.client;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
@ -45,11 +46,16 @@ public class SeaweedWrite {
FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(filerClient.getCollection())
.setReplication(replication == null ? filerClient.getReplication() : replication)
.setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)
.setPath(path)
.build());
if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(response.getError());
}
String fileId = response.getFileId();
String auth = response.getAuth();

4
other/java/examples/pom.xml

@ -11,13 +11,13 @@
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop2-client</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<scope>compile</scope>
</dependency>
<dependency>

2
other/java/hdfs2/dependency-reduced-pom.xml

@ -301,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<seaweedfs.client.version>1.6.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

2
other/java/hdfs2/pom.xml

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<seaweedfs.client.version>1.6.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

2
other/java/hdfs3/dependency-reduced-pom.xml

@ -309,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<seaweedfs.client.version>1.6.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

2
other/java/hdfs3/pom.xml

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.6.4</seaweedfs.client.version>
<seaweedfs.client.version>1.6.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

2
unmaintained/load_test/load_test_leveldb/load_test_leveldb.go

@ -1,4 +1,4 @@
package load_test_leveldb
package main
import (
"crypto/md5"

108
unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go

@ -0,0 +1,108 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
"io"
"strconv"
)
var (
dir = flag.String("dir", "/tmp", "directory to create files")
n = flag.Int("n", 100, "the number of metadata")
tailFiler = flag.String("filer", "localhost:8888", "the filer address")
isWrite = flag.Bool("write", false, "only write")
)
func main() {
flag.Parse()
if *isWrite {
startGenerateMetadata()
return
}
expected := 0
startSubscribeMetadata(func(event *filer_pb.SubscribeMetadataResponse) error {
if event.Directory != *dir {
return nil
}
name := event.EventNotification.NewEntry.Name
fmt.Printf("=> %s\n", name)
id := name[4:]
if x, err := strconv.Atoi(id); err == nil {
if x != expected {
return fmt.Errorf("Expected file%d Actual %s\n", expected, name)
}
expected++
} else {
return err
}
return nil
})
}
func startGenerateMetadata() {
pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
for i := 0; i < *n; i++ {
name := fmt.Sprintf("file%d", i)
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: *dir,
Entry: &filer_pb.Entry{
Name: name,
},
}); err != nil {
fmt.Printf("create entry %s: %v\n", name, err)
return err
}
}
return nil
})
}
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
lastTsNs := int64(0)
tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "tail",
PathPrefix: *dir,
SinceNs: lastTsNs,
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err = eachEntryFunc(resp); err != nil {
return err
}
lastTsNs = resp.TsNs
}
})
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
}

1
weed/command/command.go

@ -22,6 +22,7 @@ var Commands = []*Command{
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
cmdFuse,
cmdGateway,
cmdMaster,
cmdMount,

165
weed/command/fuse.go

@ -0,0 +1,165 @@
package command
import (
"fmt"
"strings"
"strconv"
"time"
"os"
)
func init() {
cmdFuse.Run = runFuse // break init cycle
}
func runFuse(cmd *Command, args []string) bool {
argsLen := len(args)
options := []string{}
// at least target mount path should be passed
if argsLen < 1 {
return false
}
// first option is always target mount path
mountOptions.dir = &args[0]
// scan parameters looking for one or more -o options
// -o options receive parameters on format key=value[,key=value]...
for i := 0; i < argsLen; i++ {
if args[i] == "-o" && i+1 <= argsLen {
options = strings.Split(args[i+1], ",")
i++
}
}
// for each option passed with -o
for _, option := range options {
// split just first = character
parts := strings.SplitN(option, "=", 2)
// if doesn't key and value skip
if len(parts) != 2 {
continue
}
key, value := parts[0], parts[1]
// switch key keeping "weed mount" parameters
switch key {
case "filer":
mountOptions.filer = &value
case "filer.path":
mountOptions.filerMountRootPath = &value
case "dirAutoCreate":
if parsed, err := strconv.ParseBool(value); err != nil {
mountOptions.dirAutoCreate = &parsed
} else {
panic(fmt.Errorf("dirAutoCreate: %s", err))
}
case "collection":
mountOptions.collection = &value
case "replication":
mountOptions.replication = &value
case "disk":
mountOptions.diskType = &value
case "ttl":
if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
intValue := int(parsed)
mountOptions.ttlSec = &intValue
} else {
panic(fmt.Errorf("ttl: %s", err))
}
case "chunkSizeLimitMB":
if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
intValue := int(parsed)
mountOptions.chunkSizeLimitMB = &intValue
} else {
panic(fmt.Errorf("chunkSizeLimitMB: %s", err))
}
case "concurrentWriters":
if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
intValue := int(parsed)
mountOptions.concurrentWriters = &intValue
} else {
panic(fmt.Errorf("concurrentWriters: %s", err))
}
case "cacheDir":
mountOptions.cacheDir = &value
case "cacheCapacityMB":
if parsed, err := strconv.ParseInt(value, 0, 64); err != nil {
mountOptions.cacheSizeMB = &parsed
} else {
panic(fmt.Errorf("cacheCapacityMB: %s", err))
}
case "dataCenter":
mountOptions.dataCenter = &value
case "allowOthers":
if parsed, err := strconv.ParseBool(value); err != nil {
mountOptions.allowOthers = &parsed
} else {
panic(fmt.Errorf("allowOthers: %s", err))
}
case "umask":
mountOptions.umaskString = &value
case "nonempty":
if parsed, err := strconv.ParseBool(value); err != nil {
mountOptions.nonempty = &parsed
} else {
panic(fmt.Errorf("nonempty: %s", err))
}
case "volumeServerAccess":
mountOptions.volumeServerAccess = &value
case "map.uid":
mountOptions.uidMap = &value
case "map.gid":
mountOptions.gidMap = &value
case "readOnly":
if parsed, err := strconv.ParseBool(value); err != nil {
mountOptions.readOnly = &parsed
} else {
panic(fmt.Errorf("readOnly: %s", err))
}
case "cpuprofile":
mountCpuProfile = &value
case "memprofile":
mountMemProfile = &value
case "readRetryTime":
if parsed, err := time.ParseDuration(value); err != nil {
mountReadRetryTime = &parsed
} else {
panic(fmt.Errorf("readRetryTime: %s", err))
}
}
}
// I don't know why PATH environment variable is lost
if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin"); err != nil {
panic(fmt.Errorf("setenv: %s", err))
}
// just call "weed mount" command
return runMount(cmdMount, []string{})
}
var cmdFuse = &Command{
UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
Short: "Allow use weed with linux's mount command",
Long: `Allow use weed with linux's mount command
You can use -t weed on mount command:
mv weed /sbin/mount.weed
mount -t weed fuse /mnt -o "filer=localhost:8888,filer.path=/"
Or you can use -t fuse on mount command:
mv weed /sbin/weed
mount -t fuse.weed fuse /mnt -o "filer=localhost:8888,filer.path=/"
mount -t fuse "weed#fuse" /mnt -o "filer=localhost:8888,filer.path=/"
To use without mess with your /sbin:
mount -t fuse./home/user/bin/weed fuse /mnt -o "filer=localhost:8888,filer.path=/"
mount -t fuse "/home/user/bin/weed#fuse" /mnt -o "filer=localhost:8888,filer.path=/"
To check valid options look "weed mount --help"
`,
}

2
weed/command/mount.go

@ -37,7 +37,7 @@ var (
func init() {
cmdMount.Run = runMount // break init cycle
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "comma-separated weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")

20
weed/command/mount_std.go

@ -51,9 +51,9 @@ func runMount(cmd *Command, args []string) bool {
func RunMount(option *MountOptions, umask os.FileMode) bool {
filer := *option.filer
filers := strings.Split(*option.filer, ",")
// parse filer grpc address
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@ -64,22 +64,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
for i := 0; i < 10; i++ {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err)
return true
}
@ -145,7 +145,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options := []fuse.MountOption{
fuse.VolumeName(mountName),
fuse.FSName(filer + ":" + filerMountRootPath),
fuse.FSName(*option.filer + ":" + filerMountRootPath),
fuse.Subtype("seaweedfs"),
// fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
@ -181,8 +181,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
FilerAddress: filer,
FilerGrpcAddress: filerGrpcAddress,
FilerAddresses: filers,
FilerGrpcAddresses: filerGrpcAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
@ -218,7 +218,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
c.Close()
})
glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
server := fs.New(c, nil)
seaweedFileSystem.Server = server
err = server.Serve(seaweedFileSystem)

11
weed/command/scaffold.go

@ -101,6 +101,11 @@ dir = "./filerldb3" # directory to store level db files
enabled = false
dir = "./filerrdb" # directory to store rocksdb files
[sqlite]
# local on disk, similar to leveldb
enabled = false
dbFile = "./filer.db" # sqlite db file
[mysql] # or memsql, tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
@ -372,12 +377,6 @@ directory = "/data"
# so each date directory contains all new and updated files.
is_incremental = false
[sink.local_incremental]
# all replicated files are under modified time as yyyy-mm-dd directories
# so each date directory contains all new and updated files.
enabled = false
directory = "/backup"
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"

1
weed/filer/meta_aggregator.go

@ -71,6 +71,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
// when filer store is not shared by multiple filers
if peerSignature != f.Signature {
lastTsNs = 0
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs
}

78
weed/filer/sqlite/sqlite_store.go

@ -0,0 +1,78 @@
// +build linux darwin windows
// limited GOOS due to modernc.org/libc/unistd
package sqlite
import (
"context"
"database/sql"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/filer/mysql"
"github.com/chrislusf/seaweedfs/weed/util"
_ "modernc.org/sqlite"
)
func init() {
filer.Stores = append(filer.Stores, &SqliteStore{})
}
type SqliteStore struct {
abstract_sql.AbstractSqlStore
}
func (store *SqliteStore) GetName() string {
return "sqlite"
}
func (store *SqliteStore) Initialize(configuration util.Configuration, prefix string) (err error) {
dbFile := configuration.GetString(prefix + "dbFile")
createTable := `CREATE TABLE IF NOT EXISTS "%s" (
dirhash BIGINT,
name VARCHAR(1000),
directory TEXT,
meta BLOB,
PRIMARY KEY (dirhash, name)
) WITHOUT ROWID;`
upsertQuery := `INSERT INTO "%s"(dirhash,name,directory,meta)VALUES(?,?,?,?)
ON CONFLICT(dirhash,name) DO UPDATE SET
directory=excluded.directory,
meta=excluded.meta;
`
return store.initialize(
dbFile,
createTable,
upsertQuery,
)
}
func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (err error) {
store.SupportBucketTable = true
store.SqlGenerator = &mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table `%s`",
UpsertQueryTemplate: upsertQuery,
}
var dbErr error
store.DB, dbErr = sql.Open("sqlite", dbFile)
if dbErr != nil {
store.DB.Close()
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", dbFile, err)
}
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", dbFile, err)
}
if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
}
return nil
}

9
weed/filer/sqlite/sqlite_store_unsupported.go

@ -0,0 +1,9 @@
// +build !linux,!darwin,!windows
// limited GOOS due to modernc.org/libc/unistd
package sqlite
func init() {
// filer.Stores = append(filer.Stores, &SqliteStore{})
}

10
weed/filer/stream.go

@ -6,9 +6,11 @@ import (
"io"
"math"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@ -36,16 +38,20 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
for _, chunkView := range chunkViews {
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
glog.Errorf("read chunk: %v", err)
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
return fmt.Errorf("read chunk: %v", err)
}
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
stats.FilerRequestCounter.WithLabelValues("chunkDownloadedError").Inc()
return fmt.Errorf("write chunk: %v", err)
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
}
return nil

11
weed/filesys/dirty_pages_temp_file.go

@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"os"
"path/filepath"
"sync"
"time"
)
@ -24,14 +23,6 @@ type TempFileDirtyPages struct {
replication string
}
var (
tmpDir = filepath.Join(os.TempDir(), "sw")
)
func init() {
os.Mkdir(tmpDir, 0755)
}
func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
@ -49,7 +40,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
defer pages.pageAddLock.Unlock()
if pages.tf == nil {
tf, err := os.CreateTemp(tmpDir, "")
tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
if err != nil {
glog.Errorf("create temp file: %v", err)
pages.lastErr = err

37
weed/filesys/wfs.go

@ -7,8 +7,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sync"
"time"
@ -28,8 +30,9 @@ import (
type Option struct {
MountDirectory string
FilerAddress string
FilerGrpcAddress string
FilerAddresses []string
filerIndex int
FilerGrpcAddresses []string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
@ -52,6 +55,9 @@ type Option struct {
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
uniqueCacheTempPageDir string
}
var _ = fs.FS(&WFS{})
@ -95,14 +101,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
signature: util.RandomInt32(),
}
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
fsNode := NodeWithId(filePath.AsInode())
if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
@ -259,11 +264,27 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
}
func (wfs *WFS) getCurrentFiler() string {
return wfs.option.FilerAddresses[wfs.option.filerIndex]
}
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
}
func (option *Option) getTempFilePageDir() string {
return option.uniqueCacheTempPageDir
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
}
type NodeWithId uint64

39
weed/filesys/wfs_filer_client.go

@ -1,6 +1,7 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
@ -10,19 +11,35 @@ import (
var _ = filer_pb.FilerClient(&WFS{})
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
})
return util.Retry("filer grpc", func() error {
if err == nil {
return nil
}
return err
i := wfs.option.filerIndex
n := len(wfs.option.FilerGrpcAddresses)
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerGrpcAddresses[i]
err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, wfs.option.GrpcDialOption)
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
} else {
wfs.option.filerIndex = i
return nil
}
i++
if i >= n {
i = 0
}
}
return err
})
}

2
weed/filesys/wfs_write.go

@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {

59
weed/pb/grpc_client_server.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"math/rand"
"net/http"
"strconv"
"strings"
@ -24,10 +25,15 @@ const (
var (
// cache grpc connections
grpcClients = make(map[string]*grpc.ClientConn)
grpcClients = make(map[string]*versionedGrpcClient)
grpcClientsLock sync.Mutex
)
type versionedGrpcClient struct {
*grpc.ClientConn
version int
}
func init() {
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
@ -79,7 +85,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
return grpc.DialContext(ctx, address, options...)
}
func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
grpcClientsLock.Lock()
defer grpcClientsLock.Unlock()
@ -94,23 +100,49 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*grpc.Clien
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
}
grpcClients[address] = grpcConnection
vgc := &versionedGrpcClient{
grpcConnection,
rand.Int(),
}
grpcClients[address] = vgc
return grpcConnection, nil
return vgc, nil
}
func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
grpcConnection, err := getOrCreateConnection(address, opts...)
vgc, err := getOrCreateConnection(address, opts...)
if err != nil {
return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
}
return fn(grpcConnection)
executionErr := fn(vgc.ClientConn)
if executionErr != nil && strings.Contains(executionErr.Error(), "transport") {
grpcClientsLock.Lock()
if t, ok := grpcClients[address]; ok {
if t.version == vgc.version {
vgc.Close()
delete(grpcClients, address)
}
}
grpcClientsLock.Unlock()
}
return executionErr
}
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
return ParseServerAddress(server, 10000)
}
func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) {
for _, server := range servers {
if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil {
serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress)
} else {
return nil, parseErr
}
}
return
}
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
@ -202,3 +234,18 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
for _, filerGrpcAddress := range filerGrpcAddresses {
err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, grpcDialOption)
if err == nil {
return nil
}
}
return err
}

3
weed/replication/sink/azuresink/azure_sink.go

@ -129,8 +129,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil
return true, g.CreateEntry(key, newEntry, signatures)
}
func cleanKey(key string) string {

5
weed/replication/sink/b2sink/b2_sink.go

@ -118,11 +118,8 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
}
func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil
return true, g.CreateEntry(key, newEntry, signatures)
}
func cleanKey(key string) string {

3
weed/replication/sink/gcssink/gcs_sink.go

@ -116,6 +116,5 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
}
func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
// TODO improve efficiency
return false, nil
return true, g.CreateEntry(key, newEntry, signatures)
}

30
weed/replication/sink/localsink/local_sink.go

@ -8,15 +8,15 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
"io/ioutil"
"os"
"path/filepath"
"strings"
)
type LocalSink struct {
Dir string
filerSource *source.FilerSource
Dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@ -35,15 +35,17 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
}
func (localsink *LocalSink) initialize(dir string) error {
func (localsink *LocalSink) initialize(dir string, isIncremental bool) error {
localsink.Dir = dir
localsink.isIncremental = isIncremental
return nil
}
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory")
isIncremental := configuration.GetBool(prefix + "is_incremental")
glog.V(4).Infof("sink.local.directory: %v", dir)
return localsink.initialize(dir)
return localsink.initialize(dir, isIncremental)
}
func (localsink *LocalSink) GetSinkToDirectory() string {
@ -51,7 +53,7 @@ func (localsink *LocalSink) GetSinkToDirectory() string {
}
func (localsink *LocalSink) IsIncremental() bool {
return true
return localsink.isIncremental
}
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
@ -83,8 +85,18 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
}
}
if entry.IsDirectory {
return os.Mkdir(key, os.FileMode(entry.Attributes.FileMode))
}
dstFile, err := os.OpenFile(key, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(entry.Attributes.FileMode))
if err != nil {
return err
}
defer dstFile.Close()
writeFunc := func(data []byte) error {
writeErr := ioutil.WriteFile(key, data, 0755)
_, writeErr := dstFile.Write(data)
return writeErr
}
@ -101,5 +113,7 @@ func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, ne
}
glog.V(4).Infof("Update Entry key: %s", key)
// do delete and create
return false, nil
foundExistingEntry = util.FileExists(key)
err = localsink.CreateEntry(key, newEntry, signatures)
return
}

3
weed/replication/sink/s3sink/s3_sink.go

@ -147,8 +147,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
key = cleanKey(key)
// TODO improve efficiency
return false, nil
return true, s3sink.CreateEntry(key, newEntry, signatures)
}
func cleanKey(key string) string {

1
weed/replication/source/filer_source.go

@ -58,7 +58,6 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})

33
weed/s3api/s3api_object_handlers.go

@ -6,12 +6,14 @@ import (
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/pquerna/cachecontrol/cacheobject"
"io"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
@ -46,6 +48,20 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
if r.Header.Get("Cache-Control") != "" {
if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil {
writeErrorResponse(w, s3err.ErrInvalidDigest, r.URL)
return
}
}
if r.Header.Get("Expires") != "" {
if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
writeErrorResponse(w, s3err.ErrInvalidDigest, r.URL)
return
}
}
dataReader := r.Body
if s3a.iam.isEnabled() {
rAuthType := getRequestAuthType(r)
@ -62,6 +78,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeErrorResponse(w, s3ErrCode, r.URL)
return
}
} else {
rAuthType := getRequestAuthType(r)
if authTypeAnonymous != rAuthType {
writeErrorResponse(w, s3err.ErrAuthNotSetup, r.URL)
return
}
}
defer dataReader.Close()
@ -311,6 +333,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
}
defer util.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
writeErrorResponse(w, s3err.ErrPreconditionFailed, r.URL)
return
}
if (resp.ContentLength == -1 || resp.StatusCode == 404) && resp.StatusCode != 304 {
if r.Method != "DELETE" {
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
@ -326,7 +353,11 @@ func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) {
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
w.WriteHeader(proxyResponse.StatusCode)
if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
w.WriteHeader(http.StatusPartialContent)
} else {
w.WriteHeader(proxyResponse.StatusCode)
}
io.Copy(w, proxyResponse.Body)
}

12
weed/s3api/s3err/s3api_errors.go

@ -91,7 +91,9 @@ const (
ErrRequestNotReadyYet
ErrMissingDateHeader
ErrInvalidRequest
ErrAuthNotSetup
ErrNotImplemented
ErrPreconditionFailed
ErrExistingObjectIsDirectory
)
@ -341,11 +343,21 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Invalid Request",
HTTPStatusCode: http.StatusBadRequest,
},
ErrAuthNotSetup: {
Code: "InvalidRequest",
Description: "Signed request requires setting up SeaweedFS S3 authentication",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNotImplemented: {
Code: "NotImplemented",
Description: "A header you provided implies functionality that is not implemented",
HTTPStatusCode: http.StatusNotImplemented,
},
ErrPreconditionFailed: {
Code: "PreconditionFailed",
Description: "At least one of the pre-conditions you specified did not hold",
HTTPStatusCode: http.StatusPreconditionFailed,
},
ErrExistingObjectIsDirectory: {
Code: "ExistingObjectIsDirectory",
Description: "Existing Object is a directory.",

8
weed/server/common.go

@ -232,12 +232,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
if err := writeFn(w, 0, totalSize, 0); err != nil {
if err := writeFn(w, 0, totalSize); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -277,7 +277,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -305,7 +305,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
pw.CloseWithError(e)
return
}
if e = writeFn(part, ra.start, ra.length, 0); e != nil {
if e = writeFn(part, ra.start, ra.length); e != nil {
pw.CloseWithError(e)
return
}

3
weed/server/filer_server.go

@ -30,6 +30,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
@ -128,7 +129,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
// TODO deprecated, will be be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")

21
weed/server/filer_server_handlers_read.go

@ -61,6 +61,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
// set etag
etag := filer.ETagEntry(entry)
if ifm := r.Header.Get("If-Match"); ifm != "" && ifm != "\""+etag+"\"" {
w.WriteHeader(http.StatusPreconditionFailed)
return
}
w.Header().Set("Accept-Ranges", "bytes")
// mime type
@ -115,8 +122,6 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
// set etag
etag := filer.ETagEntry(entry)
if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
w.WriteHeader(http.StatusNotModified)
return
@ -150,10 +155,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
if httpStatusCode != 0 {
w.WriteHeader(httpStatusCode)
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
if err != nil {
@ -161,7 +163,10 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
if err != nil {
glog.Errorf("failed to stream content %s: %v", r.URL, err)
}
return err
})
}

2
weed/server/filer_server_handlers_write_autochunk.go

@ -221,7 +221,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
if len(v) > 0 && (strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires") {
entry.Extended[k] = []byte(v[0])
}
}

7
weed/server/volume_server_handlers_read.go

@ -27,7 +27,7 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
// println(r.Method + " " + r.URL.Path)
glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
@ -261,13 +261,10 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
if httpStatusCode != 0 {
w.WriteHeader(httpStatusCode)
}
_, e = io.CopyN(writer, rs, size)
return e
})

2
weed/server/volume_server_ui/templates.go

@ -31,7 +31,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<head>
<title>SeaweedFS {{ .Version }}</title>
<link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
<script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-2.1.3.min.js"></script>
<script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-3.6.0.min.js"></script>
<script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script>
<script type="text/javascript">
$(function() {

2
weed/shell/command_fs_meta_save.go

@ -38,7 +38,7 @@ func (c *commandFsMetaSave) Help() string {
fs.meta.save # save from current directory
The meta data will be saved into a local <filer_host>-<port>-<time>.meta file.
These meta data can be later loaded by fs.meta.load command,
These meta data can be later loaded by fs.meta.load command
`
}

3
weed/shell/command_s3_bucket_list.go

@ -46,6 +46,9 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
}
err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
if !entry.IsDirectory {
return nil
}
if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
fmt.Fprintf(writer, " %s\n", entry.Name)
} else {

4
weed/static/javascript/jquery-2.1.3.min.js
File diff suppressed because it is too large
View File

2
weed/static/javascript/jquery-3.6.0.min.js
File diff suppressed because it is too large
View File

6
weed/topology/data_node.go

@ -177,7 +177,13 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo
func (dn *DataNode) GetDataCenter() *DataCenter {
rack := dn.Parent()
if rack == nil {
return nil
}
dcNode := rack.Parent()
if dcNode == nil {
return nil
}
dcValue := dcNode.GetValue()
return dcValue.(*DataCenter)
}

3
weed/util/config.go

@ -26,8 +26,6 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in
viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
if strings.Contains(err.Error(), "Not Found") {
glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
@ -43,6 +41,7 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
return false
}
}
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
return true
}

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 48)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 49)
COMMIT = ""
)

1
weed/wdclient/masterclient.go

@ -36,6 +36,7 @@ func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHo
}
func (mc *MasterClient) GetMaster() string {
mc.WaitUntilConnected()
return mc.currentMaster
}

Loading…
Cancel
Save