
Merge pull request #1 from chrislusf/master

update
yourchanges committed 10 years ago
commit f7bcd8e958
88 changed files (number of changed lines in parentheses):

  1. .gitignore (1)
  2. README.md (319)
  3. docs/api.rst (13)
  4. docs/clients.rst (20)
  5. docs/directories.rst (6)
  6. docs/gettingstarted.rst (2)
  7. docs/optimization.rst (14)
  8. docs/replication.rst (4)
  9. docs/usecases.rst (2)
  10. go/filer/client_operations.go (2)
  11. go/filer/directory_in_map.go (33)
  12. go/filer/filer_embedded.go (2)
  13. go/filer/files_in_leveldb.go (2)
  14. go/images/orientation.go (5)
  15. go/operation/assign_file_id.go (9)
  16. go/operation/delete_content.go (2)
  17. go/operation/list_masters.go (4)
  18. go/operation/lookup.go (2)
  19. go/operation/submit.go (13)
  20. go/operation/system_message.pb.go (12)
  21. go/operation/upload_content.go (2)
  22. go/proto/system_message.proto (1)
  23. go/storage/cdb_map.go (2)
  24. go/storage/cdb_map_test.go (2)
  25. go/storage/compact_map_perf_test.go (4)
  26. go/storage/compress.go (2)
  27. go/storage/crc.go (4)
  28. go/storage/file_id.go (4)
  29. go/storage/needle.go (15)
  30. go/storage/needle_map.go (4)
  31. go/storage/needle_read_write.go (25)
  32. go/storage/replica_placement.go (8)
  33. go/storage/store.go (53)
  34. go/storage/store_vacuum.go (2)
  35. go/storage/volume.go (130)
  36. go/storage/volume_info.go (4)
  37. go/storage/volume_super_block.go (75)
  38. go/storage/volume_super_block_test.go (23)
  39. go/storage/volume_ttl.go (135)
  40. go/storage/volume_ttl_test.go (60)
  41. go/storage/volume_vacuum.go (16)
  42. go/tools/read_index.go (2)
  43. go/topology/allocate_volume.go (11)
  44. go/topology/cluster_commands.go (4)
  45. go/topology/collection.go (25)
  46. go/topology/data_node.go (10)
  47. go/topology/node.go (4)
  48. go/topology/store_replicate.go (8)
  49. go/topology/topology.go (29)
  50. go/topology/topology_event_handling.go (10)
  51. go/topology/topology_map.go (2)
  52. go/topology/topology_vacuum.go (8)
  53. go/topology/volume_growth.go (16)
  54. go/topology/volume_growth_test.go (4)
  55. go/topology/volume_layout.go (17)
  56. go/util/bytes.go (12)
  57. go/util/config.go (2)
  58. go/util/constants.go (2)
  59. go/util/file_util.go (2)
  60. go/util/net_timeout.go (2)
  61. go/weed/benchmark.go (10)
  62. go/weed/compact.go (6)
  63. go/weed/download.go (4)
  64. go/weed/export.go (6)
  65. go/weed/filer.go (8)
  66. go/weed/fix.go (4)
  67. go/weed/master.go (8)
  68. go/weed/mount_std.go (10)
  69. go/weed/server.go (17)
  70. go/weed/shell.go (2)
  71. go/weed/upload.go (8)
  72. go/weed/version.go (6)
  73. go/weed/volume.go (11)
  74. go/weed/volume_test.go (2)
  75. go/weed/weed.go (4)
  76. go/weed/weed_server/common.go (14)
  77. go/weed/weed_server/filer_server.go (4)
  78. go/weed/weed_server/filer_server_handlers.go (19)
  79. go/weed/weed_server/filer_server_handlers_admin.go (2)
  80. go/weed/weed_server/master_server.go (8)
  81. go/weed/weed_server/master_server_handlers.go (6)
  82. go/weed/weed_server/master_server_handlers_admin.go (19)
  83. go/weed/weed_server/raft_server.go (4)
  84. go/weed/weed_server/raft_server_handlers.go (4)
  85. go/weed/weed_server/volume_server.go (5)
  86. go/weed/weed_server/volume_server_handlers.go (12)
  87. go/weed/weed_server/volume_server_handlers_admin.go (12)
  88. go/weed/weed_server/volume_server_handlers_vacuum.go (2)

.gitignore (1)

@@ -0,0 +1 @@
weed

README.md (319)

@@ -1,18 +1,323 @@
weed-fs
Seaweed File System
=======
[![Build Status](https://travis-ci.org/chrislusf/weed-fs.svg?branch=master)](https://travis-ci.org/chrislusf/weed-fs)
[![GoDoc](https://godoc.org/github.com/chrislusf/weed-fs/go?status.svg)](https://godoc.org/github.com/chrislusf/weed-fs/go)
[![RTD](https://readthedocs.org/projects/weed-fs/badge/?version=latest)](http://weed-fs.readthedocs.org/en/latest/)
An official mirror of code.google.com/p/weed-fs.
Moving to github.com to make collaboration easier.
This repo and the google code repo will be kept synchronized.
## Usage
```
go get github.com/chrislusf/weed-fs/go/weed
```
For documentation and bug reports, please visit
http://weed-fs.googlecode.com
## Reference
For pre-compiled releases,
https://bintray.com/chrislusf/Weed-FS/weed
https://bintray.com/chrislusf/Weed-FS/seaweed
## Introduction
Seaweed-FS is a simple and highly scalable distributed file system. There are two objectives:
1. to store billions of files!
2. to serve the files fast!
Instead of supporting full POSIX file system semantics, Seaweed-FS chooses to implement only a key~file mapping. Similar to the word "NoSQL", you can call it "NoFS".
Instead of managing all file metadata in a central master, Seaweed-FS chooses to manage file volumes in the central master, and lets volume servers manage files and their metadata. This relieves concurrency pressure on the central master and spreads file metadata across the volume servers' memories, allowing faster file access with just one disk read operation!
Seaweed-FS is modeled after [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf).
Seaweed-FS costs only 40 bytes of disk storage for each file's metadata. It is so simple, with O(1) disk reads, that you are welcome to challenge the performance with your actual use cases.
![](https://api.bintray.com/packages/chrislusf/Weed-FS/seaweed/images/download.png)
Download the latest compiled binaries for different platforms here: https://bintray.com/chrislusf/Weed-FS/seaweed
## Additional Features
* Can choose no replication or different replication levels, rack- and data-center-aware
* Automatic master server failover. No single point of failure (SPOF).
* Automatic gzip compression depending on the file's MIME type
* Automatic compaction to reclaim disk space after deletions or updates
* Servers in the same cluster can have different disk sizes, different file systems, and different operating systems
* Adding/removing servers does not cause any data re-balancing
* Optional [filer server](https://code.google.com/p/weed-fs/wiki/DirectoriesAndFiles) provides "normal" directories and files via HTTP
* For JPEG pictures, optionally fix the orientation.
* Support Etag, Accept-Range, Last-Modified, etc.
## Example Usage
By default, the master node runs on port 9333, and the volume nodes run on port 8080.
Here I will start one master node, and two volume nodes on ports 8080 and 8081. Ideally, they would be started from different machines; here I just use localhost as an example.
Seaweed-FS uses HTTP REST operations to write, read, and delete. The results are returned in JSON or JSONP format.
### Start Master Server ###
```
> ./weed master
```
### Start Volume Servers ###
```
> weed volume -dir="/tmp/data1" -max=5 -mserver="localhost:9333" -port=8080 &
> weed volume -dir="/tmp/data2" -max=10 -mserver="localhost:9333" -port=8081 &
```
### Write File ###
Here is a simple example of how to save a file:
```
> curl http://localhost:9333/dir/assign
{"count":1,"fid":"3,01637037d6","url":"127.0.0.1:8080","publicUrl":"localhost:8080"}
```
First, send an HTTP request to get an fid and a volume server URL.
```
> curl -F file=@/home/chris/myphoto.jpg http://127.0.0.1:8080/3,01637037d6
{"size": 43234}
```
Second, send an HTTP multipart POST request to the volume server's url+'/'+fid to actually store the file content.
To update, send another POST request with the updated file content.
To delete, send an HTTP DELETE request:
```
> curl -X DELETE http://127.0.0.1:8080/3,01637037d6
```
### Save File Id ###
Now you can save the fid, 3,01637037d6 in this case, to some database field.
The number 3 here is a volume id. After the comma come a file key, 01, and a file cookie, 637037d6.
The volume id is an unsigned 32-bit integer. The file key is an unsigned 64-bit integer. The file cookie is an unsigned 32-bit integer, used to prevent URL guessing.
The file key and file cookie are both coded in hex. You can store the <volume id, file key, file cookie> tuple in your own format, or simply store the fid as a string.
If stored as a string, in theory you would need 8+1+16+8=33 bytes. A char(33) would be more than enough, since most usages will not need 2^32 volumes.
If space is really a concern, you can store the file id in your own format: a 4-byte integer for the volume id, an 8-byte integer for the file key, and a 4-byte integer for the file cookie. So 16 bytes are more than enough, as the sketch below shows.
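Below is a minimal, hypothetical sketch of such a 16-byte encoding. It assumes, as in the example above, that the cookie is always the last 8 hex characters after the comma; PackFid is an illustrative name, not a Seaweed-FS API:
```
package main

import (
	"encoding/binary"
	"fmt"
	"strconv"
	"strings"
)

// PackFid packs an fid like "3,01637037d6" into 16 bytes:
// a 4-byte volume id, an 8-byte file key, and a 4-byte file cookie.
func PackFid(fid string) (packed [16]byte, err error) {
	parts := strings.SplitN(fid, ",", 2)
	if len(parts) != 2 || len(parts[1]) <= 8 {
		return packed, fmt.Errorf("malformed fid: %s", fid)
	}
	volumeId, err := strconv.ParseUint(parts[0], 10, 32)
	if err != nil {
		return packed, err
	}
	// the last 8 hex characters are the cookie; the rest is the file key
	keyHex, cookieHex := parts[1][:len(parts[1])-8], parts[1][len(parts[1])-8:]
	key, err := strconv.ParseUint(keyHex, 16, 64)
	if err != nil {
		return packed, err
	}
	cookie, err := strconv.ParseUint(cookieHex, 16, 32)
	if err != nil {
		return packed, err
	}
	binary.BigEndian.PutUint32(packed[0:4], uint32(volumeId))
	binary.BigEndian.PutUint64(packed[4:12], key)
	binary.BigEndian.PutUint32(packed[12:16], uint32(cookie))
	return packed, nil
}

func main() {
	p, _ := PackFid("3,01637037d6")
	fmt.Printf("%x\n", p) // 00000003 0000000000000001 637037d6
}
```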
### Read File ###
Here is an example of how to render the URL.
```
> curl http://localhost:9333/dir/lookup?volumeId=3
{"locations":[{"publicUrl":"localhost:8080","url":"localhost:8080"}]}
```
First look up the volume server's URLs by the file's volumeId. Since there are usually not too many volume servers, and volumes do not move often, you can cache the results most of the time. Depending on the replication type, one volume can have multiple replica locations; just randomly pick one location to read from.
Now you can take the public url, render the url or directly read from the volume server via url:
```
http://localhost:8080/3,01637037d6.jpg
```
Notice we add a file extension ".jpg" here. It's optional and just one way for the client to specify the file's content type.
If you want a nicer URL, you can use one of these alternative URL formats:
```
http://localhost:8080/3/01637037d6/my_preferred_name.jpg
http://localhost:8080/3/01637037d6.jpg
http://localhost:8080/3,01637037d6.jpg
http://localhost:8080/3/01637037d6
http://localhost:8080/3,01637037d6
```
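Here is a hedged Go sketch of this read path, combining the lookup call with a simple volume-location cache. The JSON fields match the /dir/lookup response shown above; FileUrl and the cache are illustrative, not part of Seaweed-FS:
```
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

type lookupResult struct {
	Locations []struct {
		PublicUrl string `json:"publicUrl"`
		Url       string `json:"url"`
	} `json:"locations"`
}

// volumeCache avoids re-asking the master for volumes we already know.
var volumeCache = map[string]string{}

// FileUrl resolves an fid like "3,01637037d6" into a full read URL.
func FileUrl(master, fid string) (string, error) {
	volumeId := strings.SplitN(fid, ",", 2)[0]
	if host, ok := volumeCache[volumeId]; ok {
		return "http://" + host + "/" + fid, nil
	}
	resp, err := http.Get("http://" + master + "/dir/lookup?volumeId=" + volumeId)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	var result lookupResult
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	if len(result.Locations) == 0 {
		return "", fmt.Errorf("volume %s not found", volumeId)
	}
	// any replica works for reads; pick the first one for simplicity
	volumeCache[volumeId] = result.Locations[0].PublicUrl
	return "http://" + volumeCache[volumeId] + "/" + fid, nil
}

func main() {
	// assumes the master and volume servers from the examples above are running
	url, err := FileUrl("localhost:9333", "3,01637037d6")
	fmt.Println(url, err)
}
```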
### Rack-Aware and Data Center-Aware Replication ###
Seaweed-FS applies the replication strategy at the volume level. So when you are getting a file id, you can specify the replication strategy. For example:
```
curl http://localhost:9333/dir/assign?replication=001
```
Here is the meaning of the replication parameter:
```
000: no replication
001: replicate once on the same rack
010: replicate once on a different rack, but same data center
100: replicate once on a different data center
200: replicate twice on two different data centers
110: replicate once on a different rack, and once on a different data center
```
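The three digits map directly onto the ReplicaPlacement type touched later in this commit (see go/storage/replica_placement.go below). A minimal decoding sketch, mirroring that type:
```
package main

import "fmt"

// ReplicaPlacement mirrors go/storage/replica_placement.go:
// digit 1 = copies in different data centers, digit 2 = copies on
// different racks in the same data center, digit 3 = copies on the same rack.
type ReplicaPlacement struct {
	SameRackCount       int
	DiffRackCount       int
	DiffDataCenterCount int
}

func ParseReplication(s string) (ReplicaPlacement, error) {
	if len(s) != 3 {
		return ReplicaPlacement{}, fmt.Errorf("replication %q should have 3 digits", s)
	}
	return ReplicaPlacement{
		DiffDataCenterCount: int(s[0] - '0'),
		DiffRackCount:       int(s[1] - '0'),
		SameRackCount:       int(s[2] - '0'),
	}, nil
}

// GetCopyCount counts all physical copies, including the original.
func (rp ReplicaPlacement) GetCopyCount() int {
	return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}

func main() {
	rp, _ := ParseReplication("110")
	fmt.Println(rp.GetCopyCount()) // 3: the original, one on another rack, one in another data center
}
```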
More details about replication can be found here:
https://code.google.com/p/weed-fs/wiki/RackDataCenterAwareReplication
You can also set the default replication strategy when starting the master server.
### Allocate File Key on a Specific Data Center ###
Volume servers can be started with a specific data center name:
```
weed volume -dir=/tmp/1 -port=8080 -dataCenter=dc1
weed volume -dir=/tmp/2 -port=8081 -dataCenter=dc2
```
Or the master server can determine the data center from the volume server's IP address and the settings in the weed.conf file.
Now when requesting a file key, an optional "dataCenter" parameter can limit the assigned volume to the specified data center. For example, this specifies that the assigned volume should be in dc1:
```
http://localhost:9333/dir/assign?dataCenter=dc1
```
### Other Features ###
* [No Single Point of Failure](https://code.google.com/p/weed-fs/wiki/FailoverMasterServer)
* [Insert with your own keys](https://code.google.com/p/weed-fs/wiki/Optimization#Insert_with_your_own_keys)
* [Chunking large files](https://code.google.com/p/weed-fs/wiki/Optimization#Upload_large_files)
* [Collection as a Simple Name Space](https://code.google.com/p/weed-fs/wiki/Optimization#Collection_as_a_Simple_Name_Space)
## Architecture ##
Usually distributed file systems split each file into chunks; a central master keeps a mapping from a filename and a chunk index to chunk handles, and also tracks which chunks each chunk server has.
This has the drawback that the central master cannot handle many small files efficiently, and since all read requests need to go through the central master, responses would be slow for many concurrent web users.
Instead of managing chunks, Seaweed-FS chooses to manage data volumes in the master server. Each data volume is 32GB in size and can hold a lot of files, and each storage node can have many data volumes. So the master node only needs to store metadata about the volumes, which is a fairly small amount of data and is pretty static most of the time.
The actual file metadata is stored in each volume on the volume servers. Since each volume server only manages metadata for files on its own disk, with only 16 bytes per file, all file access can read file metadata from memory and needs only one disk operation to actually read the file data.
For comparison, consider that an xfs inode structure in Linux is 536 bytes.
### Master Server and Volume Server ###
The architecture is fairly simple. The actual data is stored in volumes on storage nodes. One volume server can have multiple volumes, and supports both read and write access with basic authentication.
All volumes are managed by a master server. The master server contains the volume id to volume server mapping. This is fairly static information, and can be cached easily.
On each write request, the master server also generates a file key, which is a growing 64-bit unsigned integer. Since write requests are not as frequent as read requests, one master server should be able to handle the concurrency well.
### Write and Read files ###
When a client sends a write request, the master server returns <volume id, file key, file cookie, volume node url> for the file. The client then contacts the volume node and POSTs the file content via REST.
When a client needs to read a file based on <volume id, file key, file cookie>, it can ask the master server for the <volume node url, volume node public url> by the <volume id>, or use a cached copy. Then the client can HTTP GET the content via REST, or just render the URL on web pages and let browsers fetch the content.
Please see the example for details on write-read process.
### Storage Size ###
In the current implementation, each volume can be 8 x 2^32 = 32GiB in size. This is because contents are aligned to 8 bytes. This can easily be increased to 64GiB, or 128GiB, or more, by changing 2 lines of code, at the cost of some wasted padding space due to alignment.
There can be 2^32 volumes. So the total system size is 8 x 2^32 x 2^32 = 8 x 4G x 4G = 2^67 bytes, i.e. 128 exbibytes.
Each individual file size is limited to the volume size.
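A quick sanity check of these numbers (the constants come from the description above, not from the code):
```
package main

import "fmt"

func main() {
	const paddingSize = uint64(8)      // contents are aligned to 8 bytes
	const maxOffsets = uint64(1) << 32 // offsets are stored as 32-bit values
	fmt.Println(paddingSize*maxOffsets>>30, "GiB per volume") // 32
	// with up to 2^32 volumes: 8 x 2^32 x 2^32 = 2^67 bytes = 128 EiB in total
	fmt.Printf("total: 2^%d bytes\n", 3+32+32)
}
```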
### Saving memory ###
All file meta information on a volume server is readable from memory without disk access. Each file takes just a 16-byte map entry of <64-bit key, 32-bit offset, 32-bit size>. Of course, each map entry has its own space cost in the map, but usually the disk runs out before the memory does.
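As a Go struct, the entry described above looks like this; the field names are illustrative, though the real needle map in go/storage has the same 8 + 4 + 4 = 16-byte shape:
```
package storage

// NeedleValue is the per-file in-memory map entry: 16 bytes per file.
type NeedleValue struct {
	Key    uint64 // file key
	Offset uint32 // offset within the volume, in 8-byte units
	Size   uint32 // file entry size on disk
}
```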
## Compared to Other File Systems ##
Frankly, I don't use other distributed file systems too often. They all seem more complicated than necessary. Please correct me if anything here is wrong.
### Compared to Ceph ###
Ceph can be set up similarly to Seaweed-FS as a key~blob store. It is much more complicated, needing to support layers on top of it. Here is a more detailed comparison: https://code.google.com/p/weed-fs/issues/detail?id=44
Seaweed-FS is meant to be fast and simple, both during usage and during setup. If you do not understand how it works by the time you reach this point, we have failed! Jokes aside, you should not need any consulting service for it.
Seaweed-FS has a centralized master to look up free volumes, while Ceph uses hashing to locate its objects. Having a centralized master makes it easy to code and manage. HDFS/GFS have had a single name node for years. Seaweed-FS now supports multiple master nodes.
Ceph hashing avoids SPOF, but makes it complicated when moving or adding servers.
### Compared to HDFS ###
HDFS uses the chunk approach for each file, and is ideal for streaming large files.
Seaweed-FS is ideal for serving relatively smaller files quickly and concurrently.
Seaweed-FS can also store extra large files by splitting them into manageable data chunks, and store the file ids of the data chunks into a meta chunk. This is managed by "weed upload/download" tool, and the weed master or volume servers are agnostic about it.
### Compared to MogileFS ###
Seaweed-FS has 2 components: directory server, storage nodes.
MogileFS has 3 components: trackers, database, storage nodes.
One more layer means slower access, more operational complexity, and a higher possibility of failure.
### Compared to GlusterFS ###
Seaweed-FS is not POSIX compliant, and has a simple implementation.
GlusterFS is POSIX compliant, much more complex.
### Compared to Mongo's GridFS ###
Mongo's GridFS splits files into chunks and manages the chunks in the central MongoDB. For every read or write request, the database needs to query the metadata. It's OK if this is not a bottleneck yet, but for a lot of concurrent reads these unnecessary queries could slow things down.
Since files are chunked (256KB by default), there will be multiple metadata reads and multiple chunk reads, linear in the file size. For example, a 2.56MB file is 10 chunks, each needing one metadata read and one chunk read, so it would require at least 20 disk read requests.
In contrast, Seaweed-FS uses large 32GB volumes to store lots of files, and only manages the volumes in the master server. Each volume manages its own file metadata. So all the file metadata is spread across the volume nodes' memory, and just one disk read is needed.
## Dev plan ##
More tools and documentation on how to maintain and scale the system: for example, how to move volumes, automatically balance data, grow volumes, check system status, etc.
This is a super exciting project! And I need helpers!
## Contributions ##
To make contributions easier, I have mirrored a repo on github.com:
```
https://github.com/chrislusf/weed-fs.git
```
## Disk Related topics ##
### Hard Drive Performance ###
When testing read performance of Seaweed-FS, it basically becomes a performance test of your hard drive's random read speed. Hard drives usually get 100MB/s~200MB/s.
### Solid State Disk ###
To modify or delete small files, an SSD must delete a whole block at a time and move the content of existing blocks to a new block. An SSD is fast when brand new, but will get fragmented over time, and you have to garbage collect, compacting blocks. Seaweed-FS is friendly to SSDs since it is append-only. Deletion and compaction are done at the volume level in the background, neither slowing reads nor causing fragmentation.
## Not Planned
POSIX support
## Benchmark
My own unscientific single-machine results on a MacBook with a solid state disk, CPU: 1 Intel Core i7 2.2GHz.
Write 1 million 1KB files:
```
Concurrency Level: 64
Time taken for tests: 182.456 seconds
Complete requests: 1048576
Failed requests: 0
Total transferred: 1073741824 bytes
Requests per second: 5747.01 [#/sec]
Transfer rate: 5747.01 [Kbytes/sec]
Connection Times (ms)
min avg max std
Total: 0.3 10.9 430.9 5.7
Percentage of the requests served within a certain time (ms)
50% 10.2 ms
66% 12.0 ms
75% 12.6 ms
80% 12.9 ms
90% 14.0 ms
95% 14.9 ms
98% 16.2 ms
99% 17.3 ms
100% 430.9 ms
```
Randomly read 1 million files:
```
Concurrency Level: 64
Time taken for tests: 80.732 seconds
Complete requests: 1048576
Failed requests: 0
Total transferred: 1073741824 bytes
Requests per second: 12988.37 [#/sec]
Transfer rate: 12988.37 [Kbytes/sec]
Connection Times (ms)
min avg max std
Total: 0.0 4.7 254.3 6.3
Percentage of the requests served within a certain time (ms)
50% 2.6 ms
66% 2.9 ms
75% 3.7 ms
80% 4.7 ms
90% 10.3 ms
95% 16.6 ms
98% 26.3 ms
99% 34.8 ms
100% 254.3 ms
```

docs/api.rst (13)

@@ -69,17 +69,6 @@ One volume servers one write a time. If you need to increase concurrency, you ca
This generates 4 empty volumes.
Upload File Directly
***********************************
.. code-block:: bash
curl -F file=@/home/chris/myphoto.jpg http://localhost:9333/submit
{"fid":"3,01fbe0dc6f1f38","fileName":"myphoto.jpg",
"fileUrl":"localhost:8080/3,01fbe0dc6f1f38","size":68231}
This API is a bit of a convenience wrapper. The master server contacts itself via HTTP to get a file id and stores the file to the right volume server. It is convenient but does not support different parameters when assigning the file id.
Check System Status
***********************************
@@ -191,7 +180,7 @@ Upload File
curl -F file=@/home/chris/myphoto.jpg http://127.0.0.1:8080/3,01637037d6
{"size": 43234}
The size returned is the size stored on WeedFS, sometimes the file is automatically gzipped based on the mime type.
The size returned is the size stored on Seaweed-FS, sometimes the file is automatically gzipped based on the mime type.
Upload File Directly
***********************************

docs/clients.rst (20)

@@ -8,17 +8,17 @@ Clients
+---------------------------------------------------------------------------------+--------------+-----------+
| `WeedPHP <https://github.com/micjohnson/weed-php/>`_ | Mic Johnson | PHP |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Weed-FS Symfony bundle <https://github.com/micjohnson/weed-php-bundle>`_ | Mic Johnson | PHP |
| `Seaweed-FS Symfony bundle <https://github.com/micjohnson/weed-php-bundle>`_ | Mic Johnson | PHP |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Weed-FS Node.js client <https://github.com/cruzrr/node-weedfs>`_ | Aaron Blakely| Javascript|
| `Seaweed-FS Node.js client <https://github.com/cruzrr/node-weedfs>`_ | Aaron Blakely| Javascript|
+---------------------------------------------------------------------------------+--------------+-----------+
| `Amazon S3 API for weed-fs <https://github.com/tgulacsi/s3weed>`_ | Tamás Gulácsi| Go |
| `Amazon S3 API for Seaweed-FS <https://github.com/tgulacsi/s3weed>`_ | Tamás Gulácsi| Go |
+---------------------------------------------------------------------------------+--------------+-----------+
| `File store upload test <https://github.com/tgulacsi/filestore-upload-test>`_ | Tamás Gulácsi| Go |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Java Weed-FS client <https://github.com/simplebread/WeedFSClient>`_ | Xu Zhang | Java |
| `Java Seaweed-FS client <https://github.com/simplebread/WeedFSClient>`_ | Xu Zhang | Java |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Java Weed-FS client 2 <https://github.com/zenria/Weed-FS-Java-Client>`_ | Zenria | Java |
| `Java Seaweed-FS client 2 <https://github.com/zenria/Weed-FS-Java-Client>`_ | Zenria | Java |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Python-weed <https://github.com/darkdarkfruit/python-weed>`_ | Darkdarkfruit| Python |
+---------------------------------------------------------------------------------+--------------+-----------+
@@ -26,7 +26,7 @@ Clients
+---------------------------------------------------------------------------------+--------------+-----------+
| `Camlistore blobserver Storage <https://github.com/tgulacsi/camli-weed>`_ | Tamás Gulácsi| Go |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Scala Weed-FS client <https://github.com/chiradip/WeedFsScalaClient>`_ | Chiradip | Scala |
| `Scala Seaweed-FS client <https://github.com/chiradip/WeedFsScalaClient>`_ | Chiradip | Scala |
+---------------------------------------------------------------------------------+--------------+-----------+
| `Module for kohana <https://github.com/bububa/kohanaphp-weedfs>`_ | Bububa | PHP |
+---------------------------------------------------------------------------------+--------------+-----------+
@@ -35,10 +35,10 @@ Clients
| `Django-weed <https://github.com/ProstoKSI/django-weed>`_ | ProstoKSI | Python |
+---------------------------------------------------------------------------------+--------------+-----------+
Projects using Weed-Fs
Projects using Seaweed-FS
###################################
* An `email River Plugin <https://github.com/medcl/elasticsearch-river-email/>`_ for Elasticsearch uses weed-fs server to save attachments
* An `email River Plugin <https://github.com/medcl/elasticsearch-river-email/>`_ for Elasticsearch uses Seaweed-FS server to save attachments
Websites using Weed-Fs
Websites using Seaweed-FS
###################################
* `Email to create Web Pages <http://mailp.in/>`_ uses weed-fs to save email attachments.
* `Email to create Web Pages <http://mailp.in/>`_ uses Seaweed-FS to save email attachments.

docs/directories.rst (6)

@@ -37,7 +37,7 @@ Design
A common file system would use inode to store meta data for each folder and file. The folder tree structure are usually linked. And sub folders and files are usually organized as an on-disk b+tree or similar variations. This scales well in terms of storage, but not well for fast file retrieval due to multiple disk access just for the file meta data, before even trying to get the file content.
WeedFS wants to make as small number of disk access as possible, yet still be able to store a lot of file metadata. So we need to think very differently.
Seaweed-FS wants to make as small number of disk access as possible, yet still be able to store a lot of file metadata. So we need to think very differently.
From a full file path to get to the file content, there are several steps:
@@ -48,7 +48,7 @@ From a full file path to get to the file content, there are several steps:
file_id => data_block
Because default WeedFS only provides file_id=>data_block mapping, the first 2 steps need to be implemented.
Because default Seaweed-FS only provides file_id=>data_block mapping, the first 2 steps need to be implemented.
There are several data features I noticed:
@@ -122,7 +122,7 @@ The LevelDB implementation may be switched underneath to external data storage,
Also, a HA feature will be added, so that multiple "weed filer" instance can share the same set of view of files.
Later, FUSE or HCFS plugins will be created, to really integrate WeedFS to existing systems.
Later, FUSE or HCFS plugins will be created, to really integrate Seaweed-FS to existing systems.
Helps Wanted
########################

docs/gettingstarted.rst (2)

@@ -3,7 +3,7 @@ Getting started
Installing Weed-Fs
###################################
Download a proper version from `WeedFS download page <https://bintray.com/chrislusf/Weed-FS/weed/>`_.
Download a proper version from `Seaweed-FS download page <https://bintray.com/chrislusf/Weed-FS/weed/>`_.
Decompress the downloaded file. You will only find one executable file, either "weed" on most systems or "weed.exe" on windows.

docs/optimization.rst (14)

@@ -1,14 +1,14 @@
Optimization
==============
Here are the strategies or best ways to optimize WeedFS.
Here are the strategies or best ways to optimize Seaweed-FS.
Increase concurrent writes
################################
By default, WeedFS grows the volumes automatically. For example, for no-replication volumes, there will be concurrently 7 writable volumes allocated.
By default, Seaweed-FS grows the volumes automatically. For example, for no-replication volumes, there will be concurrently 7 writable volumes allocated.
If you want to distribute writes to more volumes, you can do so by instructing WeedFS master via this URL.
If you want to distribute writes to more volumes, you can do so by instructing Seaweed-FS master via this URL.
.. code-block:: bash
@@ -31,14 +31,14 @@ More hard drives will give you better write/read throughput.
Gzip content
################################
WeedFS determines the file can be gzipped based on the file name extension. So if you submit a textual file, it's better to use an common file name extension, like ".txt", ".html", ".js", ".css", etc. If the name is unknown, like ".go", WeedFS will not gzip the content, but just save the content as is.
Seaweed-FS determines the file can be gzipped based on the file name extension. So if you submit a textual file, it's better to use an common file name extension, like ".txt", ".html", ".js", ".css", etc. If the name is unknown, like ".go", Seaweed-FS will not gzip the content, but just save the content as is.
You can also manually gzip content before submission. If you do so, make sure the submitted file has file name with ends with ".gz". For example, "my.css" can be gzipped to "my.css.gz" and sent to WeedFS. When retrieving the content, if the http client supports "gzip" encoding, the gzipped content would be sent back. Otherwise, the unzipped content would be sent back.
You can also manually gzip content before submission. If you do so, make sure the submitted file has file name with ends with ".gz". For example, "my.css" can be gzipped to "my.css.gz" and sent to Seaweed-FS. When retrieving the content, if the http client supports "gzip" encoding, the gzipped content would be sent back. Otherwise, the unzipped content would be sent back.
Memory consumption
#################################
For volume servers, the memory consumption is tightly related to the number of files. For example, one 32G volume can easily have 1.5 million files if each file is only 20KB. To store the 1.5 million entries of meta data in memory, currently WeedFS consumes 36MB memory, about 24bytes per entry in memory. So if you allocate 64 volumes(2TB), you would need 2~3GB memory. However, if the average file size is larger, say 200KB, only 200~300MB memory is needed.
For volume servers, the memory consumption is tightly related to the number of files. For example, one 32G volume can easily have 1.5 million files if each file is only 20KB. To store the 1.5 million entries of meta data in memory, currently Seaweed-FS consumes 36MB memory, about 24bytes per entry in memory. So if you allocate 64 volumes(2TB), you would need 2~3GB memory. However, if the average file size is larger, say 200KB, only 200~300MB memory is needed.
Theoretically the memory consumption can go even lower by compacting since the file ids are mostly monotonically increasing. I did not invest time on that yet since the memory consumption, 24bytes/entry(including uncompressed 8bytes file id, 4 bytes file size, plus additional map data structure cost) is already pretty low. But I welcome any one to compact these data in memory even more efficiently.
@@ -106,7 +106,7 @@ In case you need to delete them later, you can go to the volume servers and dele
Logging
##############################
When going to production, you will want to collect the logs. WeedFS uses glog. Here are some examples:
When going to production, you will want to collect the logs. Seaweed-FS uses glog. Here are some examples:
.. code-block:: bash

docs/replication.rst (4)

@@ -1,6 +1,6 @@
Replication
===================================
Weed-FS can support replication. The replication is implemented not on file level, but on volume level.
Seaweed-FS can support replication. The replication is implemented not on file level, but on volume level.
How to use
###################################
@@ -58,7 +58,7 @@ Each replication type will physically create x+y+z+1 copies of volume data files
Example topology configuration
###################################
The WeedFS master server tries to read the default topology configuration file are read from /etc/weedfs/weedfs.conf, if it exists. The topology setting to configure data center and racks file format is as this.
The Seaweed-FS master server tries to read the default topology configuration file from /etc/weedfs/weedfs.conf, if it exists. The topology setting to configure data centers and racks uses this file format:
.. code-block:: xml

docs/usecases.rst (2)

@@ -41,7 +41,7 @@ The wrong way to send it:
curl -H "Content-Type:image/png" -F file=@myImage.png http://127.0.0.1:8080/5,2730a7f18b44
Securing WeedFS
Securing Seaweed-FS
#############################
The simple way is to front all master and volume servers with firewall.

go/filer/client_operations.go (2)

@@ -3,7 +3,7 @@ package filer
import ()
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"

go/filer/directory_in_map.go (33)

@@ -2,15 +2,18 @@ package filer
import (
"bufio"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
)
var writeLock sync.Mutex //serialize changes to dir.log
type DirectoryEntryInMap struct {
Name string
Parent *DirectoryEntryInMap
@@ -25,23 +28,25 @@ type DirectoryManagerInMap struct {
isLoading bool
}
func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap) {
func (dm *DirectoryManagerInMap) NewDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap, err error) {
writeLock.Lock()
defer writeLock.Unlock()
d = &DirectoryEntryInMap{Name: name, Parent: parent, SubDirectories: make(map[string]*DirectoryEntryInMap)}
dm.max++
d.Id = dm.max
parts := make([]string, 0)
for p := d; p != nil && p.Name != ""; p = p.Parent {
parts = append(parts, p.Name)
}
n := len(parts)
if n <= 0 {
return d
return nil, fmt.Errorf("Failed to create folder %s/%s", parent.Name, name)
}
for i := 0; i < n/2; i++ {
parts[i], parts[n-1-i] = parts[n-1-i], parts[i]
}
dm.max++
d.Id = dm.max
dm.log("add", "/"+strings.Join(parts, "/"), strconv.Itoa(int(d.Id)))
return d
return d, nil
}
func (dm *DirectoryManagerInMap) log(words ...string) {
@@ -157,7 +162,11 @@ func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId
if i != len(parts)-1 {
return fmt.Errorf("%s should be created after parent %s!", dirPath, parts[i])
}
sub = dm.NewDirectoryEntryInMap(dir, parts[i])
var err error
sub, err = dm.NewDirectoryEntryInMap(dir, parts[i])
if err != nil {
return err
}
if sub.Id != dirId {
return fmt.Errorf("%s should be have id %v instead of %v!", dirPath, sub.Id, dirId)
}
@@ -178,7 +187,11 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn
for i := 1; i < len(parts); i++ {
sub, ok := dir.SubDirectories[parts[i]]
if !ok {
sub = dm.NewDirectoryEntryInMap(dir, parts[i])
var err error
sub, err = dm.NewDirectoryEntryInMap(dir, parts[i])
if err != nil {
return nil, false
}
dir.SubDirectories[parts[i]] = sub
created = true
}
@@ -193,6 +206,8 @@ func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, err
}
func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error {
writeLock.Lock()
defer writeLock.Unlock()
oldDir, oe := dm.findDirectory(oldDirPath)
if oe != nil {
return oe
@@ -223,6 +238,8 @@ func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []Dir
return dirNames, nil
}
func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error {
writeLock.Lock()
defer writeLock.Unlock()
if dirPath == "/" {
return fmt.Errorf("Can not delete %s", dirPath)
}

go/filer/filer_embedded.go (2)

@@ -1,7 +1,7 @@
package filer
import (
"code.google.com/p/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/operation"
"errors"
"fmt"
"path/filepath"

go/filer/files_in_leveldb.go (2)

@@ -2,7 +2,7 @@ package filer
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

go/images/orientation.go (5)

@@ -21,7 +21,10 @@ func FixJpgOrientation(data []byte) (oriented []byte) {
}
angle := 0
flipMode := FlipDirection(0)
orient := tag.Int(0)
orient, err := tag.Int(0)
if err != nil {
return data
}
switch orient {
case topLeftSide:
// do nothing

go/operation/assign_file_id.go (9)

@@ -1,8 +1,8 @@
package operation
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@@ -17,7 +17,7 @@ type AssignResult struct {
Error string `json:"error,omitempty"`
}
func Assign(server string, count int, replication string, collection string) (*AssignResult, error) {
func Assign(server string, count int, replication string, collection string, ttl string) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.Itoa(count))
if replication != "" {
@@ -26,6 +26,9 @@ func Assign(server string, count int, replication string, collection string) (*A
if collection != "" {
values.Add("collection", collection)
}
if ttl != "" {
values.Add("ttl", ttl)
}
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
glog.V(2).Info("assign result :", string(jsonBlob))
if err != nil {
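
This commit threads a new ttl parameter through to /dir/assign. A hedged usage sketch, assuming a TTL string such as "3m" (the accepted formats are defined by ReadTTL in go/storage/volume_ttl.go, added in this commit; the Fid and Url fields are inferred from the assign JSON shown in the README):
```
package main

import (
	"fmt"

	"github.com/chrislusf/weed-fs/go/operation"
)

func main() {
	// ask the master for a file id on a volume whose content expires after ~3 minutes
	ret, err := operation.Assign("localhost:9333", 1, "", "", "3m")
	if err != nil {
		fmt.Println("assign failed:", err)
		return
	}
	fmt.Println("fid:", ret.Fid, "upload to:", ret.Url)
}
```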

go/operation/delete_content.go (2)

@@ -1,7 +1,7 @@
package operation
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"

go/operation/list_masters.go (4)

@@ -1,8 +1,8 @@
package operation
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
)

go/operation/lookup.go (2)

@@ -1,7 +1,7 @@
package operation
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
_ "fmt"

go/operation/submit.go (13)

@@ -2,7 +2,7 @@ package operation
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"io"
"mime"
"os"
@@ -20,6 +20,7 @@ type FilePart struct {
ModTime int64 //in seconds
Replication string
Collection string
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
}
@@ -32,12 +33,12 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) {
func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
}
ret, err := Assign(master, len(files), replication, collection)
ret, err := Assign(master, len(files), replication, collection, ttl)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -112,7 +113,7 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
chunks := fi.FileSize/chunkSize + 1
fids := make([]string, 0)
for i := int64(0); i < chunks; i++ {
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection)
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
if e != nil {
return 0, e
}
@@ -130,8 +131,8 @@ return
return
}
func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) {
ret, err := Assign(master, 1, replication, collection)
func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
ret, err := Assign(master, 1, replication, collection, ttl)
if err != nil {
return "", 0, err
}

go/operation/system_message.pb.go (12)

@@ -15,12 +15,10 @@ It has these top-level messages:
package operation
import proto "code.google.com/p/goprotobuf/proto"
import json "encoding/json"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type VolumeInformationMessage struct {
@@ -33,6 +31,7 @@ type VolumeInformationMessage struct {
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -105,6 +104,13 @@ func (m *VolumeInformationMessage) GetVersion() uint32 {
return Default_VolumeInformationMessage_Version
}
func (m *VolumeInformationMessage) GetTtl() uint32 {
if m != nil && m.Ttl != nil {
return *m.Ttl
}
return 0
}
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`

go/operation/upload_content.go (2)

@@ -2,7 +2,7 @@ package operation
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"encoding/json"
"errors"
"fmt"

go/proto/system_message.proto (1)

@@ -10,6 +10,7 @@ message VolumeInformationMessage {
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
}
message JoinMessage {

go/storage/cdb_map.go (2)

@@ -1,7 +1,7 @@
package storage
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"

go/storage/cdb_map_test.go (2)

@@ -1,7 +1,7 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"math/rand"
"os"
"runtime"

go/storage/compact_map_perf_test.go (4)

@@ -1,8 +1,8 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"log"
"os"
"testing"

go/storage/compress.go (2)

@@ -2,7 +2,7 @@ package storage
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"compress/flate"
"compress/gzip"
"io/ioutil"

go/storage/crc.go (4)

@@ -1,7 +1,7 @@
package storage
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"hash/crc32"
)
@@ -25,5 +25,5 @@ func (c CRC) Value() uint32 {
func (n *Needle) Etag() string {
bits := make([]byte, 4)
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
return fmt.Sprintf("\"%x\"", bits)
}

go/storage/file_id.go (4)

@@ -1,8 +1,8 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"strings"

go/storage/needle.go (15)

@@ -1,9 +1,9 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/images"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"io/ioutil"
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
if n.Ttl != EMPTY_TTL {
n.SetHasTtl()
}
if fixJpgOrientation {
loweredName := strings.ToLower(fname)

go/storage/needle_map.go (4)

@@ -1,8 +1,8 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"os"

go/storage/needle_read_write.go (25)

@@ -1,8 +1,8 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"errors"
"fmt"
"io"
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10
LastModifiedBytesLength = 5
TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
if n.HasTtl() {
n.Size = n.Size + TtlBytesLength
}
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
if n.HasTtl() {
n.Ttl.ToBytes(header[0:TtlBytesLength])
if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
return
}
}
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
if index < lenBytes && n.HasTtl() {
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
index = index + TtlBytesLength
}
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
func (n *Needle) HasTtl() bool {
return n.Flags&FlagHasTtl > 0
}
func (n *Needle) SetHasTtl() {
n.Flags = n.Flags | FlagHasTtl
}

go/storage/replica_placement.go (8)

@@ -5,10 +5,6 @@ import (
"fmt"
)
const (
ReplicaPlacementCount = 9
)
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
}

go/storage/store.go (53)

@@ -2,9 +2,9 @@ package storage
import (
proto "code.google.com/p/goprotobuf/proto"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
@@ -14,6 +14,10 @@ import (
"strings"
)
const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
ttl, e := ReadTTL(ttlString)
if e != nil {
return e
}
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
e = s.addVolume(VolumeId(id), collection, rt)
e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
e = v.Destroy()
if e != nil {
return
}
delete(volumes, v.Id)
return
}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,6 +257,10 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.volumeSizeLimit) {
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(k)),
Size: proto.Uint64(uint64(v.Size())),
@@ -250,10 +271,16 @@ func (s *Store) Join() (masterNode string, e error) {
ReadOnly: proto.Bool(v.readOnly),
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
Version: proto.Uint32(uint32(v.Version())),
Ttl: proto.Uint32(v.Ttl.ToUint32()),
}
volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
} else {
if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
s.DeleteVolume(location.volumes, v)
glog.V(0).Infoln("volume", v.Id, "is deleted.")
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
}
}
}

go/storage/store_vacuum.go (2)

@@ -1,7 +1,7 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"fmt"
"strconv"
)

go/storage/volume.go (130)

@@ -2,7 +2,7 @@ package storage
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"errors"
"fmt"
"io"
@@ -12,22 +12,6 @@ import (
"time"
)
const (
SuperBlockSize = 8
)
type SuperBlock struct {
Version Version
ReplicaPlacement *ReplicaPlacement
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.Version)
header[1] = s.ReplicaPlacement.Byte()
return header
}
type Volume struct {
Id VolumeId
dir string
@@ -39,11 +23,12 @@ type Volume struct {
SuperBlock
accessLock sync.Mutex
lastModifiedTime uint64 //unix time in seconds
}
func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return e
}
func (v *Volume) Version() Version {
return v.SuperBlock.Version
return v.SuperBlock.Version()
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -138,44 +124,6 @@ func (v *Volume) Close() {
v.nm.Close()
_ = v.dataFile.Close()
}
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
return e
}
if stat.Size() == 0 {
v.SuperBlock.Version = CurrentVersion
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
v.readOnly = false
}
}
}
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
}
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e.Error())
}
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.Version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
return
}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
@@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
if v.lastModifiedTime < n.LastModified {
v.lastModifiedTime = n.LastModified
}
return
}
@@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
if ok && nv.Offset > 0 {
return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if !ok || nv.Offset == 0 {
return -1, errors.New("Not Found")
}
bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
return bytesRead, err
}
if !n.HasTtl() {
return bytesRead, err
}
ttlMinutes := n.Ttl.Minutes()
if ttlMinutes == 0 {
return bytesRead, nil
}
if !n.HasLastModifiedDate() {
return bytesRead, nil
}
if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
// volume is expired if modified time + volume ttl < now
// except when volume is empty
// or when the volume does not have a ttl
// or when volumeSizeLimit is 0 when server just starts
func (v *Volume) expired(volumeSizeLimit uint64) bool {
if volumeSizeLimit == 0 {
//skip if we don't know size limit
return false
}
if v.ContentSize() == 0 {
return false
}
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
return false
}
// wait the smaller of maxDelayMinutes or 10% of the ttl's minutes before removal
func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
removalDelay := v.Ttl.Minutes() / 10
if removalDelay > maxDelayMinutes {
removalDelay = maxDelayMinutes
}
if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
return true
}
return false
}
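The two checks above are the heart of the ttl story on the volume side. A minimal standalone sketch of the same arithmetic, with hypothetical helper names rather than the actual Volume methods:

package main

import (
	"fmt"
	"time"
)

// isExpired restates Volume.expired: a volume with a ttl is expired once it
// has lived longer than ttl minutes since its last modification.
func isExpired(lastModified int64, ttlMinutes uint32) bool {
	if ttlMinutes == 0 {
		return false // no ttl: never expires
	}
	livedMinutes := (time.Now().Unix() - lastModified) / 60
	return int64(ttlMinutes) < livedMinutes
}

// removalDue restates Volume.expiredLongEnough: wait an extra grace period of
// min(maxDelayMinutes, ttl/10) before actually removing the volume.
func removalDue(lastModified int64, ttlMinutes, maxDelayMinutes uint32) bool {
	delay := ttlMinutes / 10
	if delay > maxDelayMinutes {
		delay = maxDelayMinutes
	}
	return int64(ttlMinutes+delay)*60+lastModified < time.Now().Unix()
}

func main() {
	lastModified := time.Now().Add(-2 * time.Hour).Unix()
	fmt.Println(isExpired(lastModified, 60))      // true: lived ~120m > 60m ttl
	fmt.Println(removalDue(lastModified, 60, 10)) // true: 60m ttl + 6m grace has passed
}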

4
go/storage/volume_info.go

@ -1,13 +1,14 @@
package storage
import (
"code.google.com/p/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/operation"
)
type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
Collection string
Version Version
FileCount int
@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}

75
go/storage/volume_super_block.go

@ -0,0 +1,75 @@
package storage
import (
"github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
)
const (
SuperBlockSize = 8
)
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Bytes 2 and 3: Time to live. See TTL for definition
* Remaining bytes: reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
}
func (s *SuperBlock) Version() Version {
return s.version
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
s.Ttl.ToBytes(header[2:4])
return header
}
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
return e
}
if stat.Size() == 0 {
v.SuperBlock.version = CurrentVersion
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
v.readOnly = false
}
}
}
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
}
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e.Error())
}
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
superBlock.Ttl = LoadTTLFromBytes(header[2:4])
return
}
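A minimal standalone sketch of the 8-byte layout documented above, using raw bytes in place of the package's ReplicaPlacement and TTL types (all values hypothetical):

package main

import "fmt"

const superBlockSize = 8

func encodeSuperBlock(version, replicaPlacement, ttlCount, ttlUnit byte) []byte {
	header := make([]byte, superBlockSize)
	header[0] = version          // byte 0: version
	header[1] = replicaPlacement // byte 1: replica placement
	header[2] = ttlCount         // byte 2: ttl count
	header[3] = ttlUnit          // byte 3: ttl unit
	return header                // bytes 4-7 stay zero (reserved)
}

func main() {
	h := encodeSuperBlock(2, 1, 15, 3) // version 2, placement 001, ttl 15d (Day == 3)
	fmt.Printf("version=%d rp=%03d ttl=(%d,%d)\n", h[0], h[1], h[2], h[3])
}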

23
go/storage/volume_super_block_test.go

@ -0,0 +1,23 @@
package storage
import (
"testing"
)
func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
ttl, _ := ReadTTL("15d")
s := &SuperBlock{
version: CurrentVersion,
ReplicaPlacement: rp,
Ttl: ttl,
}
bytes := s.Bytes()
if !(bytes[2] == 15 && bytes[3] == Day) {
println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
t.Fail()
}
}

135
go/storage/volume_ttl.go

@ -0,0 +1,135 @@
package storage
import (
"strconv"
)
const (
//stored unit types
Empty byte = iota
Minute
Hour
Day
Week
Month
Year
)
type TTL struct {
count byte
unit byte
}
var EMPTY_TTL = &TTL{}
// translate a readable ttl to an internal ttl
// Supported formats, for example:
// 3m: 3 minutes
// 4h: 4 hours
// 5d: 5 days
// 6w: 6 weeks
// 7M: 7 months
// 8y: 8 years
func ReadTTL(ttlString string) (*TTL, error) {
if ttlString == "" {
return EMPTY_TTL, nil
}
ttlBytes := []byte(ttlString)
unitByte := ttlBytes[len(ttlBytes)-1]
countBytes := ttlBytes[0 : len(ttlBytes)-1]
if '0' <= unitByte && unitByte <= '9' {
countBytes = ttlBytes
unitByte = 'm'
}
count, err := strconv.Atoi(string(countBytes))
unit := toStoredByte(unitByte)
return &TTL{count: byte(count), unit: unit}, err
}
// read stored bytes to a ttl
func LoadTTLFromBytes(input []byte) (t *TTL) {
return &TTL{count: input[0], unit: input[1]}
}
// read a stored uint32 to a ttl
func LoadTTLFromUint32(ttl uint32) (t *TTL) {
input := make([]byte, 2)
input[1] = byte(ttl)
input[0] = byte(ttl >> 8)
return LoadTTLFromBytes(input)
}
// write the ttl into a 2-byte output slice
func (t TTL) ToBytes(output []byte) {
output[0] = t.count
output[1] = t.unit
}
func (t TTL) ToUint32() (output uint32) {
output = uint32(t.count) << 8
output += uint32(t.unit)
return output
}
func (t TTL) String() string {
if t.count == 0 {
return ""
}
if t.unit == Empty {
return ""
}
countString := strconv.Itoa(int(t.count))
switch t.unit {
case Minute:
return countString + "m"
case Hour:
return countString + "h"
case Day:
return countString + "d"
case Week:
return countString + "w"
case Month:
return countString + "M"
case Year:
return countString + "y"
}
return ""
}
func toStoredByte(readableUnitByte byte) byte {
switch readableUnitByte {
case 'm':
return Minute
case 'h':
return Hour
case 'd':
return Day
case 'w':
return Week
case 'M':
return Month
case 'y':
return Year
}
return 0
}
func (t TTL) Minutes() uint32 {
switch t.unit {
case Empty:
return 0
case Minute:
return uint32(t.count)
case Hour:
return uint32(t.count) * 60
case Day:
return uint32(t.count) * 60 * 24
case Week:
return uint32(t.count) * 60 * 24 * 7
case Month:
return uint32(t.count) * 60 * 24 * 31
case Year:
return uint32(t.count) * 60 * 24 * 365
}
return 0
}
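To make the two-byte encoding concrete, a standalone sketch of the ToUint32/LoadTTLFromUint32 round trip above, with the count in the high byte and the unit in the low byte (the constant mirrors the iota block above):

package main

import "fmt"

const hour byte = 2 // Empty=0, Minute=1, Hour=2, ...

func toUint32(count, unit byte) uint32 {
	return uint32(count)<<8 + uint32(unit)
}

func fromUint32(v uint32) (count, unit byte) {
	return byte(v >> 8), byte(v)
}

func main() {
	v := toUint32(5, hour) // "5h"
	c, u := fromUint32(v)
	fmt.Println(v, c, u == hour) // 1282 5 true
}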

60
go/storage/volume_ttl_test.go

@ -0,0 +1,60 @@
package storage
import (
"testing"
)
func TestTTLReadWrite(t *testing.T) {
ttl, _ := ReadTTL("")
if ttl.Minutes() != 0 {
t.Errorf("empty ttl:%v", ttl)
}
ttl, _ = ReadTTL("9")
if ttl.Minutes() != 9 {
t.Errorf("9 ttl:%v", ttl)
}
ttl, _ = ReadTTL("8m")
if ttl.Minutes() != 8 {
t.Errorf("8m ttl:%v", ttl)
}
ttl, _ = ReadTTL("5h")
if ttl.Minutes() != 300 {
t.Errorf("5h ttl:%v", ttl)
}
ttl, _ = ReadTTL("5d")
if ttl.Minutes() != 5*24*60 {
t.Errorf("5d ttl:%v", ttl)
}
ttl, _ = ReadTTL("5w")
if ttl.Minutes() != 5*7*24*60 {
t.Errorf("5w ttl:%v", ttl)
}
ttl, _ = ReadTTL("5M")
if ttl.Minutes() != 5*31*24*60 {
t.Errorf("5M ttl:%v", ttl)
}
ttl, _ = ReadTTL("5y")
if ttl.Minutes() != 5*365*24*60 {
t.Errorf("5y ttl:%v", ttl)
}
output := make([]byte, 2)
ttl.ToBytes(output)
ttl2 := LoadTTLFromBytes(output)
if ttl.Minutes() != ttl2.Minutes() {
t.Errorf("ttl:%v ttl2:%v", ttl, ttl2)
}
ttl3 := LoadTTLFromUint32(ttl.ToUint32())
if ttl.Minutes() != ttl3.Minutes() {
t.Errorf("ttl:%v ttl3:%v", ttl, ttl3)
}
}

16
go/storage/volume_vacuum.go

@ -1,10 +1,10 @@
package storage
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
_ "time"
"time"
)
func (v *Volume) garbageLevel() float64 {
@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
v.accessLock.Lock()
defer v.accessLock.Unlock()
glog.V(3).Infof("Got Compaction lock...")
//no need to lock for copy on write
//v.accessLock.Lock()
//defer v.accessLock.Unlock()
//glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
now := uint64(time.Now().Unix())
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
return nil
}
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {

2
go/tools/read_index.go

@ -1,7 +1,7 @@
package main
import (
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/storage"
"flag"
"fmt"
"log"

11
go/topology/allocate_volume.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@ -12,11 +12,12 @@ type AllocateVolumeResult struct {
Error string
}
func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", collection)
values.Add("replication", rp.String())
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
if err != nil {
return err

4
go/topology/cluster_commands.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/goraft/raft"
)

25
go/topology/collection.go

@ -1,33 +1,34 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/storage"
)
type Collection struct {
Name string
volumeSizeLimit uint64
replicaType2VolumeLayout []*VolumeLayout
storageType2VolumeLayout map[string]*VolumeLayout
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
return c
}
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
replicaPlacementIndex := rp.GetReplicationLevelIndex()
if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
return c.replicaType2VolumeLayout[replicaPlacementIndex]
if c.storageType2VolumeLayout[keyString] == nil {
c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
}
return c.storageType2VolumeLayout[keyString]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.replicaType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.replicaType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
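The switch from a replica-placement-indexed slice to a string-keyed map is what lets one collection hold layouts that differ only in ttl. A standalone sketch of the keying (hypothetical names):

package main

import "fmt"

// layoutKey restates GetOrCreateVolumeLayout's key: the replica placement
// string plus the ttl string, empty when the ttl is absent.
func layoutKey(rp, ttl string) string {
	return rp + ttl
}

func main() {
	layouts := make(map[string]int)
	for _, k := range []string{layoutKey("001", ""), layoutKey("001", "3d")} {
		layouts[k]++ // "001" and "0013d" are distinct layouts
	}
	fmt.Println(len(layouts)) // 2
}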

10
go/topology/data_node.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"strconv"
)
@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
}
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
for vid, _ := range dn.volumes {
for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
return
}
func (dn *DataNode) GetDataCenter() *DataCenter {
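UpdateVolumes now reports what disappeared instead of silently dropping it, so the topology can unregister the corresponding layouts. A standalone sketch of the diffing (hypothetical types):

package main

import "fmt"

func updateVolumes(known map[int]bool, actual []int) (deleted []int) {
	actualSet := make(map[int]bool)
	for _, vid := range actual {
		actualSet[vid] = true
	}
	for vid := range known {
		if !actualSet[vid] { // no longer reported by the node
			delete(known, vid)
			deleted = append(deleted, vid)
		}
	}
	return
}

func main() {
	known := map[int]bool{1: true, 2: true, 3: true}
	fmt.Println(updateVolumes(known, []int{1, 3})) // [2]
}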

4
go/topology/node.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"errors"
"math/rand"
"strings"

8
go/topology/store_replicate.go

@ -2,10 +2,10 @@ package topology
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"net/http"
"strconv"
)

29
go/topology/topology.go

@ -1,10 +1,10 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
"errors"
"github.com/goraft/raft"
"io/ioutil"
@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info:%+v", v)
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
dn.UpdateVolumes(volumeInfos)
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {

10
go/topology/topology_event_handling.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"time"
)
@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}

2
go/topology/topology_map.go

@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} {
m["DataCenters"] = dcs
var layouts []interface{}
for _, c := range t.collectionMap {
for _, layout := range c.replicaType2VolumeLayout {
for _, layout := range c.storageType2VolumeLayout {
if layout != nil {
tmp := layout.ToMap()
tmp["collection"] = c.Name

8
go/topology/topology_vacuum.go

@ -1,9 +1,9 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
for _, vl := range c.replicaType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {

16
go/topology/volume_growth.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"fmt"
"math/rand"
"sync"
@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
Ttl *storage.TTL
DataCenter string
Rack string
DataNode string
@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
if err := AllocateVolume(server, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
Version: storage.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)

4
go/topology/volume_growth_test.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
"encoding/json"
"fmt"
"testing"

17
go/topology/volume_layout.go

@ -1,8 +1,8 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"errors"
"math/rand"
"sync"
@ -11,15 +11,17 @@ import (
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.removeFromWritable(v.Id)
delete(vl.vid2location, v.Id)
}
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
m["ttl"] = vl.ttl.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m

12
go/util/bytes.go

@ -1,5 +1,7 @@
package util
// big endian
func BytesToUint64(b []byte) (v uint64) {
length := uint(len(b))
for i := uint(0); i < length-1; i++ {
@ -18,6 +20,12 @@ func BytesToUint32(b []byte) (v uint32) {
v += uint32(b[length-1])
return
}
func BytesToUint16(b []byte) (v uint16) {
v += uint16(b[0])
v <<= 8
v += uint16(b[1])
return
}
func Uint64toBytes(b []byte, v uint64) {
for i := uint(0); i < 8; i++ {
b[7-i] = byte(v >> (i * 8))
@ -28,6 +36,10 @@ func Uint32toBytes(b []byte, v uint32) {
b[3-i] = byte(v >> (i * 8))
}
}
func Uint16toBytes(b []byte, v uint16) {
b[0] = byte(v >> 8)
b[1] = byte(v)
}
func Uint8toBytes(b []byte, v uint8) {
b[0] = byte(v)
}
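The new 16-bit helpers follow the same big-endian convention as the existing 32- and 64-bit ones; a quick standalone check, also showing the encoding/binary equivalent:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	b := make([]byte, 2)
	binary.BigEndian.PutUint16(b, 0x1234) // same layout as Uint16toBytes
	fmt.Printf("% x %#x\n", b, binary.BigEndian.Uint16(b)) // 12 34 0x1234
}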

2
go/util/config.go

@ -10,7 +10,7 @@ package util
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"encoding/json"
"os"
)

2
go/util/constants.go

@ -3,5 +3,5 @@ package util
import ()
const (
VERSION = "0.63 beta"
VERSION = "0.64"
)

2
go/util/file_util.go

@ -2,7 +2,7 @@ package util
import (
"bufio"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"errors"
"os"
)

2
go/util/net_timeout.go

@ -1,7 +1,7 @@
package util
import (
"code.google.com/p/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/stats"
"net"
"time"
)

10
go/weed/benchmark.go

@ -2,9 +2,9 @@ package main
import (
"bufio"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"math"
@ -98,7 +98,7 @@ func init() {
}
func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Weed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)

6
go/weed/compact.go

@ -1,8 +1,8 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
)
func init() {
@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}

4
go/weed/download.go

@ -1,8 +1,8 @@
package main
import (
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"io/ioutil"

6
go/weed/export.go

@ -3,8 +3,8 @@ package main
import (
"archive/tar"
"bytes"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"fmt"
"os"
"path"
@ -100,7 +100,7 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version
err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
version = superBlock.Version
version = superBlock.Version()
return nil
}, true, func(n *storage.Needle, offset int64) error {
nv, ok := nm.Get(n.Id)

8
go/weed/filer.go

@ -1,9 +1,9 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"code.google.com/p/weed-fs/go/weed/weed_server"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"strconv"
@ -63,7 +63,7 @@ func runFiler(cmd *Command, args []string) bool {
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
filerListener, e := util.NewListener(
":"+strconv.Itoa(*f.port),
time.Duration(10)*time.Second,

4
go/weed/fix.go

@ -1,8 +1,8 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"os"
"path"
"strconv"

8
go/weed/master.go

@ -1,9 +1,9 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"code.google.com/p/weed-fs/go/weed/weed_server"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http"
"os"
@ -63,7 +63,7 @@ func runMaster(cmd *Command, args []string) bool {
listeningAddress := *masterIp + ":" + strconv.Itoa(*mport)
glog.V(0).Infoln("Start Weed Master", util.VERSION, "at", listeningAddress)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second)
if e != nil {

10
go/weed/mount_std.go

@ -5,17 +5,17 @@ package main
import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
"code.google.com/p/weed-fs/go/filer"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"os"
"runtime"
)
func runMount(cmd *Command, args []string) bool {
fmt.Printf("This is Weed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *mountOptions.dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false

17
go/weed/server.go

@ -1,9 +1,9 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"code.google.com/p/weed-fs/go/weed/weed_server"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http"
"os"
@ -48,6 +48,7 @@ var cmdServer = &Command{
var (
serverIp = cmdServer.Flag.String("ip", "", "ip or server name")
serverPublicIp = cmdServer.Flag.String("publicIp", "", "ip or server name")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
@ -152,7 +153,7 @@ func runServer(cmd *Command, args []string) bool {
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port))
glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port))
filerListener, e := util.NewListener(
":"+strconv.Itoa(*filerOptions.port),
time.Duration(10)*time.Second,
@ -178,8 +179,8 @@ func runServer(cmd *Command, args []string) bool {
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList,
)
glog.V(0).Infoln("Start Weed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
masterListener, e := util.NewListener(*serverIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
if e != nil {
glog.Fatalf(e.Error())
}
@ -211,9 +212,9 @@ func runServer(cmd *Command, args []string) bool {
*volumeFixJpgOrientation,
)
glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
volumeListener, e := util.NewListener(
*serverIp+":"+strconv.Itoa(*volumePort),
*serverBindIp+":"+strconv.Itoa(*volumePort),
time.Duration(*serverTimeout)*time.Second,
)
if e != nil {

2
go/weed/shell.go

@ -2,7 +2,7 @@ package main
import (
"bufio"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
)

8
go/weed/upload.go

@ -1,7 +1,7 @@
package main
import (
"code.google.com/p/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
"fmt"
"os"
@ -12,6 +12,7 @@ var (
uploadReplication *string
uploadCollection *string
uploadDir *string
uploadTtl *string
include *string
maxMB *int
)
@ -24,6 +25,7 @@ func init() {
include = cmdUpload.Flag.String("include", "", "patterns of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type")
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
}
@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
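With this change a time to live rides along at upload time. A hypothetical invocation (the -ttl and -collection flags are the ones registered above; the file name is illustrative):

weed upload -ttl 3d -collection pictures photo1.jpg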

6
go/weed/version.go

@ -1,7 +1,7 @@
package main
import (
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/util"
"fmt"
"runtime"
)
@ -9,8 +9,8 @@ import (
var cmdVersion = &Command{
Run: runVersion,
UsageLine: "version",
Short: "print Weed File System version",
Long: `Version prints the Weed File System version`,
Short: "print Seaweed File System version",
Long: `Version prints the Seaweed File System version`,
}
func runVersion(cmd *Command, args []string) bool {

11
go/weed/volume.go

@ -1,9 +1,9 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"code.google.com/p/weed-fs/go/weed/weed_server"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"runtime"
@ -30,6 +30,7 @@ var (
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name")
publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
@ -84,9 +85,9 @@ func runVolume(cmd *Command, args []string) bool {
*fixJpgOrientation,
)
listeningAddress := *ip + ":" + strconv.Itoa(*vport)
listeningAddress := *bindIp + ":" + strconv.Itoa(*vport)
glog.V(0).Infoln("Start Weed volume server", util.VERSION, "at", listeningAddress)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*vTimeout)*time.Second)
if e != nil {

2
go/weed/volume_test.go

@ -1,7 +1,7 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"net/http"
"testing"
"time"

4
go/weed/weed.go

@ -1,7 +1,7 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"flag"
"fmt"
"io"
@ -90,7 +90,7 @@ func main() {
}
var usageTemplate = `
Weed File System : store billions of files and serve them fast!
Seaweed File System : store billions of files and serve them fast!
Usage:

14
go/weed/weed_server/common.go

@ -2,11 +2,11 @@ package weed_server
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"fmt"
"net"
@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, pe)
return
}
debug("assigning file id for", fname)
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"))
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
if ae != nil {
writeJsonError(w, r, ae)
return

4
go/weed/weed_server/filer_server.go

@ -1,8 +1,8 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/filer"
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog"
"net/http"
"strconv"
)

19
go/weed/weed_server/filer_server_handlers.go

@ -1,9 +1,9 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"github.com/syndtr/goleveldb/leveldb"
@ -103,12 +103,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
for k, v := range resp.Header {
w.Header()[k] = v
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection)
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, ae)
@ -130,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
writeJsonError(w, r, do_err)
return
}
defer resp.Body.Close()
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
glog.V(0).Infoln("failing to upload to volume server", ra_err.Error())
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
writeJsonError(w, r, ra_err)
return
}
@ -145,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
var ret operation.UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload resonse", string(resp_body))
glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
writeJsonError(w, r, unmarshal_err)
return
}
if ret.Error != "" {
glog.V(0).Infoln("failing to post to volume server", ret.Error)
glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
writeJsonError(w, r, errors.New(ret.Error))
return
}
@ -168,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
glog.V(0).Infoln("failing to write to filer server", db_err.Error())
glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
writeJsonError(w, r, db_err)
return
}

2
go/weed/weed_server/filer_server_handlers_admin.go

@ -1,7 +1,7 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"net/http"
)

8
go/weed/weed_server/master_server.go

@ -1,10 +1,10 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft"
"github.com/gorilla/mux"
"net/http"

6
go/weed/weed_server/master_server_handlers.go

@ -1,9 +1,9 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"net/http"
"strconv"
"strings"

19
go/weed/weed_server/master_server_handlers_admin.go

@ -2,11 +2,11 @@ package weed_server
import (
proto "code.google.com/p/goprotobuf/proto"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"io/ioutil"
@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
}
ms.Topo.RegisterVolumes(joinMessage)
ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
ttl, err := storage.ReadTTL(r.FormValue("ttl"))
if err != nil {
return nil, err
}
volumeGrowOption := &topology.VolumeGrowOption{
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
DataNode: r.FormValue("dataNode"),
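A hedged end-to-end sketch of exercising the new option, assuming the master's usual /dir/assign endpoint (not shown in this hunk) feeds its form values into getVolumeGrowOption above:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// the "ttl" query value is read via r.FormValue("ttl") on the master
	resp, err := http.Get("http://localhost:9333/dir/assign?ttl=3d")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // json with the assigned fid and volume url
}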

4
go/weed/weed_server/raft_server.go

@ -2,8 +2,8 @@ package weed_server
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/topology"
"encoding/json"
"errors"
"fmt"

4
go/weed/weed_server/raft_server_handlers.go

@ -1,8 +1,8 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
"github.com/goraft/raft"
"io/ioutil"

5
go/weed/weed_server/volume_server.go

@ -1,8 +1,8 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"net/http"
"strconv"
@ -35,7 +35,6 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, fol
}
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
r.HandleFunc("/submit", secure(vs.whiteList, vs.submitFromVolumeServerHandler))
r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler))
r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler))
r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler))

12
go/weed/weed_server/volume_server_handlers.go

@ -1,12 +1,12 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/images"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
"io"
"mime"
"mime/multipart"

12
go/weed/weed_server/volume_server_handlers_admin.go

@ -1,9 +1,9 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/util"
"net/http"
"path/filepath"
)
@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"))
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {
@ -50,10 +50,6 @@ func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Reque
glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
}
func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) {
submitForClientHandler(w, r, vs.masterNode)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION

2
go/weed/weed_server/volume_server_handlers_vacuum.go

@ -1,7 +1,7 @@
package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/glog"
"net/http"
)
