Browse Source

Merge branch 'master' of https://github.com/chrislusf/weed-fs

pull/33/head
Brian McQueen 10 years ago
parent
commit
a3583e4e7c
  1. 5
      .project
  2. 1
      .travis.yml
  3. 2
      Dockerfile
  4. 42
      README.md
  5. 5
      docs/api.rst
  6. 19
      docs/benchmarks.rst
  7. 2
      docs/directories.rst
  8. 2
      docs/failover.rst
  9. 6
      docs/gettingstarted.rst
  10. 6
      go/filer/client_operations.go
  11. 2
      go/filer/directory.go
  12. 3
      go/filer/directory_in_map.go
  13. 2
      go/filer/filer.go
  14. 3
      go/filer/filer_embedded.go
  15. 1
      go/filer/files_in_leveldb.go
  16. 2
      go/glog/convenient_api.go
  17. 3
      go/images/orientation.go
  18. 3
      go/images/resizing.go
  19. 5
      go/operation/assign_file_id.go
  20. 2
      go/operation/data_struts.go
  21. 3
      go/operation/delete_content.go
  22. 3
      go/operation/list_masters.go
  23. 3
      go/operation/lookup.go
  24. 3
      go/operation/submit.go
  25. 3
      go/operation/system_message_test.go
  26. 3
      go/operation/upload_content.go
  27. 2
      go/sequence/sequence.go
  28. 2
      go/stats/disk.go
  29. 2
      go/stats/disk_notsupported.go
  30. 2
      go/stats/memory_notsupported.go
  31. 5
      go/storage/cdb_map.go
  32. 3
      go/storage/cdb_map_test.go
  33. 2
      go/storage/compact_map.go
  34. 5
      go/storage/compact_map_perf_test.go
  35. 3
      go/storage/compress.go
  36. 3
      go/storage/crc.go
  37. 5
      go/storage/file_id.go
  38. 7
      go/storage/needle.go
  39. 5
      go/storage/needle_map.go
  40. 5
      go/storage/needle_read_write.go
  41. 9
      go/storage/store.go
  42. 3
      go/storage/store_vacuum.go
  43. 26
      go/storage/volume.go
  44. 3
      go/storage/volume_super_block.go
  45. 3
      go/storage/volume_vacuum.go
  46. 2
      go/storage/volume_version.go
  47. 3
      go/tools/read_index.go
  48. 5
      go/topology/allocate_volume.go
  49. 21
      go/topology/collection.go
  50. 2
      go/topology/data_center.go
  51. 3
      go/topology/data_node.go
  52. 5
      go/topology/node.go
  53. 5
      go/topology/store_replicate.go
  54. 38
      go/topology/topology.go
  55. 5
      go/topology/topology_event_handling.go
  56. 9
      go/topology/topology_map.go
  57. 21
      go/topology/topology_vacuum.go
  58. 5
      go/topology/volume_growth.go
  59. 5
      go/topology/volume_growth_test.go
  60. 5
      go/topology/volume_layout.go
  61. 2
      go/topology/volume_location_list.go
  62. 37
      go/util/concurrent_read_map.go
  63. 3
      go/util/config.go
  64. 4
      go/util/constants.go
  65. 3
      go/util/file_util.go
  66. 3
      go/util/net_timeout.go
  67. 277
      go/weed/benchmark.go
  68. 2
      go/weed/compact.go
  69. 5
      go/weed/download.go
  70. 7
      go/weed/export.go
  71. 15
      go/weed/filer.go
  72. 7
      go/weed/fix.go
  73. 12
      go/weed/master.go
  74. 2
      go/weed/mount.go
  75. 7
      go/weed/mount_std.go
  76. 25
      go/weed/server.go
  77. 3
      go/weed/shell.go
  78. 2
      go/weed/signal_handling_notsupported.go
  79. 3
      go/weed/upload.go
  80. 3
      go/weed/version.go
  81. 11
      go/weed/volume.go
  82. 3
      go/weed/volume_test.go
  83. 3
      go/weed/weed.go
  84. 11
      go/weed/weed_server/common.go
  85. 27
      go/weed/weed_server/filer_server.go
  86. 22
      go/weed/weed_server/filer_server_handlers.go
  87. 3
      go/weed/weed_server/filer_server_handlers_admin.go
  88. 9
      go/weed/weed_server/master_server.go
  89. 11
      go/weed/weed_server/master_server_handlers.go
  90. 15
      go/weed/weed_server/master_server_handlers_admin.go
  91. 55
      go/weed/weed_server/raft_server.go
  92. 7
      go/weed/weed_server/raft_server_handlers.go
  93. 5
      go/weed/weed_server/volume_server.go
  94. 13
      go/weed/weed_server/volume_server_handlers.go
  95. 5
      go/weed/weed_server/volume_server_handlers_admin.go
  96. 3
      go/weed/weed_server/volume_server_handlers_vacuum.go

5
.project

@ -5,11 +5,6 @@
<projects> <projects>
</projects> </projects>
<buildSpec> <buildSpec>
<buildCommand>
<name>com.googlecode.goclipse.goBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec> </buildSpec>
<natures> <natures>
<nature>goclipse.goNature</nature> <nature>goclipse.goNature</nature>

1
.travis.yml

@ -1,6 +1,5 @@
language: go language: go
go: go:
- 1.2
- 1.3 - 1.3
- release - release
- tip - tip

2
Dockerfile

@ -1,5 +1,5 @@
FROM cydev/go FROM cydev/go
RUN go get code.google.com/p/weed-fs/go/weed
RUN go get github.com/chrislusf/weed-fs/go/weed
EXPOSE 8080 EXPOSE 8080
EXPOSE 9333 EXPOSE 9333
VOLUME /data VOLUME /data

42
README.md

