
Merge branch 'feature/auto-replicate' into develop

pull/279/head
tnextday · 10 years ago · commit f121699392
66 changed files (changed lines per file):

    5  Makefile
    6  README.md
    2  go/filer/client_operations.go
    2  go/filer/flat_namespace/flat_namespace_store.go
    2  go/glog/glog.go
    2  go/operation/assign_file_id.go
    2  go/operation/chunked_file.go
    4  go/operation/delete_content.go
    2  go/operation/list_masters.go
   38  go/operation/lookup.go
    6  go/operation/lookup_vid_cache.go
    2  go/operation/lookup_vid_cache_test.go
    2  go/operation/submit.go
    3  go/operation/sync_volume.go
   77  go/operation/system_message.pb.go
   58  go/proto/system_message.proto
   73  go/storage/collection_settings.go
   12  go/storage/needle_read_write.go
   14  go/storage/replica_placement.go
   49  go/storage/store.go
  147  go/storage/store_task.go
   87  go/storage/store_task_cli.go
  115  go/storage/store_task_replication.go
   40  go/storage/store_task_vacuum.go
   24  go/storage/volume.go
   13  go/storage/volume_info.go
    6  go/storage/volume_info_test.go
  212  go/storage/volume_pure_reader.go
   22  go/storage/volume_replicate_test.go
   25  go/storage/volume_super_block.go
    4  go/storage/volume_sync.go
    4  go/storage/volume_ttl.go
    7  go/storage/volume_vacuum.go
    1  go/storage/volume_version.go
    3  go/topology/allocate_volume.go
   43  go/topology/batch_operation.go
   13  go/topology/collection.go
    1  go/topology/data_node.go
  119  go/topology/node.go
   60  go/topology/store_replicate.go
   42  go/topology/topology.go
    9  go/topology/topology_event_handling.go
  153  go/topology/topology_replicate.go
   18  go/topology/topology_vacuum.go
  256  go/topology/volume_growth.go
  117  go/topology/volume_growth_test.go
   10  go/topology/volume_layout.go
  111  go/topology/volume_location_list.go
   17  go/util/concurrent_read_map.go
  110  go/util/http_util.go
    9  go/weed/backup.go
    7  go/weed/benchmark.go
    2  go/weed/compact.go
    4  go/weed/download.go
    2  go/weed/shell.go
    2  go/weed/signal_handling.go
    7  go/weed/weed_server/common.go
    9  go/weed/weed_server/filer_server_handlers.go
    7  go/weed/weed_server/master_server.go
    8  go/weed/weed_server/master_server_handlers.go
   60  go/weed/weed_server/master_server_handlers_admin.go
   11  go/weed/weed_server/volume_server.go
   73  go/weed/weed_server/volume_server_handlers_admin.go
    2  go/weed/weed_server/volume_server_handlers_read.go
   58  go/weed/weed_server/volume_server_handlers_sync.go
   66  go/weed/weed_server/volume_server_handlers_task.go

5
Makefile

@@ -14,7 +14,10 @@ clean:
deps:
go get $(GO_FLAGS) -d $(SOURCE_DIR)
build: deps
fmt:
gofmt -w -s ./go/
build: deps fmt
go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR)
linux: deps

6
README.md

@@ -128,6 +128,12 @@ If you want a nicer URL, you can use one of these alternative URL formats:
http://localhost:8080/3,01637037d6
```
If you want to get a scaled version of an image, you can add some parameters:
```
http://localhost:8080/3/01637037d6.jpg?height=200&width=200
```
### Rack-Aware and Data Center-Aware Replication ###
SeaweedFS applies the replication strategy at the volume level. So when you are requesting a file id, you can specify the replication strategy. For example:

2
go/filer/client_operations.go

@@ -58,7 +58,7 @@ func call(server string, request ApiRequest, ret interface{}) error {
}
values := make(url.Values)
values.Add("request", string(b))
jsonBlob, err := util.Post("http://"+server+"/__api__", values)
jsonBlob, err := util.Post(server, "/__api__", values)
if err != nil {
return err
}

2
go/filer/flat_namespace/flat_namespace_store.go

@@ -1,7 +1,5 @@
package flat_namespace
import ()
type FlatNamespaceStore interface {
Put(fullFileName string, fid string) (err error)
Get(fullFileName string) (fid string, err error)

2
go/glog/glog.go

@@ -880,7 +880,7 @@ const flushInterval = 30 * time.Second
// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
for _ = range time.NewTicker(flushInterval).C {
for range time.NewTicker(flushInterval).C {
l.lockAndFlushAll()
}
}

2
go/operation/assign_file_id.go

@@ -31,7 +31,7 @@ func Assign(server string, count uint64, replication string, collection string,
if ttl != "" {
values.Add("ttl", ttl)
}
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
jsonBlob, err := util.Post(server, "/dir/assign", values)
glog.V(2).Info("assign result :", string(jsonBlob))
if err != nil {
return nil, err

2
go/operation/chunked_file.go

@@ -150,7 +150,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
ci := cm.Chunks[chunkIndex]
// should we read data from the local volume server first?
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true)
if lookupError != nil {
return n, lookupError
}

4
go/operation/delete_content.go

@@ -22,7 +22,7 @@ type DeleteResult struct {
}
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId)
fileUrl, err := LookupFileId(master, fileId, false)
if err != nil {
return fmt.Errorf("Failed to lookup %s:%v", fileId, err)
}
@@ -97,7 +97,7 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
for _, fid := range fidList {
values.Add("fid", fid)
}
jsonBlob, err := util.Post("http://"+server+"/delete", values)
jsonBlob, err := util.Post(server, "/delete", values)
if err != nil {
ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
return

2
go/operation/list_masters.go

@@ -14,7 +14,7 @@ type ClusterStatusResult struct {
}
func ListMasters(server string) ([]string, error) {
jsonBlob, err := util.Get("http://" + server + "/cluster/status")
jsonBlob, err := util.Get(server, "/cluster/status", nil)
glog.V(2).Info("list masters result :", string(jsonBlob))
if err != nil {
return nil, err

38
go/operation/lookup.go

@@ -16,16 +16,27 @@ type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
}
type Locations []Location
type LookupResult struct {
VolumeId string `json:"volumeId,omitempty"`
Locations []Location `json:"locations,omitempty"`
Error string `json:"error,omitempty"`
VolumeId string `json:"volumeId,omitempty"`
Locations Locations `json:"locations,omitempty"`
Error string `json:"error,omitempty"`
}
func (lr *LookupResult) String() string {
return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error)
}
func (ls Locations) Head() *Location {
return &ls[0]
}
func (ls Locations) PickForRead() *Location {
return &ls[rand.Intn(len(ls))]
}
var (
vc VidCache // cache of volume locations; re-check after 10 minutes
)
@@ -42,10 +53,17 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
return
}
func LookupNoCache(server string, vid string) (ret *LookupResult, err error) {
if ret, err = do_lookup(server, vid); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
return
}
func do_lookup(server string, vid string) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid)
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
jsonBlob, err := util.Post(server, "/dir/lookup", values)
if err != nil {
return nil, err
}
@@ -60,7 +78,7 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
@@ -72,7 +90,13 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
if len(lookup.Locations) == 0 {
return "", errors.New("File Not Found")
}
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
var u string
if readonly {
u = lookup.Locations.PickForRead().Url
} else {
u = lookup.Locations.Head().Url
}
return util.MkUrl(u, "/"+fileId, nil), nil
}
// LookupVolumeIds find volume locations by cache and actual lookup
@@ -99,7 +123,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
for _, vid := range unknown_vids {
values.Add("volumeId", vid)
}
jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
jsonBlob, err := util.Post(server, "/vol/lookup", values)
if err != nil {
return nil, err
}
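The lookup changes above split replica selection by intent: `PickForRead` spreads reads across all replicas, while `Head` pins writes and deletes to the first location so mutations funnel through a single head copy. A minimal standalone sketch of that split (the `Location` fields and `main` are illustrative; only the two helper methods mirror the diff):

```
package main

import (
	"fmt"
	"math/rand"
)

// Location mirrors the lookup result entry above.
type Location struct {
	Url       string
	PublicUrl string
}

type Locations []Location

// Head returns the first replica; writes and deletes always go here.
func (ls Locations) Head() *Location { return &ls[0] }

// PickForRead returns a random replica to spread read traffic.
func (ls Locations) PickForRead() *Location { return &ls[rand.Intn(len(ls))] }

func main() {
	ls := Locations{{Url: "a:8080"}, {Url: "b:8080"}, {Url: "c:8080"}}
	fmt.Println("write to:", ls.Head().Url)
	fmt.Println("read from:", ls.PickForRead().Url)
}
```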

6
go/operation/lookup_vid_cache.go

@@ -9,14 +9,14 @@ import (
)
type VidInfo struct {
Locations []Location
Locations Locations
NextRefreshTime time.Time
}
type VidCache struct {
cache []VidInfo
}
func (vc *VidCache) Get(vid string) ([]Location, error) {
func (vc *VidCache) Get(vid string) (Locations, error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
@@ -33,7 +33,7 @@ func (vc *VidCache) Get(vid string) ([]Location, error) {
}
return nil, errors.New("Not Found")
}
func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) {
func (vc *VidCache) Set(vid string, locations Locations, duration time.Duration) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)

2
go/operation/lookup_vid_cache_test.go

@@ -10,7 +10,7 @@ func TestCaching(t *testing.T) {
var (
vc VidCache
)
var locations []Location
var locations Locations
locations = append(locations, Location{Url: "a.com:8080"})
vc.Set("123", locations, time.Second)
ret, _ := vc.Get("123")

2
go/operation/submit.go

@@ -46,7 +46,7 @@ func SubmitFiles(master string, files []FilePart,
}
ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
if err != nil {
for index, _ := range files {
for index := range files {
results[index].Error = err.Error()
}
return results, err

3
go/operation/sync_volume.go

@@ -10,7 +10,6 @@ import (
)
type SyncVolumeResponse struct {
Replication string `json:"Replication,omitempty"`
Ttl string `json:"Ttl,omitempty"`
TailOffset uint64 `json:"TailOffset,omitempty"`
CompactRevision uint16 `json:"CompactRevision,omitempty"`
@@ -21,7 +20,7 @@ type SyncVolumeResponse struct {
func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) {
values := make(url.Values)
values.Add("volume", vid)
jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values)
jsonBlob, err := util.Post(server, "/admin/sync/status", values)
glog.V(2).Info("sync volume result :", string(jsonBlob))
if err != nil {
return nil, err

77
go/operation/system_message.pb.go

@@ -11,6 +11,9 @@ It is generated from these files:
It has these top-level messages:
VolumeInformationMessage
JoinMessage
CollectionSetting
GlobalSetting
JoinResponse
*/
package operation
@@ -29,7 +32,7 @@ type VolumeInformationMessage struct {
DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,opt,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
@@ -199,5 +202,77 @@ func (m *JoinMessage) GetAdminPort() uint32 {
return 0
}
type CollectionSetting struct {
Collection *string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"`
ReplicaPlacement *string `protobuf:"bytes,2,opt,name=replica_placement" json:"replica_placement,omitempty"`
VacuumGarbageThreshold *float32 `protobuf:"fixed32,3,opt,name=vacuum_garbage_threshold" json:"vacuum_garbage_threshold,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CollectionSetting) Reset() { *m = CollectionSetting{} }
func (m *CollectionSetting) String() string { return proto.CompactTextString(m) }
func (*CollectionSetting) ProtoMessage() {}
func (m *CollectionSetting) GetCollection() string {
if m != nil && m.Collection != nil {
return *m.Collection
}
return ""
}
func (m *CollectionSetting) GetReplicaPlacement() string {
if m != nil && m.ReplicaPlacement != nil {
return *m.ReplicaPlacement
}
return ""
}
func (m *CollectionSetting) GetVacuumGarbageThreshold() float32 {
if m != nil && m.VacuumGarbageThreshold != nil {
return *m.VacuumGarbageThreshold
}
return 0
}
type GlobalSetting struct {
Settings []*CollectionSetting `protobuf:"bytes,1,rep,name=settings" json:"settings,omitempty"`
MasterPeers []string `protobuf:"bytes,2,rep,name=master_peers" json:"master_peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GlobalSetting) Reset() { *m = GlobalSetting{} }
func (m *GlobalSetting) String() string { return proto.CompactTextString(m) }
func (*GlobalSetting) ProtoMessage() {}
func (m *GlobalSetting) GetSettings() []*CollectionSetting {
if m != nil {
return m.Settings
}
return nil
}
func (m *GlobalSetting) GetMasterPeers() []string {
if m != nil {
return m.MasterPeers
}
return nil
}
type JoinResponse struct {
Settings *GlobalSetting `protobuf:"bytes,1,opt,name=settings" json:"settings,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *JoinResponse) Reset() { *m = JoinResponse{} }
func (m *JoinResponse) String() string { return proto.CompactTextString(m) }
func (*JoinResponse) ProtoMessage() {}
func (m *JoinResponse) GetSettings() *GlobalSetting {
if m != nil {
return m.Settings
}
return nil
}
func init() {
}

58
go/proto/system_message.proto

@@ -1,27 +1,45 @@
package operation;
message VolumeInformationMessage {
required uint32 id = 1;
required uint64 size = 2;
optional string collection = 3;
required uint64 file_count = 4;
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
required uint32 id = 1;
required uint64 size = 2;
optional string collection = 3;
required uint64 file_count = 4;
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
optional uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
}
message JoinMessage {
optional bool is_init = 1;
required string ip = 2;
required uint32 port = 3;
optional string public_url = 4;
required uint32 max_volume_count = 5;
required uint64 max_file_key = 6;
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
optional uint32 admin_port = 10;
optional bool is_init = 1;
required string ip = 2;
required uint32 port = 3;
optional string public_url = 4;
required uint32 max_volume_count = 5;
required uint64 max_file_key = 6;
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
optional uint32 admin_port = 10;
}
message CollectionSetting {
optional string collection = 1;
optional string replica_placement = 2;
optional float vacuum_garbage_threshold = 3;
}
message GlobalSetting {
repeated CollectionSetting settings = 1;
repeated string master_peers = 2;
}
message JoinResponse {
optional GlobalSetting settings = 1;
}

73
go/storage/collection_settings.go

@@ -0,0 +1,73 @@
package storage
type SettingKey int
const (
keyReplicatePlacement SettingKey = iota
keyGarbageThreshold
)
type CollectionSettings struct {
settings map[string]map[SettingKey]interface{}
}
func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings {
rp, e := NewReplicaPlacementFromString(defaultReplicatePlacement)
if e != nil {
rp, _ = NewReplicaPlacementFromString("000")
}
c := &CollectionSettings{
settings: make(map[string]map[SettingKey]interface{}),
}
c.set("", keyReplicatePlacement, rp)
c.set("", keyGarbageThreshold, defaultGarbageThreshold)
return c
}
func (c *CollectionSettings) get(collection string, key SettingKey) interface{} {
if m, ok := c.settings[collection]; ok {
if v, ok := m[key]; ok {
return v
}
}
if m, ok := c.settings[""]; ok {
return m[key]
}
return nil
}
func (c *CollectionSettings) set(collection string, key SettingKey, value interface{}) {
m := c.settings[collection]
if m == nil {
m = make(map[SettingKey]interface{})
c.settings[collection] = m
}
if value == nil {
//mustn't delete default setting
if collection != "" {
delete(m, key)
}
} else {
m[key] = value
}
}
func (c *CollectionSettings) GetGarbageThreshold(collection string) string {
return c.get(collection, keyGarbageThreshold).(string)
}
func (c *CollectionSettings) SetGarbageThreshold(collection string, gt string) {
c.set(collection, keyGarbageThreshold, gt)
}
func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement {
return c.get(collection, keyReplicatePlacement).(*ReplicaPlacement)
}
func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error {
rp, e := NewReplicaPlacementFromString(t)
if e == nil {
c.set(collection, keyReplicatePlacement, rp)
}
return e
}
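The lookup order in `CollectionSettings.get` is: per-collection value first, then the default stored under the empty collection name. A small sketch of just that fallback rule, with plain strings standing in for the `*ReplicaPlacement` values:

```
package main

import "fmt"

// settings maps collection -> replica placement; "" holds the default.
var settings = map[string]string{
	"":       "000", // default replica placement
	"photos": "001",
}

// replicaPlacement mimics CollectionSettings.get: use the per-collection
// value if present, otherwise fall back to the default ("") entry.
func replicaPlacement(collection string) string {
	if v, ok := settings[collection]; ok {
		return v
	}
	return settings[""]
}

func main() {
	fmt.Println(replicaPlacement("photos")) // 001
	fmt.Println(replicaPlacement("logs"))   // 000 (default)
}
```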

12
go/storage/needle_read_write.go

@@ -16,6 +16,7 @@ const (
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10
FlagIsExtendNeedle = 0x40 // TODO: reserved flag; use an extent file to save big needles
FlagIsChunkManifest = 0x80
LastModifiedBytesLength = 5
TtlBytesLength = 2
@@ -238,13 +239,24 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL
}
n.Data = bytes[:n.Size]
n.Checksum = NewCRC(n.Data)
checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize])
if n.Checksum.Value() != checksum {
glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id)
}
case Version2:
bytes := make([]byte, bodyLength)
if _, err = r.ReadAt(bytes, offset); err != nil {
return
}
n.readNeedleDataVersion2(bytes[0:n.Size])
if n.DataSize == 0 {
return
}
n.Checksum = NewCRC(n.Data)
checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize])
if n.Checksum.Value() != checksum {
glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id)
}
default:
err = fmt.Errorf("Unsupported Version! (%d)", version)
}

14
go/storage/replica_placement.go

@@ -51,3 +51,17 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
func (rp *ReplicaPlacement) Compare(rp1 *ReplicaPlacement) int {
if rp.SameRackCount == rp1.SameRackCount &&
rp.DiffRackCount == rp1.DiffRackCount &&
rp.DiffDataCenterCount == rp1.DiffDataCenterCount {
return 0
} else if rp.SameRackCount < rp1.SameRackCount ||
rp.DiffRackCount < rp1.DiffRackCount ||
rp.DiffDataCenterCount < rp1.DiffDataCenterCount {
return -1
} else {
return 1
}
}

49
go/storage/store.go

@@ -88,6 +88,8 @@ type Store struct {
connected bool
volumeSizeLimit uint64 //read from the master
masterNodes *MasterNodes
needleMapKind NeedleMapType
TaskManager *TaskManager
}
func (s *Store) String() (str string) {
@@ -96,7 +98,13 @@ func (s *Store) String() (str string) {
}
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s = &Store{
Port: port,
Ip: ip,
PublicUrl: publicUrl,
TaskManager: NewTaskManager(),
needleMapKind: needleMapKind,
}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
@@ -106,11 +114,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
func (s *Store) AddVolume(volumeListString string, collection string, ttlString string) error {
ttl, e := ReadTTL(ttlString)
if e != nil {
return e
@@ -122,7 +126,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
e = s.addVolume(VolumeId(id), collection, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -134,7 +138,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
if err := s.addVolume(VolumeId(id), collection, ttl); err != nil {
e = err
}
}
@@ -183,14 +187,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v",
location.Directory, vid, collection, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -213,9 +217,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String())
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
@@ -234,7 +238,6 @@ func (s *Store) Status() []*VolumeInfo {
Id: VolumeId(k),
Size: v.ContentSize(),
Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
@@ -281,7 +284,6 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
ReadOnly: proto.Bool(v.readOnly),
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
Version: proto.Uint32(uint32(v.Version())),
Ttl: proto.Uint32(v.Ttl.ToUint32()),
}
@@ -314,7 +316,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
return "", "", err
}
joinUrl := "http://" + masterNode + "/dir/join"
joinUrl := util.MkUrl(masterNode, "/dir/join", nil)
glog.V(4).Infof("Connecting to %s ...", joinUrl)
jsonBlob, err := util.PostBytes(joinUrl, data)
@@ -387,3 +389,16 @@ func (s *Store) HasVolume(i VolumeId) bool {
v := s.findVolume(i)
return v != nil
}
type VolumeWalker func(v *Volume) (e error)
func (s *Store) WalkVolume(walker VolumeWalker) error {
for _, location := range s.Locations {
for _, v := range location.volumes {
if e := walker(v); e != nil {
return e
}
}
}
return nil
}

147
go/storage/store_task.go

@@ -0,0 +1,147 @@
package storage
import (
"errors"
"net/url"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
)
const (
TaskVacuum = "vacuum"
TaskReplicate = "replicate"
TaskBalance = "balance"
)
var (
ErrTaskNotFinish = errors.New("TaskNotFinish")
ErrTaskNotFound = errors.New("TaskNotFound")
ErrTaskInvalid = errors.New("TaskInvalid")
ErrTaskExists = errors.New("TaskExists")
)
type TaskWorker interface {
Run() error
Commit() error
Clean() error
Info() url.Values
}
type Task struct {
Id string
startTime time.Time
worker TaskWorker
ch chan bool
result error
cleanWhenFinish bool
}
type TaskManager struct {
TaskList map[string]*Task
}
func NewTask(worker TaskWorker, id string) *Task {
t := &Task{
Id: id,
worker: worker,
startTime: time.Now(),
result: ErrTaskNotFinish,
ch: make(chan bool, 1),
}
go func(t *Task) {
t.result = t.worker.Run()
if t.cleanWhenFinish {
glog.V(0).Infof("clean task (%s) when finish.", t.Id)
t.worker.Clean()
}
t.ch <- true
}(t)
return t
}
func (t *Task) QueryResult(waitDuration time.Duration) error {
if t.result == ErrTaskNotFinish && waitDuration > 0 {
select {
case <-time.After(waitDuration):
case <-t.ch:
}
}
return t.result
}
func NewTaskManager() *TaskManager {
return &TaskManager{
TaskList: make(map[string]*Task),
}
}
func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) {
tt := args.Get("task")
vid := args.Get("volume")
tid = tt + "-" + vid
if _, ok := tm.TaskList[tid]; ok {
return tid, ErrTaskExists
}
var tw TaskWorker
switch tt {
case TaskVacuum:
tw, e = NewVacuumTask(s, args)
case TaskReplicate:
tw, e = NewReplicaTask(s, args)
case TaskBalance:
}
if e != nil {
return
}
if tw == nil {
return "", ErrTaskInvalid
}
tm.TaskList[tid] = NewTask(tw, tid)
return tid, nil
}
func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) {
t, ok := tm.TaskList[tid]
if !ok {
return ErrTaskNotFound
}
return t.QueryResult(waitDuration)
}
func (tm *TaskManager) Commit(tid string) (e error) {
t, ok := tm.TaskList[tid]
if !ok {
return ErrTaskNotFound
}
if t.QueryResult(time.Second*30) == ErrTaskNotFinish {
return ErrTaskNotFinish
}
delete(tm.TaskList, tid)
return t.worker.Commit()
}
func (tm *TaskManager) Clean(tid string) error {
t, ok := tm.TaskList[tid]
if !ok {
return ErrTaskNotFound
}
delete(tm.TaskList, tid)
if t.QueryResult(time.Second*30) == ErrTaskNotFinish {
t.cleanWhenFinish = true
glog.V(0).Infof("task (%s) is not finish, clean it later.", tid)
} else {
t.worker.Clean()
}
return nil
}
func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) {
t, ok := tm.TaskList[tid]
if !ok {
return 0, ErrTaskNotFound
}
return time.Since(t.startTime), nil
}
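The task API is deliberately two-phase: `Run` does the long work off to the side, and nothing takes effect until `Commit` (or the intermediate result is discarded by `Clean`). A hedged sketch of the same lifecycle; the `TaskWorker` interface is copied from the diff above, while `fakeWorker` and the driving loop are invented for illustration:

```
package main

import (
	"fmt"
	"net/url"
	"time"
)

type TaskWorker interface {
	Run() error
	Commit() error
	Clean() error
	Info() url.Values
}

// fakeWorker stands in for a vacuum or replicate worker: it "prepares"
// its result during Run, and Commit makes that result live.
type fakeWorker struct{ prepared bool }

func (w *fakeWorker) Run() error { time.Sleep(100 * time.Millisecond); w.prepared = true; return nil }
func (w *fakeWorker) Commit() error {
	if !w.prepared {
		return fmt.Errorf("nothing prepared")
	}
	fmt.Println("committed")
	return nil
}
func (w *fakeWorker) Clean() error     { w.prepared = false; return nil }
func (w *fakeWorker) Info() url.Values { return url.Values{} }

func main() {
	w := &fakeWorker{}
	done := make(chan error, 1)
	go func() { done <- w.Run() }() // Run in the background, as NewTask does

	select { // QueryResult with a wait duration
	case e := <-done:
		if e == nil {
			_ = w.Commit() // two-phase: only Commit makes the result live
		} else {
			_ = w.Clean()
		}
	case <-time.After(time.Second):
		fmt.Println("still running; ask again later")
	}
}
```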

87
go/storage/store_task_cli.go

@@ -0,0 +1,87 @@
package storage
import (
"errors"
"fmt"
"net/url"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/util"
)
type TaskParams map[string]string
var (
ErrTaskTimeout = errors.New("TaskTimeout")
)
type TaskCli struct {
TID string
DataNode string
}
func NewTaskCli(dataNode string, taskType string, params TaskParams) (*TaskCli, error) {
args := url.Values{}
args.Set("task", taskType)
for k, v := range params {
args.Set(k, v)
}
m, e := util.RemoteApiCall(dataNode, "/admin/task/new", args)
if e != nil {
return nil, e
}
tid := m["tid"].(string)
if tid == "" {
return nil, fmt.Errorf("Empty %s task", taskType)
}
return &TaskCli{
TID: tid,
DataNode: dataNode,
}, nil
}
func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error {
startTime := time.Now()
args := url.Values{}
args.Set("tid", c.TID)
args.Set("timeout", time.Minute.String())
tryTimes := 0
for time.Since(startTime) < timeout {
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args)
if e == nil {
// the task has finished with no error
return nil
}
if util.IsRemoteApiError(e) {
if e.Error() == ErrTaskNotFinish.Error() {
tryTimes = 0
continue
}
return e
} else {
tryTimes++
if tryTimes >= 10 {
return e
}
glog.V(0).Infof("query task (%s) error %v, wait 1 minute then retry %d times", c.TID, e, tryTimes)
time.Sleep(time.Minute)
}
}
return ErrTaskTimeout
}
func (c *TaskCli) Commit() error {
args := url.Values{}
args.Set("tid", c.TID)
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args)
return e
}
func (c *TaskCli) Clean() error {
args := url.Values{}
args.Set("tid", c.TID)
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args)
return e
}
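Seen from the master, `TaskCli` turns those endpoints into one blocking call: create the task, poll `/admin/task/query` while it still answers `TaskNotFinish`, then `commit` on success or `clean` on failure. A self-contained sketch of that polling loop, with `queryTask` standing in for the HTTP call (the real client also retries transport errors up to ten times, omitted here):

```
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotFinish = errors.New("TaskNotFinish")

// queryTask stands in for POSTing /admin/task/query on the volume server:
// here it reports "not finished" twice, then success.
var polls = 0

func queryTask(tid string) error {
	polls++
	if polls < 3 {
		return errNotFinish
	}
	return nil
}

// waitForTask mirrors TaskCli.WaitAndQueryResult: poll until the task
// finishes or the deadline passes.
func waitForTask(tid string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		e := queryTask(tid)
		if e == nil {
			return nil // finished cleanly; caller can now commit
		}
		if !errors.Is(e, errNotFinish) {
			return e // real failure; caller should clean
		}
		time.Sleep(10 * time.Millisecond)
	}
	return errors.New("TaskTimeout")
}

func main() {
	if e := waitForTask("replicate-7", time.Second); e == nil {
		fmt.Println("commit via /admin/task/commit?tid=replicate-7")
	} else {
		fmt.Println("clean via /admin/task/clean?tid=replicate-7:", e)
	}
}
```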

115
go/storage/store_task_replication.go

@@ -0,0 +1,115 @@
package storage
import (
"errors"
"fmt"
"net/url"
"os"
"path"
"github.com/chrislusf/seaweedfs/go/util"
)
type ReplicaTask struct {
VID VolumeId
Collection string
SrcDataNode string
s *Store
location *DiskLocation
}
func NewReplicaTask(s *Store, args url.Values) (*ReplicaTask, error) {
volumeIdString := args.Get("volume")
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
}
source := args.Get("source")
if source == "" {
return nil, errors.New("Invalid source data node.")
}
location := s.findFreeLocation()
if location == nil {
return nil, errors.New("No more free space left")
}
collection := args.Get("collection")
return &ReplicaTask{
VID: vid,
Collection: collection,
SrcDataNode: source,
s: s,
location: location,
}, nil
}
func (t *ReplicaTask) Run() error {
ch := make(chan error)
go func() {
idxUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/index", url.Values{"volume": {t.VID.String()}})
e := util.DownloadToFile(idxUrl, t.FileName()+".repx")
if e != nil {
e = fmt.Errorf("Replicate error: %s, %v", idxUrl, e)
}
ch <- e
}()
go func() {
datUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/vol_data", url.Values{"volume": {t.VID.String()}})
e := util.DownloadToFile(datUrl, t.FileName()+".repd")
if e != nil {
e = fmt.Errorf("Replicate error: %s, %v", datUrl, e)
}
ch <- e
}()
errs := make([]error, 0)
for i := 0; i < 2; i++ {
if e := <-ch; e != nil {
errs = append(errs, e)
}
}
if len(errs) == 0 {
return nil
} else {
return fmt.Errorf("%v", errs)
}
}
func (t *ReplicaTask) Commit() error {
var (
volume *Volume
e error
)
if e = os.Rename(t.FileName()+".repd", t.FileName()+".dat"); e != nil {
return e
}
if e = os.Rename(t.FileName()+".repx", t.FileName()+".idx"); e != nil {
return e
}
volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil)
if e == nil {
t.location.volumes[t.VID] = volume
t.s.SendHeartbeatToMaster()
}
return e
}
func (t *ReplicaTask) Clean() error {
os.Remove(t.FileName() + ".repx")
os.Remove(t.FileName() + ".repd")
return nil
}
func (t *ReplicaTask) Info() url.Values {
//TODO
return url.Values{}
}
func (t *ReplicaTask) FileName() (fileName string) {
if t.Collection == "" {
fileName = path.Join(t.location.Directory, t.VID.String())
} else {
fileName = path.Join(t.location.Directory, t.Collection+"_"+t.VID.String())
}
return
}

40
go/storage/store_task_vacuum.go

@@ -0,0 +1,40 @@
package storage
import (
"fmt"
"net/url"
)
type VacuumTask struct {
V *Volume
}
func NewVacuumTask(s *Store, args url.Values) (*VacuumTask, error) {
volumeIdString := args.Get("volume")
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
}
v := s.findVolume(vid)
if v == nil {
return nil, fmt.Errorf("volume id %d is not found", vid)
}
return &VacuumTask{V: v}, nil
}
func (t *VacuumTask) Run() error {
return t.V.Compact()
}
func (t *VacuumTask) Commit() error {
return t.V.commitCompact()
}
func (t *VacuumTask) Clean() error {
return t.V.cleanCompact()
}
func (t *VacuumTask) Info() url.Values {
//TODO
return url.Values{}
}

24
go/storage/volume.go

@@ -28,9 +28,9 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.SuperBlock = SuperBlock{Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind)
return
@@ -87,7 +87,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}
if v.ReplicaPlacement == nil {
if v.version == NoneVersion {
e = v.readSuperBlock()
} else {
e = v.maybeWriteSuperBlock()
@@ -145,10 +145,6 @@ func (v *Volume) Close() {
_ = v.dataFile.Close()
}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
func (v *Volume) isFileUnchanged(n *Needle) bool {
@@ -426,3 +422,17 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
}
return false
}
func (v *Volume) SetReadOnly(isReadOnly bool) error {
if isReadOnly == false {
if fi, e := v.dataFile.Stat(); e != nil {
return e
} else {
if fi.Mode()&0200 == 0 {
return errors.New(v.FileName() + ".dat is READONLY")
}
}
}
v.readOnly = isReadOnly
return nil
}

13
go/storage/volume_info.go

@@ -2,14 +2,14 @@ package storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/go/operation"
"sort"
"github.com/chrislusf/seaweedfs/go/operation"
)
type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
Collection string
Version Version
@@ -30,18 +30,13 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
ReadOnly: *m.ReadOnly,
Version: Version(*m.Version),
}
rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
if e != nil {
return vi, e
}
vi.ReplicaPlacement = rp
vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
func (vi VolumeInfo) String() string {
return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
return fmt.Sprintf("Id:%d, Size:%d, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
}
/*VolumesInfo sorting*/

6
go/storage/volume_info_test.go

@@ -4,13 +4,13 @@ import "testing"
func TestSortVolumeInfos(t *testing.T) {
vis := []*VolumeInfo{
&VolumeInfo{
{
Id: 2,
},
&VolumeInfo{
{
Id: 1,
},
&VolumeInfo{
{
Id: 3,
},
}

212
go/storage/volume_pure_reader.go

@@ -0,0 +1,212 @@
package storage
import (
"io"
"os"
"sort"
"sync"
"github.com/chrislusf/seaweedfs/go/util"
)
type DirtyData struct {
Offset int64 `comment:"Dirty data start offset"`
Size uint32 `comment:"Size of the dirty data"`
}
type DirtyDatas []DirtyData
func (s DirtyDatas) Len() int { return len(s) }
func (s DirtyDatas) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s DirtyDatas) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s DirtyDatas) Sort() { sort.Sort(s) }
func (s DirtyDatas) Search(offset int64) int {
return sort.Search(len(s), func(i int) bool {
v := &s[i]
return v.Offset+int64(v.Size) > offset
})
}
type PureReader struct {
Dirtys DirtyDatas
DataFile *os.File
pr *io.PipeReader
pw *io.PipeWriter
mutex sync.Mutex
}
func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) {
m := NewCompactMap()
for i := 0; i+16 <= len(indexFileContent); i += 16 {
bytes := indexFileContent[i : i+16]
key := util.BytesToUint64(bytes[:8])
offset := util.BytesToUint32(bytes[8:12])
size := util.BytesToUint32(bytes[12:16])
k := Key(key)
if offset != 0 && size != 0 {
m.Set(k, offset, size)
} else {
if nv, ok := m.Get(k); ok {
//mark old needle data as dirty data
if int64(nv.Size)-NeedleHeaderSize > 0 {
dirtys = append(dirtys, DirtyData{
Offset: int64(nv.Offset)*8 + NeedleHeaderSize,
Size: nv.Size,
})
}
}
m.Delete(k)
}
}
dirtys.Sort()
return dirtys
}
func (cr *PureReader) Seek(offset int64, whence int) (int64, error) {
oldOff, e := cr.DataFile.Seek(0, 1)
if e != nil {
return 0, e
}
newOff, e := cr.DataFile.Seek(offset, whence)
if e != nil {
return 0, e
}
if oldOff != newOff {
cr.closePipe(true)
}
return newOff, nil
}
func (cr *PureReader) Size() (int64, error) {
fi, e := cr.DataFile.Stat()
if e != nil {
return 0, e
}
return fi.Size(), nil
}
func (cdr *PureReader) WriteTo(w io.Writer) (written int64, err error) {
off, e := cdr.DataFile.Seek(0, 1)
if e != nil {
return 0, e
}
const ZeroBufSize = 32 * 1024
zeroBuf := make([]byte, ZeroBufSize)
dirtyIndex := cdr.Dirtys.Search(off)
var nextDirty *DirtyData
if dirtyIndex < len(cdr.Dirtys) {
nextDirty = &cdr.Dirtys[dirtyIndex]
}
for {
if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) {
sz := nextDirty.Offset + int64(nextDirty.Size) - off
for sz > 0 {
mn := int64(ZeroBufSize)
if mn > sz {
mn = sz
}
var n int
if n, e = w.Write(zeroBuf[:mn]); e != nil {
return
}
written += int64(n)
sz -= int64(n)
off += int64(n)
}
dirtyIndex++
if dirtyIndex < len(cdr.Dirtys) {
nextDirty = &cdr.Dirtys[dirtyIndex]
} else {
nextDirty = nil
}
if _, e = cdr.DataFile.Seek(off, 0); e != nil {
return
}
} else {
var n, sz int64
if nextDirty != nil {
sz = nextDirty.Offset - off
}
if sz <= 0 {
// copy until eof
n, e = io.Copy(w, cdr.DataFile)
written += n
return
}
if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil {
return
}
off += n
written += n
}
}
return
}
func (cr *PureReader) ReadAt(p []byte, off int64) (n int, err error) {
cr.Seek(off, 0)
return cr.Read(p)
}
func (cr *PureReader) Read(p []byte) (int, error) {
return cr.getPipeReader().Read(p)
}
func (cr *PureReader) Close() (e error) {
cr.closePipe(true)
return cr.DataFile.Close()
}
func (cr *PureReader) closePipe(lock bool) (e error) {
if lock {
cr.mutex.Lock()
defer cr.mutex.Unlock()
}
if cr.pr != nil {
if err := cr.pr.Close(); err != nil {
e = err
}
}
cr.pr = nil
if cr.pw != nil {
if err := cr.pw.Close(); err != nil {
e = err
}
}
cr.pw = nil
return e
}
func (cr *PureReader) getPipeReader() io.Reader {
cr.mutex.Lock()
defer cr.mutex.Unlock()
if cr.pr != nil && cr.pw != nil {
return cr.pr
}
cr.closePipe(false)
cr.pr, cr.pw = io.Pipe()
go func(pw *io.PipeWriter) {
_, e := cr.WriteTo(pw)
pw.CloseWithError(e)
}(cr.pw)
return cr.pr
}
func (v *Volume) GetVolumeCleanReader() (cr *PureReader, err error) {
var dirtys DirtyDatas
if indexData, e := v.nm.IndexFileContent(); e != nil {
return nil, e
} else {
dirtys = ScanDirtyData(indexData)
}
dataFile, e := os.Open(v.FileName() + ".dat")
if e != nil {
return nil, e
}
cr = &PureReader{
Dirtys: dirtys,
DataFile: dataFile,
}
return
}
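`ScanDirtyData` replays the index file: an entry with zero offset and size records a deletion, so the extent previously stored for that key becomes dirty, and `WriteTo` streams zeros over those ranges instead of the deleted needle bodies. A toy sketch of the zero-fill rule on a plain byte slice (the offsets are invented):

```
package main

import "fmt"

type dirty struct {
	off  int
	size int
}

// cleanCopy copies data but zeroes every range recorded as dirty, which
// is what PureReader.WriteTo does with deleted needle extents.
func cleanCopy(data []byte, dirtys []dirty) []byte {
	out := make([]byte, len(data))
	copy(out, data)
	for _, d := range dirtys {
		for i := d.off; i < d.off+d.size && i < len(out); i++ {
			out[i] = 0
		}
	}
	return out
}

func main() {
	data := []byte("live-DELETED-live")
	fmt.Printf("%q\n", cleanCopy(data, []dirty{{5, 7}})) // deleted span becomes zeros
}
```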

22
go/storage/volume_replicate_test.go

@@ -0,0 +1,22 @@
package storage
import "testing"
func TestDirtyDataSearch(t *testing.T) {
testData := DirtyDatas{
{30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50},
}
testOffset := []int64{
0, 150, 480, 1024,
}
testData.Sort()
t.Logf("TestData = %v", testData)
for _, off := range testOffset {
i := testData.Search(off)
if i < testData.Len() {
t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i])
} else {
t.Logf("Search %d return %d ", off, i)
}
}
}

25
go/storage/volume_super_block.go

@@ -15,16 +15,15 @@ const (
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc (Deprecated!)
* Byte 2 and byte 3: Time to live. See TTL for definition
* Byte 4 and byte 5: The number of times the volume has been compacted.
* Rest bytes: Reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
CompactRevision uint16
version Version
Ttl *TTL
CompactRevision uint16
}
func (s *SuperBlock) Version() Version {
@@ -33,7 +32,7 @@ func (s *SuperBlock) Version() Version {
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
header[1] = 0
s.Ttl.ToBytes(header[2:4])
util.Uint16toBytes(header[4:6], s.CompactRevision)
return header
@@ -59,6 +58,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
@@ -70,11 +70,18 @@ func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func (v *Volume) writeSuperBlock() (err error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil {
return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e)
}
return nil
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
superBlock.Ttl = LoadTTLFromBytes(header[2:4])
superBlock.CompactRevision = util.BytesToUint16(header[4:6])
return
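With byte 1 deprecated, the 8-byte super block now effectively carries only the version, the TTL, and the compact revision. A sketch of the layout that `Bytes()` writes after this change, assuming big-endian packing for the revision (as in `util.Uint16toBytes`) and treating the TTL as two opaque bytes:

```
package main

import "fmt"

// superBlockBytes sketches SuperBlock.Bytes after this commit: byte 0 is
// the version, byte 1 (formerly the replica placement) is always zero,
// bytes 2-3 carry the TTL, bytes 4-5 the compact revision, rest reserved.
func superBlockBytes(version byte, ttl [2]byte, compactRevision uint16) []byte {
	header := make([]byte, 8) // SuperBlockSize
	header[0] = version
	header[1] = 0 // deprecated replica placement byte
	header[2], header[3] = ttl[0], ttl[1]
	header[4] = byte(compactRevision >> 8)
	header[5] = byte(compactRevision)
	return header
}

func main() {
	fmt.Printf("% x\n", superBlockBytes(2, [2]byte{0, 0}, 1))
	// Output: 02 00 00 00 00 01 00 00
}
```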

4
go/storage/volume_sync.go

@@ -169,7 +169,6 @@ func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.CompactRevision = v.SuperBlock.CompactRevision
syncStatus.Ttl = v.SuperBlock.Ttl.String()
syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
return syncStatus
}
@@ -202,6 +201,9 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
if err != nil {
return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
}
if needleValue.Size != uint32(len(b)) {
return fmt.Errorf("Reading from %s error: size incorrect", volumeDataContentHandlerUrl)
}
offset, err := v.AppendBlob(b)
if err != nil {
return fmt.Errorf("Appending volume %d error: %v", v.Id, err)

4
go/storage/volume_ttl.go

@@ -114,6 +114,10 @@ func toStoredByte(readableUnitByte byte) byte {
return 0
}
func (t *TTL) Equal(t1 *TTL) bool {
return t.count == t1.count && t.unit == t1.unit
}
func (t TTL) Minutes() uint32 {
switch t.unit {
case Empty:

7
go/storage/volume_vacuum.go

@@ -30,6 +30,7 @@ func (v *Volume) commitCompact() error {
glog.V(3).Infof("Got Committing lock...")
_ = v.dataFile.Close()
var e error
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
return e
}
@@ -45,6 +46,12 @@ func (v *Volume) commitCompact() error {
return nil
}
func (v *Volume) cleanCompact() error {
os.Remove(v.FileName() + ".cpd")
os.Remove(v.FileName() + ".cpx")
return nil
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
var (
dst, idx *os.File

1
go/storage/volume_version.go

@@ -3,6 +3,7 @@ package storage
type Version uint8
const (
NoneVersion = Version(0)
Version1 = Version(1)
Version2 = Version(2)
CurrentVersion = Version2

3
go/topology/allocate_volume.go

@@ -18,9 +18,8 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
jsonBlob, err := util.Post(dn.Url(), "/admin/assign_volume", values)
if err != nil {
return err
}

43
go/topology/batch_operation.go

@@ -0,0 +1,43 @@
package topology
import (
"net/url"
"strconv"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/util"
)
func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) {
ch := make(chan bool, locationList.Length())
for _, dn := range locationList.list {
go func(url string, path string, values url.Values) {
_, e := util.RemoteApiCall(url, path, values)
if e != nil {
glog.V(0).Infoln("RemoteApiCall:", util.MkUrl(url, path, values), "error =", e)
}
ch <- e == nil
}(dn.Url(), path, values)
}
isSuccess = true
for range locationList.list {
select {
case canVacuum := <-ch:
isSuccess = isSuccess && canVacuum
case <-time.After(30 * time.Minute):
isSuccess = false
break
}
}
return isSuccess
}
func SetVolumeReadonly(locationList *VolumeLocationList, volume string, isReadonly bool) (isSuccess bool) {
forms := url.Values{}
forms.Set("key", "readonly")
forms.Set("value", strconv.FormatBool(isReadonly))
forms.Set("volume", volume)
return BatchOperation(locationList, "/admin/setting", forms)
}
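`BatchOperation` is an all-or-nothing fan-out: the same admin call is fired at every replica concurrently and the overall result is the AND of the individual outcomes, so `SetVolumeReadonly` only reports success once every copy of the volume has been flipped. A minimal sketch of that pattern with a stubbed per-node call:

```
package main

import (
	"fmt"
	"time"
)

// callNode stands in for util.RemoteApiCall against one replica.
func callNode(node string) error { return nil }

// batch mirrors BatchOperation: run the call on every node concurrently
// and AND the results together, bailing out on a per-response timeout.
func batch(nodes []string) bool {
	ch := make(chan bool, len(nodes))
	for _, n := range nodes {
		go func(n string) { ch <- callNode(n) == nil }(n)
	}
	ok := true
	for range nodes {
		select {
		case r := <-ch:
			ok = ok && r
		case <-time.After(30 * time.Second):
			return false
		}
	}
	return ok
}

func main() {
	fmt.Println(batch([]string{"a:8080", "b:8080"})) // true only if every node succeeded
}
```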

13
go/topology/collection.go

@@ -10,11 +10,12 @@ import (
type Collection struct {
Name string
volumeSizeLimit uint64
rp *storage.ReplicaPlacement
storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit, rp: rp}
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
@@ -23,18 +24,18 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
keyString := rp.String()
func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
keyString := ""
if ttl != nil {
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {

1
go/topology/data_node.go

@@ -42,6 +42,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
} else {
dn.volumes[v.Id] = v
}
return
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {

119
go/topology/node.go

@@ -3,7 +3,8 @@ package topology
import (
"errors"
"math/rand"
"strings"
"sort"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
@@ -18,11 +19,13 @@ type Node interface {
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
UpAdjustPlannedVolumeCountDelta(delta int)
UpAdjustMaxVolumeId(vid storage.VolumeId)
GetVolumeCount() int
GetActiveVolumeCount() int
GetMaxVolumeCount() int
GetPlannedVolumeCount() int
GetMaxVolumeId() storage.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -38,68 +41,80 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
id NodeId
volumeCount int
activeVolumeCount int
maxVolumeCount int
parent Node
children map[NodeId]Node
maxVolumeId storage.VolumeId
id NodeId
volumeCount int
activeVolumeCount int
maxVolumeCount int
plannedVolumeCount int
parent Node
children map[NodeId]Node
maxVolumeId storage.VolumeId
//for rack, data center, topology
nodeType string
value interface{}
}
type NodePicker interface {
PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error)
}
var ErrFilterContinue = errors.New("continue")
type FilterNodeFn func(dn Node) error
type PickNodesFn func(nodes []Node, count int) []Node
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) {
candidates := make([]Node, 0, len(n.children))
var errs []string
for _, node := range n.children {
if err := filterFirstNodeFn(node); err == nil {
if err := filterNodeFn(node); err == nil {
candidates = append(candidates, node)
} else if err == ErrFilterContinue {
continue
} else {
errs = append(errs, string(node.Id())+":"+err.Error())
}
}
if len(candidates) == 0 {
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
if len(candidates) < numberOfNodes {
return nil, errors.New("Not enough data node found!")
// return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
firstNode = candidates[rand.Intn(len(candidates))]
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
return pickFn(candidates, numberOfNodes), nil
}
restNodes = make([]Node, numberOfNodes-1)
candidates = candidates[:0]
for _, node := range n.children {
if node.Id() == firstNode.Id() {
continue
}
if node.FreeSpace() <= 0 {
continue
}
glog.V(2).Infoln("select rest node candidate:", node.Id())
candidates = append(candidates, node)
}
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
ret := len(restNodes) == 0
for k, node := range candidates {
if k < len(restNodes) {
restNodes[k] = node
if k == len(restNodes)-1 {
ret = true
}
} else {
r := rand.Intn(k + 1)
if r < len(restNodes) {
restNodes[r] = node
}
}
func RandomlyPickNodeFn(nodes []Node, count int) []Node {
if len(nodes) < count {
return nil
}
if !ret {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
err = errors.New("Not enough data node found!")
for i := range nodes {
j := rand.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
return
return nodes[:count]
}
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) {
return n.PickNodes(numberOfNodes, filterFirstNodeFn, RandomlyPickNodeFn)
}
type nodeList []Node
func (s nodeList) Len() int { return len(s) }
func (s nodeList) Less(i, j int) bool { return s[i].FreeSpace() < s[j].FreeSpace() }
func (s nodeList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func PickLowUsageNodeFn(nodes []Node, count int) []Node {
if len(nodes) < count {
return nil
}
sort.Sort(sort.Reverse(nodeList(nodes)))
return nodes[:count]
}
func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) {
return n.PickNodes(numberOfNodes, filterFirstNodeFn, PickLowUsageNodeFn)
}
func (n *NodeImpl) IsDataNode() bool {
@@ -121,7 +136,7 @@ func (n *NodeImpl) Id() NodeId {
return n.id
}
func (n *NodeImpl) FreeSpace() int {
return n.maxVolumeCount - n.volumeCount
return n.maxVolumeCount - n.volumeCount - n.plannedVolumeCount
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
@@ -146,7 +161,7 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
// fmt.Println("assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
}
assignedNode, err = node.ReserveOneVolume(r)
@@ -176,6 +191,14 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negative
n.plannedVolumeCount += delta
if n.parent != nil {
n.parent.UpAdjustPlannedVolumeCountDelta(delta)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
@@ -197,6 +220,10 @@ func (n *NodeImpl) GetMaxVolumeCount() int {
return n.maxVolumeCount
}
func (n *NodeImpl) GetPlannedVolumeCount() int {
return n.plannedVolumeCount
}
func (n *NodeImpl) LinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
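Node selection in `node.go` is now split into a filter pass and a pluggable pick strategy: `PickNodes` gathers children accepted by the `FilterNodeFn`, then hands them to a `PickNodesFn` such as the shuffle-based `RandomlyPickNodeFn` or the free-space-sorted `PickLowUsageNodeFn`. A standalone sketch of the same two-step shape (types and sample data are invented):

```
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

type node struct {
	id   string
	free int
}

type filterFn func(n node) bool
type pickFn func(ns []node, count int) []node

// pickNodes mirrors NodeImpl.PickNodes: filter candidates first, then
// let a strategy choose among them.
func pickNodes(all []node, count int, filter filterFn, pick pickFn) []node {
	var candidates []node
	for _, n := range all {
		if filter(n) {
			candidates = append(candidates, n)
		}
	}
	if len(candidates) < count {
		return nil // "Not enough data node found!"
	}
	return pick(candidates, count)
}

func main() {
	all := []node{{"a", 1}, {"b", 5}, {"c", 0}, {"d", 3}}
	hasFree := func(n node) bool { return n.free > 0 }

	random := func(ns []node, c int) []node { // like RandomlyPickNodeFn
		rand.Shuffle(len(ns), func(i, j int) { ns[i], ns[j] = ns[j], ns[i] })
		return ns[:c]
	}
	lowUsage := func(ns []node, c int) []node { // like PickLowUsageNodeFn
		sort.Slice(ns, func(i, j int) bool { return ns[i].free > ns[j].free })
		return ns[:c]
	}

	fmt.Println(pickNodes(all, 2, hasFree, random))
	fmt.Println(pickNodes(all, 2, hasFree, lowUsage))
}
```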

60
go/topology/store_replicate.go

@@ -20,31 +20,25 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId)
if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
} else if ret > 0 {
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
} else {
} else if ret <= 0 {
errorStatus = "Failed to write to local disk"
}
if !needToReplicate && ret > 0 {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
_, err := operation.Upload(
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err == nil
}) {
ret = 0
errorStatus = "Failed to write to replicas for volume " + volumeId.String()
}
//send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
_, err := operation.Upload(
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err == nil
}) {
ret = 0
errorStatus = "Failed to write to replicas for volume " + volumeId.String()
}
}
size = ret
return
}
@@ -61,25 +55,19 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
glog.V(0).Infoln("delete error:", err)
return
}
needToReplicate := !store.HasVolume(volumeId)
if !needToReplicate && ret > 0 {
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) {
ret = 0
}
//send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) {
ret = 0
}
}
return
}
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
results := make(chan bool)
@@ -95,6 +83,14 @@ func distributedOperation(masterNode string, store *storage.Store, volumeId stor
for i := 0; i < length; i++ {
ret = ret && <-results
}
// we shouldn't check ReplicaPlacement because the needle has already been written to the head volume
// if volume := store.GetVolume(volumeId); volume != nil {
// if length+1 < volume.ReplicaPlacement.GetCopyCount() {
// glog.V(0).Infof("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
// ret = false
// }
// }
return ret
} else {
glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())

42
go/topology/topology.go

@@ -28,12 +28,14 @@ type Topology struct {
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
CollectionSettings *storage.CollectionSettings
configuration *Configuration
RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@@ -42,6 +44,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.CollectionSettings = cs
t.Sequence = seq
@ -87,7 +90,7 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
return nil
}
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items {
@ -111,23 +114,23 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := t.GetVolumeLayout(option.Collection, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.Ttl).PickForWrite(count, option)
if err != nil || dataNodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes available!")
}
fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(ttl)
}
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
@ -140,11 +143,11 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info:%+v", v)
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn)
}
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
@ -167,6 +170,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
@ -174,6 +178,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
@ -187,3 +192,18 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
t.LinkChildNode(dc)
return dc
}
type DataNodeWalker func(dn *DataNode) (e error)
func (t *Topology) WalkDataNode(walker DataNodeWalker) error {
for _, c := range t.Children() {
for _, rack := range c.(*DataCenter).Children() {
for _, dn := range rack.(*Rack).Children() {
if e := walker(dn.(*DataNode)); e != nil {
return e
}
}
}
}
return nil
}
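
WalkDataNode gives the master a single traversal primitive over data centers, racks, and nodes; batchSetVolumeOption and the tests below both build on it. A minimal sketch (hypothetical helper, assuming a populated *Topology):

```
// listDataNodeUrls visits every data node and records its URL.
// Returning a non-nil error from the walker aborts the walk early,
// which getDataNodeFromId in volume_growth_test.go uses to stop at a match.
func listDataNodeUrls(topo *Topology) (urls []string) {
	topo.WalkDataNode(func(dn *DataNode) error {
		urls = append(urls, dn.Url())
		return nil // nil keeps walking
	})
	return
}
```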

9
go/topology/topology_event_handling.go

@ -21,8 +21,9 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
if t.IsLeader() {
for _ = range c {
for range c {
t.Vacuum(garbageThreshold)
// t.CheckReplicate()
}
}
}(garbageThreshold)
@ -42,7 +43,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@ -56,7 +57,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl := t.GetVolumeLayout(v.Collection, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@ -66,7 +67,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl := t.GetVolumeLayout(v.Collection, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}

153
go/topology/topology_replicate.go

@ -0,0 +1,153 @@
package topology
import (
"container/list"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
)
var (
isReplicateCheckerRunning = false
)
const ReplicateTaskTimeout = time.Hour
type ReplicateTask struct {
Vid storage.VolumeId
Collection string
SrcDN *DataNode
DstDN *DataNode
}
func (t *ReplicateTask) Run(topo *Topology) error {
//is lookup thread safe?
locationList := topo.Lookup(t.Collection, t.Vid)
rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection)
if locationList.CalcReplicaPlacement().Compare(rp) >= 0 {
glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String())
return nil
}
if !SetVolumeReadonly(locationList, t.Vid.String(), true) {
return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid)
}
defer SetVolumeReadonly(locationList, t.Vid.String(), false)
tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplicate, storage.TaskParams{
"volume": t.Vid.String(),
"source": t.SrcDN.Url(),
"collection": t.Collection,
})
if e != nil {
return e
}
if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil {
tc.Clean()
return e
}
e = tc.Commit()
return e
}
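
Run drives the generic store-task client introduced in go/storage/store_task_cli.go: create a task on the destination node, poll until it finishes, then commit, or clean up on failure. A stripped-down sketch of that call sequence, assuming the TaskCli API shown above and placeholder URLs:

```
// replicateTo is a hypothetical helper mirroring ReplicateTask.Run:
// start a replicate task on dstUrl, wait for it, then commit or clean.
func replicateTo(dstUrl, srcUrl, collection string, vid storage.VolumeId) error {
	tc, err := storage.NewTaskCli(dstUrl, storage.TaskReplicate, storage.TaskParams{
		"volume":     vid.String(),
		"source":     srcUrl,
		"collection": collection,
	})
	if err != nil {
		return err
	}
	if err = tc.WaitAndQueryResult(time.Hour); err != nil {
		tc.Clean() // drop the half-finished copy on the destination
		return err
	}
	return tc.Commit() // finalize the new replica on the destination
}
```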
func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
return []*DataNode{
t.SrcDN,
t.DstDN,
}
}
func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
glog.V(0).Infoln("checking replicate on collection:", c.Name)
growOption := &VolumeGrowOption{ReplicaPlacement: c.rp}
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
rp1 := locationList.CalcReplicaPlacement()
if rp1.Compare(volumeLayout.rp) >= 0 {
continue
}
if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil {
for _, s := range additionServers {
s.UpAdjustPlannedVolumeCountDelta(1)
rt := &ReplicateTask{
Vid: vid,
Collection: c.Name,
SrcDN: locationList.PickForRead(),
DstDN: s,
}
tasks = append(tasks, rt)
glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url())
}
} else {
glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e)
}
}
}
}
}
return
}
func (topo *Topology) CheckReplicate() {
isReplicateCheckerRunning = true
defer func() {
isReplicateCheckerRunning = false
}()
glog.V(1).Infoln("Start replicate checker on demand")
busyDataNodes := make(map[*DataNode]int)
taskCount := 0
taskQueue := list.New()
for _, t := range planReplicateTasks(topo) {
taskQueue.PushBack(t)
taskCount++
}
taskChan := make(chan *ReplicateTask)
for taskCount > 0 {
TaskQueueLoop:
for e := taskQueue.Front(); e != nil; e = e.Next() {
task := e.Value.(*ReplicateTask)
//only one task will run on the data node
dns := task.WorkingDataNodes()
for _, dn := range dns {
if busyDataNodes[dn] > 0 {
continue TaskQueueLoop
}
}
for _, dn := range dns {
busyDataNodes[dn]++
}
go func(t *ReplicateTask) {
if e := t.Run(topo); e != nil {
glog.V(0).Infof("ReplicateTask run error, vid: %v, dst: %s. %v", t.Vid, t.DstDN.Url(), e)
} else {
glog.V(2).Infof("ReplicateTask finished, vid: %v, dst: %s", t.Vid, t.DstDN.Url())
}
taskChan <- t
}(task)
taskQueue.Remove(e)
}
finishedTask := <-taskChan
for _, dn := range finishedTask.WorkingDataNodes() {
if busyDataNodes[dn] > 0 {
busyDataNodes[dn]--
}
}
taskCount--
finishedTask.DstDN.UpAdjustPlannedVolumeCountDelta(-1)
}
glog.V(0).Infoln("finish replicate check.")
}
func (topo *Topology) StartCheckReplicate() {
if isReplicateCheckerRunning {
return
}
go topo.CheckReplicate()
}
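
The checker is started on demand (the periodic call in topology_event_handling.go stays commented out), and StartCheckReplicate guards against concurrent runs via the isReplicateCheckerRunning flag. With the /vol/check_replicate route added in master_server.go, a trigger is one POST; a sketch, assuming a master on localhost:9333:

```
// Kick off a replication check; the master answers {"status":"running"}
// immediately and the check continues in the background.
resp, err := http.PostForm("http://localhost:9333/vol/check_replicate", nil)
if err == nil {
	resp.Body.Close()
}
```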

18
go/topology/topology_vacuum.go

@ -26,7 +26,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
}(index, dn.Url(), vid)
}
isCheckSuccess := true
for _ = range locationlist.list {
for range locationlist.list {
select {
case canVacuum := <-ch:
isCheckSuccess = isCheckSuccess && canVacuum
@ -53,7 +53,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
for _ = range locationlist.list {
for range locationlist.list {
select {
case _ = <-ch:
case <-time.After(30 * time.Minute):
@ -87,11 +87,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
for vid, locationList := range volumeLayout.vid2location {
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {
batchVacuumVolumeCommit(volumeLayout, vid, locationList)
}
}
}
@ -110,7 +110,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/check", values)
if err != nil {
glog.V(0).Infoln("parameters:", values)
return err, false
@ -127,7 +127,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/compact", values)
if err != nil {
return err
}
@ -143,7 +143,7 @@ func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/commit", values)
if err != nil {
return err
}

256
go/topology/volume_growth.go

@ -76,7 +76,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp
}
func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
servers, e := FindEmptySlotsForOneVolume(topo, option, nil)
if e != nil {
return 0, e
}
@ -85,127 +85,205 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
return len(servers), err
}
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
Ttl: option.Ttl,
Version: storage.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
} else {
glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %d: %v", vid, err)
}
}
return nil
}
func filterMainDataCenter(option *VolumeGrowOption, node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
rp := option.ReplicaPlacement
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount >= rp.SameRackCount+1 {
possibleRacksCount++
}
}
if possibleRacksCount < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
}
return nil
}
func filterMainRack(option *VolumeGrowOption, node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
rp := option.ReplicaPlacement
if node.FreeSpace() < rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
// a bit faster way to test free racks
return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
}
possibleDataNodesCount := 0
for _, n := range node.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount < rp.SameRackCount+1 {
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
}
return nil
}
func makeExceptNodeFilter(nodes []Node) FilterNodeFn {
m := make(map[NodeId]bool)
for _, n := range nodes {
m[n.Id()] = true
}
return func(dn Node) error {
if dn.FreeSpace() <= 0 {
return ErrFilterContinue
}
if _, ok := m[dn.Id()]; ok {
return ErrFilterContinue
}
return nil
}
}
// 1. find the main data node
// 1.1 collect all data nodes that have at least 1 free slot
// 1.2 collect all racks that have rp.SameRackCount+1 free data nodes
// 1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
// 2. find the rest of the data nodes
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
func FindEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, existsServers *VolumeLocationList) (additionServers []*DataNode, err error) {
//find main datacenter and other data centers
pickNodesFn := PickLowUsageNodeFn
rp := option.ReplicaPlacement
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
pickMainAndRestNodes := func(np NodePicker, totalNodeCount int, filterFirstNodeFn FilterNodeFn, existsNodes []Node) (mainNode Node, restNodes []Node, e error) {
if len(existsNodes) > 0 {
mainNode = existsNodes[0]
}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount >= rp.SameRackCount+1 {
possibleRacksCount++
if mainNode == nil {
mainNodes, err := np.PickNodes(1, filterFirstNodeFn, pickNodesFn)
if err != nil {
return nil, nil, err
}
mainNode = mainNodes[0]
existsNodes = append(existsNodes, mainNode)
}
if possibleRacksCount < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
glog.V(3).Infoln(mainNode.Id(), "picked main node:", mainNode.Id())
restCount := totalNodeCount - len(existsNodes)
if restCount > 0 {
restNodes, err = np.PickNodes(restCount,
makeExceptNodeFilter(existsNodes), pickNodesFn)
if err != nil {
return nil, nil, err
}
}
return nil
})
return mainNode, restNodes, nil
}
var existsNode []Node
if existsServers != nil {
existsNode = existsServers.DiffDataCenters()
}
mainDataCenter, otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount+1,
func(node Node) error {
return filterMainDataCenter(option, node)
}, existsNode)
if dc_err != nil {
return nil, dc_err
}
//find main rack and other racks
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
if node.FreeSpace() < rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
// a bit faster way to test free racks
return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
}
possibleDataNodesCount := 0
for _, n := range node.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount < rp.SameRackCount+1 {
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
}
return nil
})
if existsServers != nil {
existsNode = existsServers.DiffRacks(mainDataCenter.(*DataCenter))
} else {
existsNode = nil
}
mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount+1,
func(node Node) error {
return filterMainRack(option, node)
},
existsNode,
)
if rack_err != nil {
return nil, rack_err
}
//find main rack and other racks
mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
if node.FreeSpace() < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
}
return nil
})
//find main server and other servers
if existsServers != nil {
existsNode = existsServers.SameServers(mainRack.(*Rack))
} else {
existsNode = nil
}
mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount+1,
func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
if node.FreeSpace() < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
}
return nil
},
existsNode,
)
if server_err != nil {
return nil, server_err
}
if existsServers != nil && existsServers.ContainsDataNode(mainServer.(*DataNode)) {
} else {
additionServers = append(additionServers, mainServer.(*DataNode))
}
servers = append(servers, mainServer.(*DataNode))
for _, server := range otherServers {
servers = append(servers, server.(*DataNode))
additionServers = append(additionServers, server.(*DataNode))
}
for _, rack := range otherRacks {
r := rand.Intn(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
additionServers = append(additionServers, server)
} else {
return servers, e
return additionServers, e
}
}
for _, datacenter := range otherDataCenters {
r := rand.Intn(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
for _, dc := range otherDataCenters {
r := rand.Intn(dc.FreeSpace())
if server, e := dc.ReserveOneVolume(r); e == nil {
additionServers = append(additionServers, server)
} else {
return servers, e
return additionServers, e
}
}
return
}
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
Version: storage.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
} else {
glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %d: %v", vid, err)
}
}
return nil
}

117
go/topology/volume_growth_test.go

@ -5,8 +5,11 @@ import (
"fmt"
"testing"
"strings"
"github.com/chrislusf/seaweedfs/go/sequence"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/syndtr/goleveldb/leveldb/errors"
)
var topologyLayout = `
@ -19,7 +22,7 @@ var topologyLayout = `
{"id":2, "size":12312},
{"id":3, "size":12312}
],
"limit":3
"limit":15
},
"server112":{
"volumes":[
@ -28,6 +31,18 @@ var topologyLayout = `
{"id":6, "size":12312}
],
"limit":10
},
"server113":{
"volumes":[
{"id":7, "size":12312},
{"id":8, "size":12312},
{"id":9, "size":12312}
],
"limit":8
},
"server114":{
"volumes":[],
"limit":8
}
},
"rack2":{
@ -37,11 +52,15 @@ var topologyLayout = `
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":4
"limit":8
},
"server122":{
"volumes":[],
"limit":4
"limit":8
},
"server124":{
"volumes":[],
"limit":8
},
"server123":{
"volumes":[
@ -54,6 +73,16 @@ var topologyLayout = `
}
},
"dc2":{
"rack2":{
"server221":{
"volumes":[],
"limit":8
},
"server222":{
"volumes":[],
"limit":8
}
}
},
"dc3":{
"rack2":{
@ -63,13 +92,25 @@ var topologyLayout = `
{"id":3, "size":12312},
{"id":5, "size":12312}
],
"limit":4
"limit":8
},
"server322":{
"volumes":[],
"limit":7
}
}
}
}
`
var testLocList = [][]string{
{"server111", "server121"},
{"server111", "server112"},
{"server111", "server112", "server113"},
{"server111", "server221", "server321"},
{"server112"},
}
func setup(topologyLayout string) *Topology {
var data interface{}
err := json.Unmarshal([]byte(topologyLayout), &data)
@ -80,6 +121,7 @@ func setup(topologyLayout string) *Topology {
//need to connect all nodes first before server adding volumes
topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
storage.NewCollectionSettings("000", "0.3"),
sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())
@ -115,8 +157,7 @@ func setup(topologyLayout string) *Topology {
func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
vg := NewDefaultVolumeGrowth()
rp, _ := storage.NewReplicaPlacementFromString("002")
rp, _ := storage.NewReplicaPlacementFromString("111")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
@ -124,12 +165,72 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) {
Rack: "",
DataNode: "",
}
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
servers, err := FindEmptySlotsForOneVolume(topo, volumeGrowOption, nil)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()
}
for _, server := range servers {
fmt.Println("assigned node :", server.Id())
fmt.Printf("assigned node: %s, free space: %d\n", server.Id(), server.FreeSpace())
}
}
func getDataNodeFromId(topo *Topology, id string) (foundDn *DataNode) {
nid := NodeId(id)
topo.WalkDataNode(func(dn *DataNode) (e error) {
if dn.Id() == nid {
foundDn = dn
e = errors.New("Found.")
}
return
})
return
}
func setupTestLocationList(topo *Topology) (ret []*VolumeLocationList) {
for _, ll := range testLocList {
vl := &VolumeLocationList{}
for _, nid := range ll {
if n := getDataNodeFromId(topo, nid); n != nil {
vl.list = append(vl.list, n)
}
}
ret = append(ret, vl)
}
return
}
func joinNodeId(dns []*DataNode) string {
ss := []string{}
for _, dn := range dns {
ss = append(ss, string(dn.Id()))
}
return strings.Join(ss, ", ")
}
func TestFindEmptySlotsWithExistsNodes(t *testing.T) {
topo := setup(topologyLayout)
rp, _ := storage.NewReplicaPlacementFromString("112")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
DataCenter: "dc1",
Rack: "",
DataNode: "",
}
testLocationList := setupTestLocationList(topo)
for _, locationList := range testLocationList {
lrp := locationList.CalcReplicaPlacement()
t.Logf("location list: [%s], replica placement = %s\n", joinNodeId(locationList.list), lrp.String())
if lrp.Compare(rp) < 0 {
servers, err := FindEmptySlotsForOneVolume(topo, volumeGrowOption, locationList)
if err != nil {
t.Log("finding empty slots error :", err)
t.Fail()
}
t.Logf("assigned node: %s\n\n", joinNodeId(servers))
}
}
}

10
go/topology/volume_layout.go

@ -25,7 +25,6 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL
rp: rp,
ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
}
}
@ -42,7 +41,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
//TODO: balance data when there are more replicas
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.AddToWritable(v.Id)
} else {
@ -53,7 +53,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
//TODO only delete data node from locations?
vl.removeFromWritable(v.Id)
delete(vl.vid2location, v.Id)
}
@ -73,9 +73,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
!v.ReadOnly
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
if location := vl.vid2location[vid]; location != nil {
return location.list
return location
}
return nil
}

111
go/topology/volume_location_list.go

@ -2,6 +2,9 @@ package topology
import (
"fmt"
"github.com/chrislusf/seaweedfs/go/storage"
"math/rand"
)
type VolumeLocationList struct {
@ -21,6 +24,14 @@ func (dnll *VolumeLocationList) Head() *DataNode {
return dnll.list[0]
}
func (dnll *VolumeLocationList) PickForRead() *DataNode {
return dnll.list[rand.Intn(len(dnll.list))]
}
func (dnll *VolumeLocationList) AllDataNode() []*DataNode {
return dnll.list
}
func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
@ -63,3 +74,103 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
dnll.list = l
}
}
// return all data centers; the first is the main data center
func (dnll *VolumeLocationList) DiffDataCenters() []Node {
m := make(map[*DataCenter]int)
maxCount := 0
var mainDC *DataCenter
for _, dn := range dnll.list {
var dc *DataCenter
if dc = dn.GetDataCenter(); dc == nil {
continue
}
m[dc] = m[dc] + 1
if m[dc] > maxCount {
mainDC = dc
maxCount = m[dc]
}
}
dataCenters := make([]Node, 0, len(m))
if mainDC != nil {
dataCenters = append(dataCenters, mainDC)
}
for dc := range m {
if dc != mainDC {
dataCenters = append(dataCenters, dc)
}
}
return dataCenters
}
// return all racks, first is the main rack; if mainDC is nil, racks from all data centers are considered
func (dnll *VolumeLocationList) DiffRacks(mainDC *DataCenter) []Node {
m := make(map[*Rack]int)
maxCount := 0
var mainRack *Rack
for _, dn := range dnll.list {
if mainDC != nil && dn.GetDataCenter() != mainDC {
continue
}
var rack *Rack
if rack = dn.GetRack(); rack == nil {
continue
}
m[rack] = m[rack] + 1
if m[rack] > maxCount {
mainRack = rack
maxCount = m[rack]
}
}
racks := make([]Node, 0, len(m))
if mainRack != nil {
racks = append(racks, mainRack)
}
for rack := range m {
if rack != mainRack {
racks = append(racks, rack)
}
}
return racks
}
func (dnll *VolumeLocationList) SameServers(mainRack *Rack) (servers []Node) {
for _, dn := range dnll.list {
if mainRack != nil && dn.GetRack() != mainRack {
continue
}
var rack *Rack
if rack = dn.GetRack(); rack == nil {
continue
}
servers = append(servers, dn)
}
return servers
}
func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) {
var dcs, rs, ss []Node
dcs = dnll.DiffDataCenters()
if len(dcs) > 0 {
rs = dnll.DiffRacks(dcs[0].(*DataCenter))
if len(rs) > 0 {
ss = dnll.SameServers(rs[0].(*Rack))
}
}
rp = &storage.ReplicaPlacement{
SameRackCount: len(ss) - 1,
DiffRackCount: len(rs) - 1,
DiffDataCenterCount: len(dcs) - 1,
}
return
}
func (dnll *VolumeLocationList) ContainsDataNode(n *DataNode) bool {
for _, dn := range dnll.list {
if dn == n {
return true
}
}
return false
}
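
CalcReplicaPlacement inverts the placement string: instead of reading the configured "xyz" digits, it derives them from where the copies actually sit, so the planner can Compare observed against configured placement. A worked example (assuming the usual digit order: different data centers, different racks, same-rack servers): copies on server111 (dc1), server221 (dc2), and server321 (dc3) span three data centers, so:

```
// Hypothetical illustration of the derived counts for that layout.
rp := &storage.ReplicaPlacement{
	SameRackCount:       0, // no extra copy shares the main rack
	DiffRackCount:       0, // no extra copy in another rack of the main DC
	DiffDataCenterCount: 2, // two copies live in other data centers
}
fmt.Println(rp.String())       // "200", assuming the digit order above
fmt.Println(rp.GetCopyCount()) // 3 = 2 + 0 + 0 + 1
```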

17
go/util/concurrent_read_map.go

@ -7,9 +7,8 @@ import (
// A mostly-read map that can thread-safely
// initialize its map entries.
type ConcurrentReadMap struct {
rmutex sync.RWMutex
mutex sync.Mutex
Items map[string]interface{}
rwmutex sync.RWMutex
Items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
@ -17,8 +16,8 @@ func NewConcurrentReadMap() *ConcurrentReadMap {
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
if value, ok := m.Items[key]; ok {
return value
}
@ -28,11 +27,11 @@ func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rmutex.RLock()
if value, ok := m.Items[key]; ok {
m.rmutex.RUnlock()
m.rwmutex.RLock()
value, ok := m.Items[key]
m.rwmutex.RUnlock()
if ok {
return value
}
m.rmutex.RUnlock()
return m.initMapEntry(key, newEntry)
}
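
With one RWMutex, Get becomes a double-checked read: a cheap RLock lookup first, and initMapEntry re-checks under the write lock so two racing callers cannot both run the constructor. Usage stays the same; a sketch with placeholder arguments:

```
m := util.NewConcurrentReadMap()
// The constructor runs at most once per key, under the write lock.
c := m.Get("pictures", func() interface{} {
	return NewCollection("pictures", rp, volumeSizeLimit) // rp, volumeSizeLimit: placeholders
}).(*Collection)
_ = c
```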

110
go/util/http_util.go

@ -11,7 +11,12 @@ import (
"net/url"
"strings"
"os"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/pierrec/lz4"
"strconv"
)
var (
@ -26,6 +31,18 @@ func init() {
client = &http.Client{Transport: Transport}
}
func MkUrl(host, path string, args url.Values) string {
u := url.URL{
Scheme: "http",
Host: host,
Path: path,
}
if args != nil {
u.RawQuery = args.Encode()
}
return u.String()
}
func PostBytes(url string, body []byte) ([]byte, error) {
r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
if err != nil {
@ -39,20 +56,62 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return b, nil
}
func Post(url string, values url.Values) ([]byte, error) {
func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) {
url := MkUrl(host, path, nil)
glog.V(4).Infoln("Post", url+"?"+values.Encode())
r, err := client.PostForm(url, values)
if err != nil {
return nil, err
return nil, 0, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
return nil, r.StatusCode, err
}
return b, nil
return b, r.StatusCode, nil
}
func Post(host, path string, values url.Values) (content []byte, e error) {
content, _, e = PostEx(host, path, values)
return
}
type RApiError struct {
E string
}
func (e *RApiError) Error() string {
return e.E
}
func Get(url string) ([]byte, error) {
func IsRemoteApiError(e error) bool {
switch e.(type) {
case *RApiError:
return true
}
return false
}
func RemoteApiCall(host, path string, values url.Values) (result map[string]interface{}, e error) {
jsonBlob, code, e := PostEx(host, path, values)
if e != nil {
return nil, e
}
result = make(map[string]interface{})
if e := json.Unmarshal(jsonBlob, &result); e != nil {
return nil, e
}
if err, ok := result["error"]; ok && err.(string) != "" {
return nil, &RApiError{E: err.(string)}
}
if code != http.StatusOK && code != http.StatusAccepted {
return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code)
}
return result, nil
}
func Get(host, path string, values url.Values) ([]byte, error) {
url := MkUrl(host, path, values)
r, err := client.Get(url)
if err != nil {
return nil, err
@ -90,7 +149,7 @@ func Delete(url string, jwt security.EncodedJwt) error {
return nil
}
m := make(map[string]interface{})
if e := json.Unmarshal(body, m); e == nil {
if e := json.Unmarshal(body, &m); e == nil {
if s, ok := m["error"].(string); ok {
return errors.New(s)
}
@ -140,6 +199,10 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
if err != nil {
return "", nil, err
}
if response.StatusCode != http.StatusOK {
response.Body.Close()
return "", nil, fmt.Errorf("%s: %s", fileUrl, response.Status)
}
contentDisposition := response.Header["Content-Disposition"]
if len(contentDisposition) > 0 {
if strings.HasPrefix(contentDisposition[0], "filename=") {
@ -151,6 +214,41 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
return
}
func DownloadToFile(fileUrl, savePath string) (e error) {
response, err := client.Get(fileUrl)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("%s: %s", fileUrl, response.Status)
}
var r io.Reader
content_encoding := strings.ToLower(response.Header.Get("Content-Encoding"))
size := response.ContentLength
if n, e := strconv.ParseInt(response.Header.Get("X-Content-Length"), 10, 64); e == nil {
size = n
}
switch content_encoding {
case "lz4":
r = lz4.NewReader(response.Body)
default:
r = response.Body
}
var f *os.File
if f, e = os.OpenFile(savePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); e != nil {
return
}
if size >= 0 {
_, e = io.CopyN(f, r, size)
} else {
_, e = io.Copy(f, r)
}
f.Close()
return
}
func Do(req *http.Request) (resp *http.Response, err error) {
return client.Do(req)
}
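
The helpers now take a (host, path) pair and let MkUrl assemble the URL; RemoteApiCall layers JSON decoding on top of PostEx and turns a server-side "error" field into an *RApiError. A usage sketch against a hypothetical volume server on localhost:8080:

```
values := make(url.Values)
values.Add("volume", "3")
values.Add("garbageThreshold", "0.3")

// Prints "http://localhost:8080/admin/vacuum/check?garbageThreshold=0.3&volume=3"
// (url.Values.Encode sorts keys alphabetically).
fmt.Println(util.MkUrl("localhost:8080", "/admin/vacuum/check", values))

// Post the form, decode the JSON reply, and surface {"error": "..."} as *RApiError.
if result, err := util.RemoteApiCall("localhost:8080", "/admin/vacuum/check", values); err != nil {
	fmt.Println("remote api error?", util.IsRemoteApiError(err), err)
} else {
	fmt.Println(result)
}
```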

9
go/weed/backup.go

@ -57,7 +57,7 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
volumeServer := lookup.Locations[0].Url
volumeServer := lookup.Locations.Head().Url
stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {
@ -69,13 +69,8 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
return true
}
replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
if err != nil {
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
return true
}
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl)
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, ttl)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true

7
go/weed/benchmark.go

@ -254,15 +254,14 @@ func readFiles(fileIdLineChan chan string, s *stat) {
println("!!!! volume id ", vid, " location not found!!!!!")
continue
}
server := ret.Locations[rand.Intn(len(ret.Locations))].Url
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil {
server := ret.Locations.PickForRead().Url
if bytesRead, err := util.Get(server, "/"+fid, nil); err == nil {
s.completed++
s.transferred += int64(len(bytesRead))
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
fmt.Printf("Failed to read %s error:%v\n", url, err)
fmt.Printf("Failed to read %s/%s error:%v\n", server, fid, err)
}
}
}

2
go/weed/compact.go

@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil)
storage.NeedleMapInMemory, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}

4
go/weed/download.go

@ -51,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool {
}
func downloadToFile(server, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
if lookupError != nil {
return lookupError
}
@ -103,7 +103,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
if lookupError != nil {
return "", nil, lookupError
}

2
go/weed/shell.go

@ -20,8 +20,6 @@ var cmdShell = &Command{
`,
}
var ()
func runShell(command *Command, args []string) bool {
r := bufio.NewReader(os.Stdin)
o := bufio.NewWriter(os.Stdout)

2
go/weed/signal_handling.go

@ -22,7 +22,7 @@ func OnInterrupt(fn func()) {
syscall.SIGTERM,
syscall.SIGQUIT)
go func() {
for _ = range signalChan {
for range signalChan {
fn()
os.Exit(0)
}

7
go/weed/weed_server/common.go

@ -69,7 +69,12 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj
}
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
m := make(map[string]interface{})
m["error"] = err.Error()
if err == nil {
m["error"] = ""
} else {
m["error"] = err.Error()
}
writeJsonQuiet(w, r, httpStatus, m)
}

9
go/weed/weed_server/filer_server_handlers.go

@ -5,7 +5,6 @@ import (
"errors"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"strconv"
@ -91,7 +90,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.WriteHeader(http.StatusNotFound)
return
}
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url
urlLocation := lookup.Locations.PickForRead().Url
urlString := "http://" + urlLocation + "/" + fileId
if fs.redirectOnRead {
http.Redirect(w, r, urlString, http.StatusFound)
@ -130,7 +129,11 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if replication == "" {
replication = fs.defaultReplication
}
assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl"))
collection := query.Get("collection")
if collection == "" {
collection = fs.collection
}
assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, http.StatusInternalServerError, ae)

7
go/weed/weed_server/master_server.go

@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/sequence"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/topology"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/gorilla/mux"
@ -50,9 +51,10 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
cs := storage.NewCollectionSettings(defaultReplicaPlacement, garbageThreshold)
var e error
if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
if ms.Topo, e = topology.NewTopology("topo", confFile, cs,
seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}
ms.vg = topology.NewDefaultVolumeGrowth()
@ -71,6 +73,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/vol/check_replicate", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeCheckReplicateHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))

8
go/weed/weed_server/master_server_handlers.go

@ -23,10 +23,10 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
}
volumeId, err := storage.NewVolumeId(vid)
if err == nil {
machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
var ret []operation.Location
for _, dn := range machines {
locationList := ms.Topo.Lookup(collection, volumeId)
if locationList != nil {
var ret operation.Locations
for _, dn := range locationList.AllDataNode() {
ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
}
volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}

60
go/weed/weed_server/master_server_handlers_admin.go

@ -5,11 +5,13 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"strings"
"net/url"
"sync"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/storage"
@ -25,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
_, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
_, err := util.Get(server.Ip+":"+strconv.Itoa(server.Port), "/admin/delete_collection", url.Values{"collection": r.Form["collection"]})
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@ -82,6 +84,11 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
ms.dirStatusHandler(w, r)
}
func (ms *MasterServer) volumeCheckReplicateHandler(w http.ResponseWriter, r *http.Request) {
ms.Topo.StartCheckReplicate()
writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"status": "running"})
}
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0
option, err := ms.getVolumeGrowOption(r)
@ -122,8 +129,14 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
return
}
machines := ms.Topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
if machines != nil && machines.Length() > 0 {
var url string
if r.URL.RawQuery != "" {
url = util.NormalizeUrl(machines.PickForRead().PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
} else {
url = util.NormalizeUrl(machines.PickForRead().PublicUrl) + r.URL.Path
}
http.Redirect(w, r, url, http.StatusMovedPermanently)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId))
}
@ -157,7 +170,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@ -184,3 +197,40 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
}
return volumeGrowOption, nil
}
func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, volumes, collections []string) (result map[string]interface{}) {
	result = make(map[string]interface{})
	forms := url.Values{}
	forms.Set("key", settingKey)
	forms.Set("value", settingValue)
	if len(volumes) == 0 && len(collections) == 0 {
		forms.Set("all", "true")
	} else {
		forms["volume"] = volumes
		forms["collection"] = collections
	}
	var wg sync.WaitGroup
	var mu sync.Mutex // result is written from many goroutines
	ms.Topo.WalkDataNode(func(dn *topology.DataNode) (e error) {
		wg.Add(1)
		go func(server string, values url.Values) {
			defer wg.Done()
			jsonBlob, e := util.Post(server, "/admin/setting", values)
			mu.Lock()
			defer mu.Unlock()
			if e != nil {
				result[server] = map[string]interface{}{
					"error": e.Error() + " " + string(jsonBlob),
				}
				return
			}
			var ret interface{}
			if e := json.Unmarshal(jsonBlob, &ret); e == nil {
				result[server] = ret
			} else {
				result[server] = map[string]interface{}{
					"error": e.Error() + " " + string(jsonBlob),
				}
			}
		}(dn.Url(), forms)
		return nil
	})
	wg.Wait()
	return
}

11
go/weed/weed_server/volume_server.go

@ -20,7 +20,6 @@ type VolumeServer struct {
store *storage.Store
guard *security.Guard
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
ReadRedirect bool
}
@ -38,12 +37,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
}
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind)
vs.guard = security.NewGuard(whiteList, "")
@ -53,10 +51,17 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/setting", vs.guard.WhiteList(vs.setVolumeOptionHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler))
adminMux.HandleFunc("/admin/task/new", vs.guard.WhiteList(vs.newTaskHandler))
adminMux.HandleFunc("/admin/task/query", vs.guard.WhiteList(vs.queryTaskHandler))
adminMux.HandleFunc("/admin/task/commit", vs.guard.WhiteList(vs.commitTaskHandler))
adminMux.HandleFunc("/admin/task/clean", vs.guard.WhiteList(vs.cleanTaskHandler))
adminMux.HandleFunc("/admin/task/all", vs.guard.WhiteList(vs.allTaskHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

73
go/weed/weed_server/volume_server_handlers_admin.go

@ -1,14 +1,23 @@
package weed_server
import (
"errors"
"net/http"
"path/filepath"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/stats"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
)
type VolumeOptError struct {
Volume string `json:"volume"`
Err string `json:"err"`
}
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
@ -17,7 +26,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl"))
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
@ -48,3 +57,65 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
m["DiskStatuses"] = ds
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
errs := []VolumeOptError{}
var (
setter storage.VolumeWalker
)
key := r.FormValue("key")
value := r.FormValue("value")
if key == "readonly" {
isReadOnly, e := strconv.ParseBool(value)
if e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
setter = func(v *storage.Volume) error {
if e := v.SetReadOnly(isReadOnly); e != nil {
errs = append(errs, VolumeOptError{
Volume: v.Id.String(),
Err: e.Error(),
})
}
return nil
}
} else {
writeJsonError(w, r, http.StatusBadRequest, errors.New("Unkonw setting: "+key))
return
}
all, _ := strconv.ParseBool(r.FormValue("all"))
if all {
vs.store.WalkVolume(setter)
} else {
volumesSet := make(map[string]bool)
for _, volume := range r.Form["volume"] {
volumesSet[strings.TrimSpace(volume)] = true
}
collectionsSet := make(map[string]bool)
for _, c := range r.Form["collection"] {
collectionsSet[strings.TrimSpace(c)] = true
}
if len(collectionsSet) > 0 || len(volumesSet) > 0 {
vs.store.WalkVolume(func(v *storage.Volume) (e error) {
if !collectionsSet[v.Collection] && !volumesSet[v.Id.String()] {
return nil
}
setter(v)
return nil
})
}
}
result := make(map[string]interface{})
if len(errs) > 0 {
result["error"] = "set volume replica error."
result["errors"] = errs
}
writeJson(w, r, http.StatusAccepted, result)
}
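
The handler accepts repeated volume and collection form fields, or all=true, so one request can flip many volumes at once. A hedged example marking volumes 3 and 4 read-only on a volume server at localhost:8080:

```
values := url.Values{
	"key":    {"readonly"},
	"value":  {"true"},
	"volume": {"3", "4"}, // repeated field selects individual volumes
}
// Replies 202 Accepted; per-volume failures come back in the "errors" array.
body, err := util.Post("localhost:8080", "/admin/setting", values)
_, _ = body, err
```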

2
go/weed/weed_server/volume_server_handlers_read.go

@ -46,7 +46,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
} else {
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)

58
go/weed/weed_server/volume_server_handlers_sync.go

@ -2,11 +2,14 @@ package weed_server
import (
"fmt"
"io"
"net/http"
"strconv"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/pierrec/lz4"
)
func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) {
@ -84,3 +87,58 @@ func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (
}
return v, nil
}
func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) {
v, e := vs.getVolume("volume", r)
if v == nil {
http.Error(w, e.Error(), http.StatusBadRequest)
return
}
cr, e := v.GetVolumeCleanReader()
if e != nil {
http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError)
return
}
totalSize, e := cr.Size()
if e != nil {
http.Error(w, fmt.Sprintf("Get volume size: %v", e), http.StatusInternalServerError)
return
}
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id))
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("X-Content-Length", strconv.FormatInt(totalSize, 10))
w.Header().Set("Content-Encoding", "lz4")
lz4w := lz4.NewWriter(w)
if _, e = io.Copy(lz4w, cr); e != nil {
glog.V(4).Infoln("response write error:", e)
}
lz4w.Close()
return
}
ranges, e := parseRange(rangeReq, totalSize)
if e != nil {
http.Error(w, e.Error(), http.StatusRequestedRangeNotSatisfiable)
return
}
if len(ranges) != 1 {
http.Error(w, "Only support one range", http.StatusNotImplemented)
return
}
ra := ranges[0]
if _, e := cr.Seek(ra.start, 0); e != nil {
http.Error(w, e.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("X-Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.Header().Set("Content-Encoding", "lz4")
w.WriteHeader(http.StatusPartialContent)
lz4w := lz4.NewWriter(w)
if _, e = io.CopyN(lz4w, cr, ra.length); e != nil {
glog.V(2).Infoln("response write error:", e)
}
lz4w.Close()
}
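
The stream is always lz4-framed, with the uncompressed size advertised in X-Content-Length (Content-Length cannot be known while compressing on the fly); DownloadToFile in go/util/http_util.go is the matching client. A bare-bones reader, assuming the same pierrec/lz4 framing and a whitelisted caller:

```
resp, err := http.Get("http://localhost:8080/admin/sync/vol_data?volume=3")
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()
f, _ := os.Create("3.dat") // sketch only: check errors in real code
defer f.Close()
// The body is an lz4 frame for both full and range requests.
if _, err := io.Copy(f, lz4.NewReader(resp.Body)); err != nil {
	log.Fatal(err)
}
```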

66
go/weed/weed_server/volume_server_handlers_task.go

@ -0,0 +1,66 @@
package weed_server
import (
"net/http"
"time"
"strings"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
)
func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
tid, e := vs.store.TaskManager.NewTask(vs.store, r.Form)
if e == nil {
writeJsonQuiet(w, r, http.StatusOK, map[string]string{"tid": tid})
} else {
writeJsonError(w, r, http.StatusInternalServerError, e)
}
glog.V(2).Infoln("new store task =", tid, ", error =", e)
}
func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) {
tid := r.FormValue("tid")
timeoutStr := strings.TrimSpace(r.FormValue("timeout"))
d := time.Minute
if td, e := time.ParseDuration(timeoutStr); e == nil {
d = td
}
err := vs.store.TaskManager.QueryResult(tid, d)
if err == storage.ErrTaskNotFinish {
writeJsonError(w, r, http.StatusRequestTimeout, err)
} else if err == nil {
writeJsonError(w, r, http.StatusOK, err)
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("query task =", tid, ", error =", err)
}
func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request) {
	tid := r.FormValue("tid")
	err := vs.store.TaskManager.Commit(tid)
	if err == storage.ErrTaskNotFinish {
		writeJsonError(w, r, http.StatusRequestTimeout, err)
	} else if err == nil {
		writeJsonError(w, r, http.StatusOK, err)
	} else {
		writeJsonError(w, r, http.StatusInternalServerError, err)
	}
	glog.V(2).Infoln("commit task =", tid, ", error =", err)
}
func (vs *VolumeServer) cleanTaskHandler(w http.ResponseWriter, r *http.Request) {
	tid := r.FormValue("tid")
	err := vs.store.TaskManager.Clean(tid)
	if err == storage.ErrTaskNotFinish {
		writeJsonError(w, r, http.StatusRequestTimeout, err)
	} else if err == nil {
		writeJsonError(w, r, http.StatusOK, err)
	} else {
		writeJsonError(w, r, http.StatusInternalServerError, err)
	}
	glog.V(2).Infoln("clean task =", tid, ", error =", err)
}
func (vs *VolumeServer) allTaskHandler(w http.ResponseWriter, r *http.Request) {
	//TODO: return all tasks
	glog.V(2).Infoln("TODO: return all tasks")
}
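
Taken together, the four routes expose the store's task manager over HTTP: /admin/task/new returns a tid, /admin/task/query blocks up to the given timeout and answers 408 while the task is still running, and /admin/task/commit or /admin/task/clean finishes it. A sketch of the polling loop a client like TaskCli presumably runs, assuming tid came from /admin/task/new:

```
for {
	resp, err := http.PostForm("http://localhost:8080/admin/task/query",
		url.Values{"tid": {tid}, "timeout": {"1m"}})
	if err != nil {
		break
	}
	resp.Body.Close()
	if resp.StatusCode == http.StatusRequestTimeout {
		continue // not finished yet: poll again
	}
	if resp.StatusCode == http.StatusOK { // done: make it permanent
		http.PostForm("http://localhost:8080/admin/task/commit", url.Values{"tid": {tid}})
	}
	break
}
```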