@ -5,16 +5,6 @@ Seaweed File System
[![GoDoc](https://godoc.org/github.com/chrislusf/weed-fs/go?status.svg)](https://godoc.org/github.com/chrislusf/weed-fs/go) [![GoDoc](https://godoc.org/github.com/chrislusf/weed-fs/go?status.svg)](https://godoc.org/github.com/chrislusf/weed-fs/go)
[![RTD](https://readthedocs.org/projects/weed-fs/badge/?version=latest)](http://weed-fs.readthedocs.org/en/latest/) [![RTD](https://readthedocs.org/projects/weed-fs/badge/?version=latest)](http://weed-fs.readthedocs.org/en/latest/)
## Usage
```
go get github.com/chrislusf/weed-fs/go/weed
```
## Reference
For pre-compiled releases,
https://bintray.com/chrislusf/Weed-FS/seaweed
## Introduction ## Introduction
@ -246,12 +236,34 @@ More tools and documentation, on how to maintain and scale the system. For examp
This is a super exciting project! And I need helpers! This is a super exciting project! And I need helpers!
## Contributions ##
To make contributions easier, I have mirrored a repo in github.com
```
https://github.com/chrislusf/weed-fs.git
```
## Installation guide for users who are not familiar with golang
step 1: install go on your machine and setup the environment by following the instructions from the following link:
https://golang.org/doc/install
make sure you set up your $GOPATH
step 2: also you may need to install Mercurial by following the instructions below
http://mercurial.selenic.com/downloads
step 3: download, compile, and install the project by executing the following command
go get github.com/chrislusf/weed-fs/go/weed
once this is done, you should see the executable "weed" under $GOPATH/bin
step 4: after you modify your code locally, you could start a local build by calling "go install" under $GOPATH/src/github.com/chrislusf/weed-fs/go/weed
## Reference
For pre-compiled releases,
https://bintray.com/chrislusf/Weed-FS/seaweed
## Disk Related topics ## ## Disk Related topics ##
### Hard Drive Performance ### ### Hard Drive Performance ###

5
docs/api.rst

@ -187,10 +187,11 @@ Upload File Directly
.. code-block:: bash .. code-block:: bash
curl -F file=@/home/chris/myphoto.jpg http://localhost:8080/submit
curl -F file=@/home/chris/myphoto.jpg http://localhost:9333/submit
{"fid":"3,01fbe0dc6f1f38","fileName":"myphoto.jpg","fileUrl":"localhost:8080/3,01fbe0dc6f1f38","size":68231} {"fid":"3,01fbe0dc6f1f38","fileName":"myphoto.jpg","fileUrl":"localhost:8080/3,01fbe0dc6f1f38","size":68231}
This API is a little convenient. The volume server would contact the master to get an file id and store it to the right volume server(not necessarily itself).
This API is just for convenience. The master server would get an file id and store the file to the right volume server.
It is a convenient API and does not support different parameters when assigning file id. (or you can add the support and send a push request.)
Delete File Delete File
*********************************** ***********************************

19
docs/benchmarks.rst

@ -1,7 +1,7 @@
Benchmarks Benchmarks
====================== ======================
Do we really need the benchmark? People always use benchmark to compare systems. But benchmarks are misleading. The resources, e.g., CPU, disk, memory, network, all matter a lot. And with Weed File System, single node vs multiple nodes, benchmarking on one machine vs several multiple machines, all matter a lot.
Do we really need the benchmark? People always use benchmark to compare systems. But benchmarks are misleading. The resources, e.g., CPU, disk, memory, network, all matter a lot. And with Seaweed File System, single node vs multiple nodes, benchmarking on one machine vs several multiple machines, all matter a lot.
Here is the steps on how to run benchmark if you really need some numbers. Here is the steps on how to run benchmark if you really need some numbers.
@ -38,7 +38,22 @@ Many options are options are configurable. Please check the help content:
Common Problems Common Problems
############################### ###############################
The most common problem is "too many open files" error. This is because the test itself starts too many network connections on one single machine. In my local macbook, if I ran "random read" following writing right away, the error happens always. I have to run "weed benchmark -write=false" to run the reading test only. Also, changing the concurrency level to "-c=16" would also help.
The most common
I start weed servers in one console for simplicity. Better run servers on different consoles.
For more realistic tests, please start them on different machines.
.. code-block:: bash
# prepare directories
mkdir 3 4 5
# start 3 servers
./weed server -dir=./3 -master.port=9333 -volume.port=8083 &
./weed volume -dir=./4 -port=8084 &
./weed volume -dir=./5 -port=8085 &
./weed benchmark -server=localhost:9333
problem is "too many open files" error. This is because the test itself starts too many network connections on one single machine. In my local macbook, if I ran "random read" following writing right away, the error happens always. I have to run "weed benchmark -write=false" to run the reading test only. Also, changing the concurrency level to "-c=16" would also help.
My own unscientific single machine results My own unscientific single machine results
################################################### ###################################################

2
docs/directories.rst

@ -1,7 +1,7 @@
Directories and files Directories and files
=========================== ===========================
When talking about file systems, many people would assume directories, list files under a directory, etc. These are expected if we want to hook up Weed File System with linux by FUSE, or with Hadoop, etc.
When talking about file systems, many people would assume directories, list files under a directory, etc. These are expected if we want to hook up Seaweed File System with linux by FUSE, or with Hadoop, etc.
Sample usage Sample usage
##################### #####################

2
docs/failover.rst

@ -7,7 +7,7 @@ Introduction
Some user will ask for no single point of failure. Although google runs its file system with a single master for years, no SPOF seems becoming a criteria for architects to pick solutions. Some user will ask for no single point of failure. Although google runs its file system with a single master for years, no SPOF seems becoming a criteria for architects to pick solutions.
Luckily, it's not too difficult to enable Weed File System with failover master servers.
Luckily, it's not too difficult to enable Seaweed File System with failover master servers.
Cheat Sheet: Startup multiple servers Cheat Sheet: Startup multiple servers
######################################## ########################################

6
docs/gettingstarted.rst

@ -1,6 +1,6 @@
Getting started Getting started
=================================== ===================================
Installing Weed-Fs
Installing Seaweed-FS
################################### ###################################
Download a proper version from `Seaweed-FS download page <https://bintray.com/chrislusf/Weed-FS/weed/>`_. Download a proper version from `Seaweed-FS download page <https://bintray.com/chrislusf/Weed-FS/weed/>`_.
@ -57,7 +57,7 @@ Actually, forget about previous commands. You can setup one master server and on
# use "weed server -h" to find out more # use "weed server -h" to find out more
./weed server -master.port=9333 -volume.port=8080 -dir="./data" ./weed server -master.port=9333 -volume.port=8080 -dir="./data"
Testing Weed-Fs
Testing Seaweed-FS
################################### ###################################
With the master and volume server up, now what? Let's pump in a lot of files into the system! With the master and volume server up, now what? Let's pump in a lot of files into the system!
@ -77,7 +77,7 @@ Then, you can simply check "du -m -s /some/big/folder" to see the actual disk us
Now you can use your tools to hit weed-fs as hard as you can. Now you can use your tools to hit weed-fs as hard as you can.
Using Weed-Fs in docker
Using Seaweed-FS in docker
#################################### ####################################
You can use image "cydev/weed" or build your own with `dockerfile <https://github.com/chrislusf/weed-fs/blob/master/Dockerfile>`_ in the root of repo. You can use image "cydev/weed" or build your own with `dockerfile <https://github.com/chrislusf/weed-fs/blob/master/Dockerfile>`_ in the root of repo.

6
go/filer/client_operations.go

@ -1,12 +1,12 @@
package filer package filer
import ()
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/chrislusf/weed-fs/go/util"
"net/url" "net/url"
) )

2
go/filer/directory.go

@ -1,7 +1,5 @@
package filer package filer
import ()
type DirectoryId int32 type DirectoryId int32
type DirectoryEntry struct { type DirectoryEntry struct {

3
go/filer/directory_in_map.go

@ -2,7 +2,6 @@ package filer
import ( import (
"bufio" "bufio"
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -10,6 +9,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"github.com/chrislusf/weed-fs/go/util"
) )
var writeLock sync.Mutex //serialize changes to dir.log var writeLock sync.Mutex //serialize changes to dir.log

2
go/filer/filer.go

@ -1,7 +1,5 @@
package filer package filer
import ()
type FileId string //file id on weedfs type FileId string //file id on weedfs
type FileEntry struct { type FileEntry struct {

3
go/filer/filer_embedded.go

@ -1,11 +1,12 @@
package filer package filer
import ( import (
"github.com/chrislusf/weed-fs/go/operation"
"errors" "errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/operation"
) )
type FilerEmbedded struct { type FilerEmbedded struct {

1
go/filer/files_in_leveldb.go

@ -2,6 +2,7 @@ package filer
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"

2
go/glog/convenient_api.go

@ -1,7 +1,5 @@
package glog package glog
import ()
/* /*
Copying the original glog because it is missing several convenient methods. Copying the original glog because it is missing several convenient methods.
1. remove nano time in log format 1. remove nano time in log format

3
go/images/orientation.go

@ -2,11 +2,12 @@ package images
import ( import (
"bytes" "bytes"
"github.com/rwcarlsen/goexif/exif"
"image" "image"
"image/draw" "image/draw"
"image/jpeg" "image/jpeg"
"log" "log"
"github.com/rwcarlsen/goexif/exif"
) )
//many code is copied from http://camlistore.org/pkg/images/images.go //many code is copied from http://camlistore.org/pkg/images/images.go

3
go/images/resizing.go

@ -2,11 +2,12 @@ package images
import ( import (
"bytes" "bytes"
"github.com/disintegration/imaging"
"image" "image"
"image/gif" "image/gif"
"image/jpeg" "image/jpeg"
"image/png" "image/png"
"github.com/disintegration/imaging"
) )
func Resized(ext string, data []byte, width, height int) (resized []byte, w int, h int) { func Resized(ext string, data []byte, width, height int) (resized []byte, w int, h int) {

5
go/operation/assign_file_id.go

@ -1,12 +1,13 @@
package operation package operation
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"net/url" "net/url"
"strconv" "strconv"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
) )
type AssignResult struct { type AssignResult struct {

2
go/operation/data_struts.go

@ -1,7 +1,5 @@
package operation package operation
import ()
type JoinResult struct { type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`

3
go/operation/delete_content.go

@ -1,12 +1,13 @@
package operation package operation
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
"github.com/chrislusf/weed-fs/go/util"
) )
type DeleteResult struct { type DeleteResult struct {

3
go/operation/list_masters.go

@ -1,9 +1,10 @@
package operation package operation
import ( import (
"encoding/json"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
) )
type ClusterStatusResult struct { type ClusterStatusResult struct {

3
go/operation/lookup.go

@ -1,7 +1,6 @@
package operation package operation
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
_ "fmt" _ "fmt"
@ -9,6 +8,8 @@ import (
"net/url" "net/url"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/util"
) )
type Location struct { type Location struct {

3
go/operation/submit.go

@ -2,13 +2,14 @@ package operation
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"io" "io"
"mime" "mime"
"os" "os"
"path" "path"
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
) )
type FilePart struct { type FilePart struct {

3
go/operation/system_message_test.go

@ -1,10 +1,11 @@
package operation package operation
import ( import (
proto "code.google.com/p/goprotobuf/proto"
"encoding/json" "encoding/json"
"log" "log"
"testing" "testing"
proto "code.google.com/p/goprotobuf/proto"
) )
func TestSerialDeserial(t *testing.T) { func TestSerialDeserial(t *testing.T) {

3
go/operation/upload_content.go

@ -2,7 +2,6 @@ package operation
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -14,6 +13,8 @@ import (
"net/textproto" "net/textproto"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
) )
type UploadResult struct { type UploadResult struct {

2
go/sequence/sequence.go

@ -1,7 +1,5 @@
package sequence package sequence
import ()
type Sequencer interface { type Sequencer interface {
NextFileId(count int) (uint64, int) NextFileId(count int) (uint64, int)
SetMax(uint64) SetMax(uint64)

2
go/stats/disk.go

@ -1,7 +1,5 @@
package stats package stats
import ()
type DiskStatus struct { type DiskStatus struct {
Dir string Dir string
All uint64 All uint64

2
go/stats/disk_notsupported.go

@ -2,8 +2,6 @@
package stats package stats
import ()
func (disk *DiskStatus) fillInStatus() { func (disk *DiskStatus) fillInStatus() {
return return
} }

2
go/stats/memory_notsupported.go

@ -2,8 +2,6 @@
package stats package stats
import ()
func (mem *MemStatus) fillInStatus() { func (mem *MemStatus) fillInStatus() {
return return
} }

5
go/storage/cdb_map.go

@ -1,13 +1,14 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/tgulacsi/go-cdb"
"os" "os"
"path/filepath" "path/filepath"
"github.com/chrislusf/weed-fs/go/util"
"github.com/tgulacsi/go-cdb"
) )
// CDB-backed read-only needle map // CDB-backed read-only needle map

3
go/storage/cdb_map_test.go

@ -1,11 +1,12 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"math/rand" "math/rand"
"os" "os"
"runtime" "runtime"
"testing" "testing"
"github.com/chrislusf/weed-fs/go/glog"
) )
var testIndexFilename string = "../../test/sample.idx" var testIndexFilename string = "../../test/sample.idx"

2
go/storage/compact_map.go

@ -1,7 +1,5 @@
package storage package storage
import ()
type NeedleValue struct { type NeedleValue struct {
Key Key Key Key
Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G

5
go/storage/compact_map_perf_test.go

@ -1,11 +1,12 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"log" "log"
"os" "os"
"testing" "testing"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
) )
func TestMemoryUsage(t *testing.T) { func TestMemoryUsage(t *testing.T) {

3
go/storage/compress.go

@ -2,11 +2,12 @@ package storage
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"compress/flate" "compress/flate"
"compress/gzip" "compress/gzip"
"io/ioutil" "io/ioutil"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
) )
/* /*

3
go/storage/crc.go

@ -1,9 +1,10 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"github.com/chrislusf/weed-fs/go/util"
) )
var table = crc32.MakeTable(crc32.Castagnoli) var table = crc32.MakeTable(crc32.Castagnoli)

5
go/storage/file_id.go

@ -1,11 +1,12 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"encoding/hex" "encoding/hex"
"errors" "errors"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
) )
type FileId struct { type FileId struct {

7
go/storage/needle.go

@ -1,9 +1,6 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/util"
"encoding/hex" "encoding/hex"
"errors" "errors"
"io/ioutil" "io/ioutil"
@ -13,6 +10,10 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/util"
) )
const ( const (

5
go/storage/needle_map.go

@ -1,11 +1,12 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"io" "io"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
) )
type NeedleMapper interface { type NeedleMapper interface {

5
go/storage/needle_read_write.go

@ -1,12 +1,13 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
) )
const ( const (

9
go/storage/store.go

@ -1,10 +1,6 @@
package storage package storage
import ( import (
proto "code.google.com/p/goprotobuf/proto"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -12,6 +8,11 @@ import (
"math/rand" "math/rand"
"strconv" "strconv"
"strings" "strings"
proto "code.google.com/p/goprotobuf/proto"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
) )
const ( const (

3
go/storage/store_vacuum.go

@ -1,9 +1,10 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"fmt" "fmt"
"strconv" "strconv"
"github.com/chrislusf/weed-fs/go/glog"
) )
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {

26
go/storage/volume.go

@ -2,7 +2,6 @@ package storage
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -10,6 +9,8 @@ import (
"path" "path"
"sync" "sync"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
) )
type Volume struct { type Volume struct {
@ -72,7 +73,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
if e != nil { if e != nil {
if !os.IsPermission(e) { if !os.IsPermission(e) {
return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
} }
} }
@ -92,12 +93,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
if v.readOnly { if v.readOnly {
glog.V(1).Infoln("open to read file", fileName+".idx") glog.V(1).Infoln("open to read file", fileName+".idx")
if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil { if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
return fmt.Errorf("cannot read Volume Index %s.idx: %s", fileName, e.Error())
return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
} }
} else { } else {
glog.V(1).Infoln("open to write file", fileName+".idx") glog.V(1).Infoln("open to write file", fileName+".idx")
if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil { if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
return fmt.Errorf("cannot write Volume Index %s.idx: %s", fileName, e.Error())
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
} }
} }
glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly) glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
@ -115,7 +116,7 @@ func (v *Volume) Size() int64 {
if e == nil { if e == nil {
return stat.Size() return stat.Size()
} }
glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
return -1 return -1
} }
func (v *Volume) Close() { func (v *Volume) Close() {
@ -170,6 +171,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
} }
var offset int64 var offset int64
if offset, err = v.dataFile.Seek(0, 2); err != nil { if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("faile to seek the end of file: %v", err)
return return
} }
@ -177,21 +179,21 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
if offset%NeedlePaddingSize != 0 { if offset%NeedlePaddingSize != 0 {
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
if offset, err = v.dataFile.Seek(offset, 0); err != nil { if offset, err = v.dataFile.Seek(offset, 0); err != nil {
glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return return
} }
} }
if size, err = n.Append(v.dataFile, v.Version()); err != nil { if size, err = n.Append(v.dataFile, v.Version()); err != nil {
if e := v.dataFile.Truncate(offset); e != nil { if e := v.dataFile.Truncate(offset); e != nil {
err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile.Name(), e.Error())
err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
} }
return return
} }
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
} }
} }
if v.lastModifiedTime < n.LastModified { if v.lastModifiedTime < n.LastModified {
@ -292,13 +294,13 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
offset := int64(SuperBlockSize) offset := int64(SuperBlockSize)
n, rest, e := ReadNeedleHeader(v.dataFile, version, offset) n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
if e != nil { if e != nil {
err = fmt.Errorf("cannot read needle header: %s", e)
err = fmt.Errorf("cannot read needle header: %v", e)
return return
} }
for n != nil { for n != nil {
if readNeedleBody { if readNeedleBody {
if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil { if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
err = fmt.Errorf("cannot read needle body: %s", err)
err = fmt.Errorf("cannot read needle body: %v", err)
return return
} }
} }
@ -310,7 +312,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
return fmt.Errorf("cannot read needle header: %s", err)
return fmt.Errorf("cannot read needle header: %v", err)
} }
} }
@ -360,7 +362,7 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
defer indexFile.Close() defer indexFile.Close()
glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
return false return false
} }
return true return true

3
go/storage/volume_super_block.go

@ -1,9 +1,10 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"fmt" "fmt"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
) )
const ( const (

3
go/storage/volume_vacuum.go

@ -1,10 +1,11 @@
package storage package storage
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"fmt" "fmt"
"os" "os"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
) )
func (v *Volume) garbageLevel() float64 { func (v *Volume) garbageLevel() float64 {

2
go/storage/volume_version.go

@ -1,7 +1,5 @@
package storage package storage
import ()
type Version uint8 type Version uint8
const ( const (

3
go/tools/read_index.go

@ -1,11 +1,12 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/storage"
"flag" "flag"
"fmt" "fmt"
"log" "log"
"os" "os"
"github.com/chrislusf/weed-fs/go/storage"
) )
var ( var (

5
go/topology/allocate_volume.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"net/url" "net/url"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
) )
type AllocateVolumeResult struct { type AllocateVolumeResult struct {

21
go/topology/collection.go

@ -2,17 +2,18 @@ package topology
import ( import (
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
) )
type Collection struct { type Collection struct {
Name string Name string
volumeSizeLimit uint64 volumeSizeLimit uint64
storageType2VolumeLayout map[string]*VolumeLayout
storageType2VolumeLayout *util.ConcurrentReadMap
} }
func NewCollection(name string, volumeSizeLimit uint64) *Collection { func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c return c
} }
@ -21,16 +22,16 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
if ttl != nil { if ttl != nil {
keyString += ttl.String() keyString += ttl.String()
} }
if c.storageType2VolumeLayout[keyString] == nil {
c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
}
return c.storageType2VolumeLayout[keyString]
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
} }
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
if list := vl.Lookup(vid); list != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list return list
} }
} }
@ -39,9 +40,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
} }
func (c *Collection) ListVolumeServers() (nodes []*DataNode) { func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.storageType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...) nodes = append(nodes, list...)
} }
} }

2
go/topology/data_center.go

@ -1,7 +1,5 @@
package topology package topology
import ()
type DataCenter struct { type DataCenter struct {
NodeImpl NodeImpl
} }

3
go/topology/data_node.go

@ -1,9 +1,10 @@
package topology package topology
import ( import (
"strconv"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"strconv"
) )
type DataNode struct { type DataNode struct {

5
go/topology/node.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"errors" "errors"
"math/rand" "math/rand"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
type NodeId string type NodeId string

5
go/topology/store_replicate.go

@ -2,12 +2,13 @@ package topology
import ( import (
"bytes" "bytes"
"net/http"
"strconv"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"net/http"
"strconv"
) )
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {

38
go/topology/topology.go

@ -1,20 +1,22 @@
package topology package topology
import ( import (
"errors"
"io/ioutil"
"math/rand"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"errors"
"github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft" "github.com/goraft/raft"
"io/ioutil"
"math/rand"
) )
type Topology struct { type Topology struct {
NodeImpl NodeImpl
collectionMap map[string]*Collection
collectionMap *util.ConcurrentReadMap
pulse int64 pulse int64
@ -37,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology" t.nodeType = "Topology"
t.NodeImpl.value = t t.NodeImpl.value = t
t.children = make(map[NodeId]Node) t.children = make(map[NodeId]Node)
t.collectionMap = make(map[string]*Collection)
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse) t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit t.volumeSizeLimit = volumeSizeLimit
@ -89,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections? //maybe an issue if lots of collections?
if collection == "" { if collection == "" {
for _, c := range t.collectionMap {
if list := c.Lookup(vid); list != nil {
for _, c := range t.collectionMap.Items {
if list := c.(*Collection).Lookup(vid); list != nil {
return list return list
} }
} }
} else { } else {
if c, ok := t.collectionMap[collection]; ok {
return c.Lookup(vid)
if c, ok := t.collectionMap.Items[collection]; ok {
return c.(*Collection).Lookup(vid)
} }
} }
return nil return nil
@ -109,7 +111,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return next return next
} }
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0 return vl.GetActiveVolumeCount(option) > 0
} }
@ -124,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
} }
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
} }
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
collection, ok = t.collectionMap[collectionName]
return
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Items[collectionName]
return c.(*Collection), hasCollection
} }
func (t *Topology) DeleteCollection(collectionName string) { func (t *Topology) DeleteCollection(collectionName string) {
delete(t.collectionMap, collectionName)
delete(t.collectionMap.Items, collectionName)
} }
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {

5
go/topology/topology_event_handling.go

@ -1,10 +1,11 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"math/rand" "math/rand"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {

9
go/topology/topology_map.go

@ -1,7 +1,5 @@
package topology package topology
import ()
func (t *Topology) ToMap() interface{} { func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount() m["Max"] = t.GetMaxVolumeCount()
@ -13,10 +11,11 @@ func (t *Topology) ToMap() interface{} {
} }
m["DataCenters"] = dcs m["DataCenters"] = dcs
var layouts []interface{} var layouts []interface{}
for _, c := range t.collectionMap {
for _, layout := range c.storageType2VolumeLayout {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items {
if layout != nil { if layout != nil {
tmp := layout.ToMap()
tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name tmp["collection"] = c.Name
layouts = append(layouts, tmp) layouts = append(layouts, tmp)
} }

21
go/topology/topology_vacuum.go

@ -1,13 +1,14 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"net/url" "net/url"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
) )
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
@ -79,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess return isCommitSuccess
} }
func (t *Topology) Vacuum(garbageThreshold string) int { func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
for _, vl := range c.storageType2VolumeLayout {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(vl, vid, locationlist) {
batchVacuumVolumeCommit(vl, vid, locationlist)
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
} }
} }
} }

5
go/topology/volume_growth.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"fmt" "fmt"
"math/rand" "math/rand"
"sync" "sync"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
/* /*

5
go/topology/volume_growth_test.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
"encoding/json" "encoding/json"
"fmt" "fmt"
"testing" "testing"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
) )
var topologyLayout = ` var topologyLayout = `

5
go/topology/volume_layout.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"errors" "errors"
"math/rand" "math/rand"
"sync" "sync"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
// mapping from volume to its locations, inverted from server to volume // mapping from volume to its locations, inverted from server to volume

2
go/topology/volume_location_list.go

@ -1,7 +1,5 @@
package topology package topology
import ()
type VolumeLocationList struct { type VolumeLocationList struct {
list []*DataNode list []*DataNode
} }

37
go/util/concurrent_read_map.go

@ -0,0 +1,37 @@
package util
import "sync"
// A mostly for read map, which can thread-safely
// initialize the map entries.
type ConcurrentReadMap struct {
rmutex sync.RWMutex
mutex sync.Mutex
Items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
return &ConcurrentReadMap{Items: make(map[string]interface{})}
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
if value, ok := m.Items[key]; ok {
return value
}
value = newEntry()
m.Items[key] = value
return value
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rmutex.RLock()
if value, ok := m.Items[key]; ok {
m.rmutex.RUnlock()
return value
} else {
m.rmutex.RUnlock()
return m.initMapEntry(key, newEntry)
}
}

3
go/util/config.go

@ -10,9 +10,10 @@ package util
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"encoding/json" "encoding/json"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
) )
type Config struct { type Config struct {

4
go/util/constants.go

@ -1,7 +1,5 @@
package util package util
import ()
const ( const (
VERSION = "0.64"
VERSION = "0.67"
) )

3
go/util/file_util.go

@ -2,9 +2,10 @@ package util
import ( import (
"bufio" "bufio"
"github.com/chrislusf/weed-fs/go/glog"
"errors" "errors"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
) )
func TestFolderWritable(folder string) (err error) { func TestFolderWritable(folder string) (err error) {

3
go/util/net_timeout.go

@ -1,9 +1,10 @@
package util package util
import ( import (
"github.com/chrislusf/weed-fs/go/stats"
"net" "net"
"time" "time"
"github.com/chrislusf/weed-fs/go/stats"
) )
// Listener wraps a net.Listener, and gives a place to store the timeout // Listener wraps a net.Listener, and gives a place to store the timeout

277
go/weed/benchmark.go

@ -2,9 +2,6 @@ package main
import ( import (
"bufio" "bufio"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -16,6 +13,10 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
) )
type BenchmarkOptions struct { type BenchmarkOptions struct {
@ -30,11 +31,14 @@ type BenchmarkOptions struct {
sequentialRead *bool sequentialRead *bool
collection *string collection *string
cpuprofile *string cpuprofile *string
maxCpu *int
vid2server map[string]string //cache for vid locations vid2server map[string]string //cache for vid locations
} }
var ( var (
b BenchmarkOptions
b BenchmarkOptions
sharedBytes []byte
) )
func init() { func init() {
@ -50,33 +54,35 @@ func init() {
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.vid2server = make(map[string]string) b.vid2server = make(map[string]string)
sharedBytes = make([]byte, 1024)
} }
var cmdBenchmark = &Command{ var cmdBenchmark = &Command{
UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
Short: "benchmark on writing millions of files and read out", Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty weed file system. Long: `benchmark on an empty weed file system.
Two tests during benchmark: Two tests during benchmark:
1) write lots of small files to the system 1) write lots of small files to the system
2) read the files out 2) read the files out
The file content is mostly zero, but no compression is done. The file content is mostly zero, but no compression is done.
You can choose to only benchmark read or write. You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file. During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test. You can also use your own list of file ids to run read test.
Write speed and read speed will be collected. Write speed and read speed will be collected.
The numbers are used to get a sense of the system. The numbers are used to get a sense of the system.
Usually your network or the hard drive is the real bottleneck. Usually your network or the hard drive is the real bottleneck.
Another thing to watch is whether the volumes are evenly distributed Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of to servers with free slots, it's highly possible some servers have uneven amount of
benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
before starting the benchmark command: before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5 http://localhost:9333/vol/grow?collection=benchmark&count=5
@ -87,18 +93,17 @@ var cmdBenchmark = &Command{
} }
var ( var (
wait sync.WaitGroup
writeStats *stats
readStats *stats
serverLimitChan map[string]chan bool
wait sync.WaitGroup
writeStats *stats
readStats *stats
) )
func init() {
serverLimitChan = make(map[string]chan bool)
}
func runbenchmark(cmd *Command, args []string) bool { func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*b.maxCpu)
if *b.cpuprofile != "" { if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile) f, err := os.Create(*b.cpuprofile)
if err != nil { if err != nil {
@ -122,12 +127,12 @@ func runbenchmark(cmd *Command, args []string) bool {
func bench_write() { func bench_write() {
fileIdLineChan := make(chan string) fileIdLineChan := make(chan string)
finishChan := make(chan bool) finishChan := make(chan bool)
writeStats = newStats()
writeStats = newStats(*b.concurrency)
idChan := make(chan int) idChan := make(chan int)
wait.Add(*b.concurrency)
go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
for i := 0; i < *b.concurrency; i++ { for i := 0; i < *b.concurrency; i++ {
go writeFiles(idChan, fileIdLineChan, writeStats)
wait.Add(1)
go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
} }
writeStats.start = time.Now() writeStats.start = time.Now()
writeStats.total = *b.numberOfFiles writeStats.total = *b.numberOfFiles
@ -138,28 +143,30 @@ func bench_write() {
close(idChan) close(idChan)
wait.Wait() wait.Wait()
writeStats.end = time.Now() writeStats.end = time.Now()
wait.Add(1)
wait.Add(2)
finishChan <- true finishChan <- true
finishChan <- true finishChan <- true
close(finishChan)
wait.Wait() wait.Wait()
close(finishChan)
writeStats.printStats() writeStats.printStats()
} }
func bench_read() { func bench_read() {
fileIdLineChan := make(chan string) fileIdLineChan := make(chan string)
finishChan := make(chan bool) finishChan := make(chan bool)
readStats = newStats()
wait.Add(*b.concurrency)
readStats = newStats(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan) go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now() readStats.start = time.Now()
readStats.total = *b.numberOfFiles readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan) go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ { for i := 0; i < *b.concurrency; i++ {
go readFiles(fileIdLineChan, readStats)
wait.Add(1)
go readFiles(fileIdLineChan, &readStats.localStats[i])
} }
wait.Wait() wait.Wait()
wait.Add(1)
finishChan <- true finishChan <- true
wait.Wait()
close(finishChan) close(finishChan)
readStats.end = time.Now() readStats.end = time.Now()
readStats.printStats() readStats.printStats()
@ -170,126 +177,102 @@ type delayedFile struct {
fp *operation.FilePart fp *operation.FilePart
} }
func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100) delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup var waitForDeletions sync.WaitGroup
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
waitForDeletions.Add(1)
go func() { go func() {
waitForDeletions.Add(1)
defer waitForDeletions.Done()
for df := range delayedDeleteChan { for df := range delayedDeleteChan {
if df == nil {
break
}
if df.enterTime.After(time.Now()) { if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now())) time.Sleep(df.enterTime.Sub(time.Now()))
} }
fp := df.fp
serverLimitChan[fp.Server] <- true
if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
s.completed++ s.completed++
} else { } else {
s.failed++ s.failed++
} }
<-serverLimitChan[fp.Server]
} }
waitForDeletions.Done()
}() }()
} }
for {
if id, ok := <-idChan; ok {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)
}
serverLimitChan[fp.Server] <- true
if _, err := fp.Upload(0, *b.server); err == nil {
if rand.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else {
fileIdLineChan <- fp.Fid
}
s.completed++
s.transferred += fileSize
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, err := fp.Upload(0, *b.server); err == nil {
if rand.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else { } else {
s.failed++
}
writeStats.addSample(time.Now().Sub(start))
<-serverLimitChan[fp.Server]
if *cmdBenchmark.IsDebug {
fmt.Printf("writing %d file %s\n", id, fp.Fid)
fileIdLineChan <- fp.Fid
} }
s.completed++
s.transferred += fileSize
} else { } else {
s.failed++ s.failed++
println("writing file error:", err.Error())
fmt.Printf("Failed to write with error:%v\n", err)
}
writeStats.addSample(time.Now().Sub(start))
if *cmdBenchmark.IsDebug {
fmt.Printf("writing %d file %s\n", id, fp.Fid)
} }
} else { } else {
break
s.failed++
println("writing file error:", err.Error())
} }
} }
close(delayedDeleteChan) close(delayedDeleteChan)
waitForDeletions.Wait() waitForDeletions.Wait()
wait.Done()
} }
func readFiles(fileIdLineChan chan string, s *stats) {
serverLimitChan := make(map[string]chan bool)
func readFiles(fileIdLineChan chan string, s *stat) {
defer wait.Done()
masterLimitChan := make(chan bool, 1) masterLimitChan := make(chan bool, 1)
for {
if fid, ok := <-fileIdLineChan; ok {
if len(fid) == 0 {
continue
}
if fid[0] == '#' {
continue
}
if *cmdBenchmark.IsDebug {
fmt.Printf("reading file %s\n", fid)
}
parts := strings.SplitN(fid, ",", 2)
vid := parts[0]
start := time.Now()
if server, ok := b.vid2server[vid]; !ok {
masterLimitChan <- true
if _, now_ok := b.vid2server[vid]; !now_ok {
if ret, err := operation.Lookup(*b.server, vid); err == nil {
if len(ret.Locations) > 0 {
server = ret.Locations[0].PublicUrl
b.vid2server[vid] = server
}
for fid := range fileIdLineChan {
if len(fid) == 0 {
continue
}
if fid[0] == '#' {
continue
}
if *cmdBenchmark.IsDebug {
fmt.Printf("reading file %s\n", fid)
}
parts := strings.SplitN(fid, ",", 2)
vid := parts[0]
start := time.Now()
if server, ok := b.vid2server[vid]; !ok {
masterLimitChan <- true
if _, now_ok := b.vid2server[vid]; !now_ok {
if ret, err := operation.Lookup(*b.server, vid); err == nil {
if len(ret.Locations) > 0 {
server = ret.Locations[0].PublicUrl
b.vid2server[vid] = server
} }
} }
<-masterLimitChan
} }
if server, ok := b.vid2server[vid]; ok {
if _, ok := serverLimitChan[server]; !ok {
serverLimitChan[server] = make(chan bool, 7)
}
serverLimitChan[server] <- true
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil {
s.completed++
s.transferred += int64(len(bytesRead))
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
println("!!!! Failed to read from ", url, " !!!!!")
}
<-serverLimitChan[server]
<-masterLimitChan
}
if server, ok := b.vid2server[vid]; ok {
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil {
s.completed++
s.transferred += int64(len(bytesRead))
readStats.addSample(time.Now().Sub(start))
} else { } else {
s.failed++ s.failed++
println("!!!! volume id ", vid, " location not found!!!!!")
fmt.Printf("Failed to read %s error:%v\n", url, err)
} }
} else { } else {
break
s.failed++
println("!!!! volume id ", vid, " location not found!!!!!")
} }
} }
wait.Done()
} }
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
@ -353,20 +336,28 @@ const (
// An efficient statics collecting and rendering // An efficient statics collecting and rendering
type stats struct { type stats struct {
data []int
overflow []int
data []int
overflow []int
localStats []stat
start time.Time
end time.Time
total int
}
type stat struct {
completed int completed int
failed int failed int
total int total int
transferred int64 transferred int64
start time.Time
end time.Time
} }
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100} var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
func newStats() *stats {
return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
func newStats(n int) *stats {
return &stats{
data: make([]int, benchResolution),
overflow: make([]int, 0),
localStats: make([]stat, n),
}
} }
func (s *stats) addSample(d time.Duration) { func (s *stats) addSample(d time.Duration) {
@ -387,28 +378,41 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
for { for {
select { select {
case <-finishChan: case <-finishChan:
wait.Done()
return return
case t := <-ticker: case t := <-ticker:
completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
for _, localStat := range s.localStats {
completed += localStat.completed
transferred += localStat.transferred
total += localStat.total
}
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n", fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
s.completed, s.total, float64(s.completed)*100/float64(s.total),
float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
completed, total, float64(completed)*100/float64(total),
float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
) )
lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
lastCompleted, lastTransferred, lastTime = completed, transferred, t
} }
} }
} }
func (s *stats) printStats() { func (s *stats) printStats() {
completed, failed, transferred, total := 0, 0, int64(0), s.total
for _, localStat := range s.localStats {
completed += localStat.completed
failed += localStat.failed
transferred += localStat.transferred
total += localStat.total
}
timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
fmt.Printf("Complete requests: %d\n", s.completed)
fmt.Printf("Failed requests: %d\n", s.failed)
fmt.Printf("Total transferred: %d bytes\n", s.transferred)
fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
fmt.Printf("Complete requests: %d\n", completed)
fmt.Printf("Failed requests: %d\n", failed)
fmt.Printf("Total transferred: %d bytes\n", transferred)
fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
n, sum := 0, 0 n, sum := 0, 0
min, max := 10000000, 0 min, max := 10000000, 0
for i := 0; i < len(s.data); i++ { for i := 0; i < len(s.data); i++ {
@ -496,15 +500,32 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
} else { } else {
n = len(p) n = len(p)
} }
for i := 0; i < n-8; i += 8 {
for s := uint(0); s < 8; s++ {
p[i] = byte(l.id >> (s * 8))
if n >= 8 {
for i := 0; i < 8; i++ {
p[i] = byte(l.id >> uint(i*8))
} }
} }
l.size -= int64(n) l.size -= int64(n)
return return
} }
func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
size := int(l.size)
bufferSize := len(sharedBytes)
for size > 0 {
tempBuffer := sharedBytes
if size < bufferSize {
tempBuffer = sharedBytes[0:size]
}
count, e := w.Write(tempBuffer)
if e != nil {
return int64(size), e
}
size -= count
}
return l.size, nil
}
func Readln(r *bufio.Reader) ([]byte, error) { func Readln(r *bufio.Reader) ([]byte, error) {
var ( var (
isPrefix bool = true isPrefix bool = true

2
go/weed/compact.go

@ -12,7 +12,7 @@ func init() {
var cmdCompact = &Command{ var cmdCompact = &Command{
UsageLine: "compact -dir=/tmp -volumeId=234", UsageLine: "compact -dir=/tmp -volumeId=234",
Short: "run weed tool compact on volume file if corrupted",
Short: "run weed tool compact on volume file",
Long: `Force an compaction to remove deleted files from volume files. Long: `Force an compaction to remove deleted files from volume files.
The compacted .dat file is stored as .cpd file. The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file. The compacted .idx file is stored as .cpx file.

5
go/weed/download.go

@ -1,14 +1,15 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
) )
var ( var (

7
go/weed/export.go

@ -3,8 +3,6 @@ package main
import ( import (
"archive/tar" "archive/tar"
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"fmt" "fmt"
"os" "os"
"path" "path"
@ -12,6 +10,9 @@ import (
"strings" "strings"
"text/template" "text/template"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
func init() { func init() {
@ -36,7 +37,7 @@ var cmdExport = &Command{
var ( var (
exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name") exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name")
exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
tarFh *tar.Writer tarFh *tar.Writer

15
go/weed/filer.go

@ -1,13 +1,14 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
) )
var ( var (
@ -20,6 +21,7 @@ type FilerOptions struct {
collection *string collection *string
defaultReplicaPlacement *string defaultReplicaPlacement *string
dir *string dir *string
redirectOnRead *bool
} }
func init() { func init() {
@ -28,7 +30,8 @@ func init() {
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data") f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
} }
var cmdFiler = &Command{ var cmdFiler = &Command{
@ -59,7 +62,9 @@ func runFiler(cmd *Command, args []string) bool {
} }
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection)
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead,
)
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf(nfs_err.Error()) glog.Fatalf(nfs_err.Error())
} }

7
go/weed/fix.go

@ -1,11 +1,12 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"os" "os"
"path" "path"
"strconv" "strconv"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
func init() { func init() {
@ -16,7 +17,7 @@ func init() {
var cmdFix = &Command{ var cmdFix = &Command{
UsageLine: "fix -dir=/tmp -volumeId=234", UsageLine: "fix -dir=/tmp -volumeId=234",
Short: "run weed tool fix on index file if corrupted", Short: "run weed tool fix on index file if corrupted",
Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
Long: `Fix runs the SeeweedFS fix command to re-create the index .idx file.
`, `,
} }

12
go/weed/master.go

@ -1,16 +1,17 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
) )
func init() { func init() {
@ -29,6 +30,7 @@ var cmdMaster = &Command{
var ( var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port") mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces") masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces")
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
mPublicIp = cmdMaster.Flag.String("publicIp", "", "peer accessible <ip>|<server_name>") mPublicIp = cmdMaster.Flag.String("publicIp", "", "peer accessible <ip>|<server_name>")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
@ -61,7 +63,7 @@ func runMaster(cmd *Command, args []string) bool {
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList, *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
) )
listeningAddress := *masterIp + ":" + strconv.Itoa(*mport)
listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)

2
go/weed/mount.go

@ -1,7 +1,5 @@
package main package main
import ()
type MountOptions struct { type MountOptions struct {
filer *string filer *string
dir *string dir *string

7
go/weed/mount_std.go

@ -3,15 +3,16 @@
package main package main
import ( import (
"fmt"
"os"
"runtime"
"bazil.org/fuse" "bazil.org/fuse"
"bazil.org/fuse/fs" "bazil.org/fuse/fs"
"github.com/chrislusf/weed-fs/go/filer" "github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"fmt"
"os"
"runtime"
) )
func runMount(cmd *Command, args []string) bool { func runMount(cmd *Command, args []string) bool {

25
go/weed/server.go

@ -1,10 +1,6 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
@ -13,6 +9,11 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"github.com/gorilla/mux"
) )
type ServerOptions struct { type ServerOptions struct {
@ -31,17 +32,17 @@ func init() {
var cmdServer = &Command{ var cmdServer = &Command{
UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name", UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name",
Short: "start a server, including volume server, and automatically elect a master server", Short: "start a server, including volume server, and automatically elect a master server",
Long: `start both a volume server to provide storage spaces
Long: `start both a volume server to provide storage spaces
and a master server to provide volume=>location mapping service and sequence number of file ids and a master server to provide volume=>location mapping service and sequence number of file ids
This is provided as a convenient way to start both volume server and master server. This is provided as a convenient way to start both volume server and master server.
The servers are exactly the same as starting them separately. The servers are exactly the same as starting them separately.
So other volume servers can use this embedded master server also. So other volume servers can use this embedded master server also.
Optionally, one filer server can be started. Logically, filer servers should not be in a cluster. Optionally, one filer server can be started. Logically, filer servers should not be in a cluster.
They run with meta data on disk, not shared. So each filer server is different. They run with meta data on disk, not shared. So each filer server is different.
`, `,
} }
@ -72,12 +73,14 @@ var (
) )
func init() { func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "write cpu profile to file")
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
@ -149,7 +152,9 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingFiler { if *isStartingFiler {
go func() { go func() {
r := http.NewServeMux() r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection)
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
)
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf(nfs_err.Error()) glog.Fatalf(nfs_err.Error())
} }

3
go/weed/shell.go

@ -2,9 +2,10 @@ package main
import ( import (
"bufio" "bufio"
"github.com/chrislusf/weed-fs/go/glog"
"fmt" "fmt"
"os" "os"
"github.com/chrislusf/weed-fs/go/glog"
) )
func init() { func init() {

2
go/weed/signal_handling_notsupported.go

@ -2,7 +2,5 @@
package main package main
import ()
func OnInterrupt(fn func()) { func OnInterrupt(fn func()) {
} }

3
go/weed/upload.go

@ -1,11 +1,12 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/operation"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"github.com/chrislusf/weed-fs/go/operation"
) )
var ( var (

3
go/weed/version.go

@ -1,9 +1,10 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/util"
"fmt" "fmt"
"runtime" "runtime"
"github.com/chrislusf/weed-fs/go/util"
) )
var cmdVersion = &Command{ var cmdVersion = &Command{

11
go/weed/volume.go

@ -1,15 +1,16 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/util"
"github.com/chrislusf/weed-fs/go/weed/weed_server"
) )
func init() { func init() {
@ -30,7 +31,7 @@ var (
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name") ip = cmdVolume.Flag.String("ip", "", "ip or server name")
publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>") publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
@ -85,7 +86,7 @@ func runVolume(cmd *Command, args []string) bool {
*fixJpgOrientation, *fixJpgOrientation,
) )
listeningAddress := *bindIp + ":" + strconv.Itoa(*vport)
listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)

3
go/weed/volume_test.go

@ -1,10 +1,11 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"net/http" "net/http"
"testing" "testing"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
) )
func TestXYZ(t *testing.T) { func TestXYZ(t *testing.T) {

3
go/weed/weed.go

@ -1,7 +1,6 @@
package main package main
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -13,6 +12,8 @@ import (
"time" "time"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"github.com/chrislusf/weed-fs/go/glog"
) )
var IsDebug *bool var IsDebug *bool

11
go/weed/weed_server/common.go

@ -2,11 +2,6 @@ package weed_server
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
@ -14,6 +9,12 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
) )
var serverStats *stats.ServerStats var serverStats *stats.ServerStats

27
go/weed/weed_server/filer_server.go

@ -1,24 +1,31 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog"
"net/http" "net/http"
"strconv" "strconv"
"github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog"
) )
type FilerServer struct { type FilerServer struct {
port string
master string
collection string
filer filer.Filer
port string
master string
collection string
defaultReplication string
redirectOnRead bool
filer filer.Filer
} }
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) {
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
replication string, redirectOnRead bool,
) (fs *FilerServer, err error) {
fs = &FilerServer{ fs = &FilerServer{
master: master,
collection: collection,
port: ":" + strconv.Itoa(port),
master: master,
collection: collection,
defaultReplication: replication,
redirectOnRead: redirectOnRead,
port: ":" + strconv.Itoa(port),
} }
if fs.filer, err = filer.NewFilerEmbedded(master, dir); err != nil { if fs.filer, err = filer.NewFilerEmbedded(master, dir); err != nil {

22
go/weed/weed_server/filer_server_handlers.go

@ -1,12 +1,8 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/syndtr/goleveldb/leveldb"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -14,6 +10,11 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/util"
"github.com/syndtr/goleveldb/leveldb"
) )
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
@ -80,7 +81,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return return
} }
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl
u, _ := url.Parse("http://" + urlLocation + "/" + fileId)
urlString := "http://" + urlLocation + "/" + fileId
if fs.redirectOnRead {
http.Redirect(w, r, urlString, http.StatusFound)
return
}
u, _ := url.Parse(urlString)
request := &http.Request{ request := &http.Request{
Method: r.Method, Method: r.Method,
URL: u, URL: u,
@ -109,7 +115,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query() query := r.URL.Query()
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
replication := query.Get("replication")
if replication == "" {
replication = fs.defaultReplication
}
assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl"))
if ae != nil { if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error()) glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, ae) writeJsonError(w, r, ae)

3
go/weed/weed_server/filer_server_handlers_admin.go

@ -1,8 +1,9 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"net/http" "net/http"
"github.com/chrislusf/weed-fs/go/glog"
) )
/* /*

9
go/weed/weed_server/master_server.go

@ -1,16 +1,17 @@
package weed_server package weed_server
import ( import (
"net/http"
"net/http/httputil"
"net/url"
"sync"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/topology" "github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft" "github.com/goraft/raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"net/http"
"net/http/httputil"
"net/url"
"sync"
) )
type MasterServer struct { type MasterServer struct {

11
go/weed/weed_server/master_server_handlers.go

@ -1,12 +1,13 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
) )
func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
@ -78,7 +79,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return return
} }
if !ms.Topo.HasWriableVolume(option) {
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 { if ms.Topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, operation.AssignResult{Error: "No free volumes left!"}) writeJsonQuiet(w, r, operation.AssignResult{Error: "No free volumes left!"})
@ -86,7 +87,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
} else { } else {
ms.vgLock.Lock() ms.vgLock.Lock()
defer ms.vgLock.Unlock() defer ms.vgLock.Unlock()
if !ms.Topo.HasWriableVolume(option) {
if !ms.Topo.HasWritableVolume(option) {
if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
writeJsonQuiet(w, r, operation.AssignResult{Error: "Cannot grow volume group! " + err.Error()}) writeJsonQuiet(w, r, operation.AssignResult{Error: "Cannot grow volume group! " + err.Error()})
return return

15
go/weed/weed_server/master_server_handlers_admin.go

@ -1,18 +1,19 @@
package weed_server package weed_server
import ( import (
proto "code.google.com/p/goprotobuf/proto"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
proto "code.google.com/p/goprotobuf/proto"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util"
) )
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@ -143,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
} }
} }
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0 return vl.GetActiveVolumeCount(option) > 0
} }

55
go/weed/weed_server/raft_server.go

@ -2,19 +2,22 @@ package weed_server
import ( import (
"bytes" "bytes"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/topology"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/goraft/raft"
"github.com/gorilla/mux"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net/http" "net/http"
"net/url" "net/url"
"os"
"path"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/goraft/raft"
"github.com/gorilla/mux"
) )
type RaftServer struct { type RaftServer struct {
@ -46,6 +49,13 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
transporter.Transport.MaxIdleConnsPerHost = 1024 transporter.Transport.MaxIdleConnsPerHost = 1024
glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr)
// Clear old cluster configurations if peers are set
if len(s.peers) > 0 {
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
if err != nil { if err != nil {
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
@ -53,35 +63,30 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
} }
transporter.Install(s.raftServer, s) transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(1 * time.Second) s.raftServer.SetHeartbeatInterval(1 * time.Second)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond)
s.raftServer.Start() s.raftServer.Start()
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
// Join to leader if specified.
if len(s.peers) > 0 { if len(s.peers) > 0 {
if !s.raftServer.IsLogEmpty() {
glog.V(0).Infoln("Starting cluster with existing logs.")
} else {
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
firstJoinError := s.Join(s.peers)
if firstJoinError != nil {
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: "http://" + s.httpAddr,
})
if err != nil {
glog.V(0).Infoln(err)
return nil
}
// Join to leader if specified.
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
firstJoinError := s.Join(s.peers)
if firstJoinError != nil {
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: "http://" + s.httpAddr,
})
if err != nil {
glog.V(0).Infoln(err)
return nil
} }
} }
// Initialize the server by joining itself.
} else if s.raftServer.IsLogEmpty() { } else if s.raftServer.IsLogEmpty() {
// Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster") glog.V(0).Infoln("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{ _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
@ -95,7 +100,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
} }
} else { } else {
glog.V(0).Infoln("Recovered from log")
glog.V(0).Infoln("Old conf,log,snapshot should have been removed.")
} }
return s return s

7
go/weed/weed_server/raft_server_handlers.go

@ -1,13 +1,14 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"encoding/json" "encoding/json"
"github.com/goraft/raft"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/goraft/raft"
) )
// Handles incoming RAFT joins. // Handles incoming RAFT joins.

5
go/weed/weed_server/volume_server.go

@ -1,12 +1,13 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"math/rand" "math/rand"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
) )
type VolumeServer struct { type VolumeServer struct {

13
go/weed/weed_server/volume_server_handlers.go

@ -1,12 +1,6 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
"io" "io"
"mime" "mime"
"mime/multipart" "mime/multipart"
@ -14,6 +8,13 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/images"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/topology"
) )
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")

5
go/weed/weed_server/volume_server_handlers_admin.go

@ -1,11 +1,12 @@
package weed_server package weed_server
import ( import (
"net/http"
"path/filepath"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/stats" "github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/util" "github.com/chrislusf/weed-fs/go/util"
"net/http"
"path/filepath"
) )
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {

3
go/weed/weed_server/volume_server_handlers_vacuum.go

@ -1,8 +1,9 @@
package weed_server package weed_server
import ( import (
"github.com/chrislusf/weed-fs/go/glog"
"net/http" "net/http"
"github.com/chrislusf/weed-fs/go/glog"
) )
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {

Loading…
Cancel
Save