diff --git a/Makefile b/Makefile index 6719a7bdd..52c2ef8ca 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,10 @@ clean: deps: go get $(GO_FLAGS) -d $(SOURCE_DIR) -build: deps +fmt: + gofmt -w -s ./go/ + +build: deps fmt go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR) linux: deps diff --git a/README.md b/README.md index 7ef2a72b8..f79b3c44d 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,12 @@ If you want a nicer URL, you can use one of these alternative URL formats: http://localhost:8080/3,01637037d6 ``` +If you want to get a scaled version of an image, you can add some query parameters: + +``` +http://localhost:8080/3/01637037d6.jpg?height=200&width=200 +``` + ### Rack-Aware and Data Center-Aware Replication ### SeaweedFS apply the replication strategy on a volume level. So when you are getting a file id, you can specify the replication strategy. For example: diff --git a/go/filer/client_operations.go b/go/filer/client_operations.go index 80ac51693..b3ccc633a 100644 --- a/go/filer/client_operations.go +++ b/go/filer/client_operations.go @@ -58,7 +58,7 @@ func call(server string, request ApiRequest, ret interface{}) error { } values := make(url.Values) values.Add("request", string(b)) - jsonBlob, err := util.Post("http://"+server+"/__api__", values) + jsonBlob, err := util.Post(server, "/__api__", values) if err != nil { return err } diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go index 832b70e40..068201adf 100644 --- a/go/filer/flat_namespace/flat_namespace_store.go +++ b/go/filer/flat_namespace/flat_namespace_store.go @@ -1,7 +1,5 @@ package flat_namespace -import () - type FlatNamespaceStore interface { Put(fullFileName string, fid string) (err error) Get(fullFileName string) (fid string, err error) diff --git a/go/glog/glog.go b/go/glog/glog.go index abd5678d4..6f6c96518 100644 --- a/go/glog/glog.go +++ b/go/glog/glog.go @@ -880,7 +880,7 @@ const flushInterval = 30 * time.Second // flushDaemon periodically flushes the log file buffers. func (l *loggingT) flushDaemon() { - for _ = range time.NewTicker(flushInterval).C { + for range time.NewTicker(flushInterval).C { l.lockAndFlushAll() } } diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index fa436b651..45ac5c362 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -31,7 +31,7 @@ func Assign(server string, count uint64, replication string, collection string, if ttl != "" { values.Add("ttl", ttl) } - jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) + jsonBlob, err := util.Post(server, "/dir/assign", values) glog.V(2).Info("assign result :", string(jsonBlob)) if err != nil { return nil, err } diff --git a/go/operation/chunked_file.go b/go/operation/chunked_file.go index 786b8a989..3c9d2ec85 100644 --- a/go/operation/chunked_file.go +++ b/go/operation/chunked_file.go @@ -150,7 +150,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { ci := cm.Chunks[chunkIndex] // if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) + fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true) if lookupError != nil { return n, lookupError } diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index c808cc75a..a8cd46f71 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -22,7 +22,7 @@ type DeleteResult struct { } func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId) + fileUrl, err := LookupFileId(master, fileId, false) if err != nil { return fmt.Errorf("Failed to lookup %s:%v", fileId, err) } @@ -97,7 +97,7 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { for _, fid := range fidList { values.Add("fid", fid) } - jsonBlob, err := util.Post("http://"+server+"/delete", values) + jsonBlob, err := util.Post(server, "/delete", values) if err != nil { ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) return diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go index bda6f3c65..8aa1eae58 100644 --- a/go/operation/list_masters.go +++ b/go/operation/list_masters.go @@ -14,7 +14,7 @@ type ClusterStatusResult struct { } func ListMasters(server string) ([]string, error) { - jsonBlob, err := util.Get("http://" + server + "/cluster/status") + jsonBlob, err := util.Get(server, "/cluster/status", nil) glog.V(2).Info("list masters result :", string(jsonBlob)) if err != nil { return nil, err diff --git a/go/operation/lookup.go b/go/operation/lookup.go index f77d1ec9b..86a2ff760 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -16,16 +16,27 @@ type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` } + +type Locations []Location + type LookupResult struct { - VolumeId string `json:"volumeId,omitempty"` - Locations []Location `json:"locations,omitempty"` - Error string `json:"error,omitempty"` + VolumeId string `json:"volumeId,omitempty"` + Locations Locations `json:"locations,omitempty"` + Error string `json:"error,omitempty"` } func (lr *LookupResult) String() string { return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error) } +func (ls Locations) Head() *Location { + return &ls[0] +} + +func (ls Locations) PickForRead() *Location { + return &ls[rand.Intn(len(ls))] +} + var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) @@ -42,10 +53,17 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } +func LookupNoCache(server string, vid string) (ret *LookupResult, err error) { + if ret, err = do_lookup(server, vid); err == nil { + vc.Set(vid, ret.Locations, 10*time.Minute) + } + return +} + func do_lookup(server string, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + jsonBlob, err := util.Post(server, "/dir/lookup", values) if err != nil { return nil, err } @@ -60,7 +78,7 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string) (fullUrl string, err error) { +func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) @@ -72,7 +90,13 @@ func LookupFileId(server string, fileId string) (fullUrl string, err 
error) { if len(lookup.Locations) == 0 { return "", errors.New("File Not Found") } - return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil + var u string + if readonly { + u = lookup.Locations.PickForRead().Url + } else { + u = lookup.Locations.Head().Url + } + return util.MkUrl(u, "/"+fileId, nil), nil } // LookupVolumeIds find volume locations by cache and actual lookup @@ -99,7 +123,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err for _, vid := range unknown_vids { values.Add("volumeId", vid) } - jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) + jsonBlob, err := util.Post(server, "/vol/lookup", values) if err != nil { return nil, err } diff --git a/go/operation/lookup_vid_cache.go b/go/operation/lookup_vid_cache.go index ac4240102..ecbfbfade 100644 --- a/go/operation/lookup_vid_cache.go +++ b/go/operation/lookup_vid_cache.go @@ -9,14 +9,14 @@ import ( ) type VidInfo struct { - Locations []Location + Locations Locations NextRefreshTime time.Time } type VidCache struct { cache []VidInfo } -func (vc *VidCache) Get(vid string) ([]Location, error) { +func (vc *VidCache) Get(vid string) (Locations, error) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) @@ -33,7 +33,7 @@ func (vc *VidCache) Get(vid string) ([]Location, error) { } return nil, errors.New("Not Found") } -func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) { +func (vc *VidCache) Set(vid string, locations Locations, duration time.Duration) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) diff --git a/go/operation/lookup_vid_cache_test.go b/go/operation/lookup_vid_cache_test.go index 9c9e2affb..e3e24e37e 100644 --- a/go/operation/lookup_vid_cache_test.go +++ b/go/operation/lookup_vid_cache_test.go @@ -10,7 +10,7 @@ func TestCaching(t *testing.T) { var ( vc VidCache ) - var locations []Location + var locations Locations locations = append(locations, Location{Url: "a.com:8080"}) vc.Set("123", locations, time.Second) ret, _ := vc.Get("123") diff --git a/go/operation/submit.go b/go/operation/submit.go index 18484680a..8f5239f16 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -46,7 +46,7 @@ func SubmitFiles(master string, files []FilePart, } ret, err := Assign(master, uint64(len(files)), replication, collection, ttl) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go index 54944a64e..f63d6f96a 100644 --- a/go/operation/sync_volume.go +++ b/go/operation/sync_volume.go @@ -10,7 +10,6 @@ import ( ) type SyncVolumeResponse struct { - Replication string `json:"Replication,omitempty"` Ttl string `json:"Ttl,omitempty"` TailOffset uint64 `json:"TailOffset,omitempty"` CompactRevision uint16 `json:"CompactRevision,omitempty"` @@ -21,7 +20,7 @@ type SyncVolumeResponse struct { func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { values := make(url.Values) values.Add("volume", vid) - jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values) + jsonBlob, err := util.Post(server, "/admin/sync/status", values) glog.V(2).Info("sync volume result :", string(jsonBlob)) if err != nil { return nil, err diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go index 742a1ca4e..2574b2af6 100644 --- 
a/go/operation/system_message.pb.go +++ b/go/operation/system_message.pb.go @@ -11,6 +11,9 @@ It is generated from these files: It has these top-level messages: VolumeInformationMessage JoinMessage + CollectionSetting + GlobalSetting + JoinResponse */ package operation @@ -29,7 +32,7 @@ type VolumeInformationMessage struct { DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` - ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` + ReplicaPlacement *uint32 `protobuf:"varint,8,opt,name=replica_placement" json:"replica_placement,omitempty"` Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` XXX_unrecognized []byte `json:"-"` @@ -199,5 +202,77 @@ func (m *JoinMessage) GetAdminPort() uint32 { return 0 } +type CollectionSetting struct { + Collection *string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"` + ReplicaPlacement *string `protobuf:"bytes,2,opt,name=replica_placement" json:"replica_placement,omitempty"` + VacuumGarbageThreshold *float32 `protobuf:"fixed32,3,opt,name=vacuum_garbage_threshold" json:"vacuum_garbage_threshold,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CollectionSetting) Reset() { *m = CollectionSetting{} } +func (m *CollectionSetting) String() string { return proto.CompactTextString(m) } +func (*CollectionSetting) ProtoMessage() {} + +func (m *CollectionSetting) GetCollection() string { + if m != nil && m.Collection != nil { + return *m.Collection + } + return "" +} + +func (m *CollectionSetting) GetReplicaPlacement() string { + if m != nil && m.ReplicaPlacement != nil { + return *m.ReplicaPlacement + } + return "" +} + +func (m *CollectionSetting) GetVacuumGarbageThreshold() float32 { + if m != nil && m.VacuumGarbageThreshold != nil { + return *m.VacuumGarbageThreshold + } + return 0 +} + +type GlobalSetting struct { + Settings []*CollectionSetting `protobuf:"bytes,1,rep,name=settings" json:"settings,omitempty"` + MasterPeers []string `protobuf:"bytes,2,rep,name=master_peers" json:"master_peers,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *GlobalSetting) Reset() { *m = GlobalSetting{} } +func (m *GlobalSetting) String() string { return proto.CompactTextString(m) } +func (*GlobalSetting) ProtoMessage() {} + +func (m *GlobalSetting) GetSettings() []*CollectionSetting { + if m != nil { + return m.Settings + } + return nil +} + +func (m *GlobalSetting) GetMasterPeers() []string { + if m != nil { + return m.MasterPeers + } + return nil +} + +type JoinResponse struct { + Settings *GlobalSetting `protobuf:"bytes,1,opt,name=settings" json:"settings,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *JoinResponse) Reset() { *m = JoinResponse{} } +func (m *JoinResponse) String() string { return proto.CompactTextString(m) } +func (*JoinResponse) ProtoMessage() {} + +func (m *JoinResponse) GetSettings() *GlobalSetting { + if m != nil { + return m.Settings + } + return nil +} + func init() { } diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index 548360b27..30dd2a22c 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -1,27 +1,45 @@ package operation; message 
VolumeInformationMessage { - required uint32 id = 1; - required uint64 size = 2; - optional string collection = 3; - required uint64 file_count = 4; - required uint64 delete_count = 5; - required uint64 deleted_byte_count = 6; - optional bool read_only = 7; - required uint32 replica_placement = 8; - optional uint32 version = 9 [default=2]; - optional uint32 ttl = 10; + required uint32 id = 1; + required uint64 size = 2; + optional string collection = 3; + required uint64 file_count = 4; + required uint64 delete_count = 5; + required uint64 deleted_byte_count = 6; + optional bool read_only = 7; + optional uint32 replica_placement = 8; + optional uint32 version = 9 [default=2]; + optional uint32 ttl = 10; } message JoinMessage { - optional bool is_init = 1; - required string ip = 2; - required uint32 port = 3; - optional string public_url = 4; - required uint32 max_volume_count = 5; - required uint64 max_file_key = 6; - optional string data_center = 7; - optional string rack = 8; - repeated VolumeInformationMessage volumes = 9; - optional uint32 admin_port = 10; + optional bool is_init = 1; + required string ip = 2; + required uint32 port = 3; + optional string public_url = 4; + required uint32 max_volume_count = 5; + required uint64 max_file_key = 6; + optional string data_center = 7; + optional string rack = 8; + repeated VolumeInformationMessage volumes = 9; + optional uint32 admin_port = 10; } + + +message CollectionSetting { + optional string collection = 1; + optional string replica_placement = 2; + optional float vacuum_garbage_threshold = 3; +} + +message GlobalSetting { + repeated CollectionSetting settings = 1; + repeated string master_peers = 2; +} + +message JoinResponse { + optional GlobalSetting settings = 1; +} + + diff --git a/go/storage/collection_settings.go b/go/storage/collection_settings.go new file mode 100644 index 000000000..ec98b5d9b --- /dev/null +++ b/go/storage/collection_settings.go @@ -0,0 +1,73 @@ +package storage + +type SettingKey int + +const ( + keyReplicatePlacement SettingKey = iota + keyGarbageThreshold +) + +type CollectionSettings struct { + settings map[string]map[SettingKey]interface{} +} + +func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings { + rp, e := NewReplicaPlacementFromString(defaultReplicatePlacement) + if e != nil { + rp, _ = NewReplicaPlacementFromString("000") + } + c := &CollectionSettings{ + settings: make(map[string]map[SettingKey]interface{}), + } + c.set("", keyReplicatePlacement, rp) + c.set("", keyGarbageThreshold, defaultGarbageThreshold) + return c +} + +func (c *CollectionSettings) get(collection string, key SettingKey) interface{} { + if m, ok := c.settings[collection]; ok { + if v, ok := m[key]; ok { + return v + } + } + if m, ok := c.settings[""]; ok { + return m[key] + } + return nil +} + +func (c *CollectionSettings) set(collection string, key SettingKey, value interface{}) { + m := c.settings[collection] + if m == nil { + m = make(map[SettingKey]interface{}) + c.settings[collection] = m + } + if value == nil { + //mustn't delete default setting + if collection != "" { + delete(m, key) + } + } else { + m[key] = value + } +} + +func (c *CollectionSettings) GetGarbageThreshold(collection string) string { + return c.get(collection, keyGarbageThreshold).(string) +} + +func (c *CollectionSettings) SetGarbageThreshold(collection string, gt string) { + c.set(collection, keyGarbageThreshold, gt) +} + +func (c *CollectionSettings) GetReplicaPlacement(collection string) 
*ReplicaPlacement { + return c.get(collection, keyReplicatePlacement).(*ReplicaPlacement) +} + +func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error { + rp, e := NewReplicaPlacementFromString(t) + if e == nil { + c.set(collection, keyReplicatePlacement, rp) + } + return e +} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index ccfe1d498..073b2899a 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -16,6 +16,7 @@ const ( FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 FlagHasTtl = 0x10 + FlagIsExtendNeedle = 0x40 // TODO: Reserve flag, use extent file to save big needle FlagIsChunkManifest = 0x80 LastModifiedBytesLength = 5 TtlBytesLength = 2 @@ -238,13 +239,24 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL } n.Data = bytes[:n.Size] n.Checksum = NewCRC(n.Data) + checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize]) + if n.Checksum.Value() != checksum { + glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id) + } case Version2: bytes := make([]byte, bodyLength) if _, err = r.ReadAt(bytes, offset); err != nil { return } n.readNeedleDataVersion2(bytes[0:n.Size]) + if n.DataSize == 0 { + return + } n.Checksum = NewCRC(n.Data) + checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize]) + if n.Checksum.Value() != checksum { + glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id) + } default: err = fmt.Errorf("Unsupported Version! (%d)", version) } diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index c1aca52eb..adc93cdcc 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -51,3 +51,17 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } + +func (rp *ReplicaPlacement) Compare(rp1 *ReplicaPlacement) int { + if rp.SameRackCount == rp1.SameRackCount && + rp.DiffRackCount == rp1.DiffRackCount && + rp.DiffDataCenterCount == rp1.DiffDataCenterCount { + return 0 + } else if rp.SameRackCount < rp1.SameRackCount || + rp.DiffRackCount < rp1.DiffRackCount || + rp.DiffDataCenterCount < rp1.DiffDataCenterCount { + return -1 + } else { + return 1 + } +} diff --git a/go/storage/store.go b/go/storage/store.go index ebf01d09f..702db99fa 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -88,6 +88,8 @@ type Store struct { connected bool volumeSizeLimit uint64 //read from the master masterNodes *MasterNodes + needleMapKind NeedleMapType + TaskManager *TaskManager } func (s *Store) String() (str string) { @@ -96,7 +98,13 @@ func (s *Store) String() (str string) { } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s = &Store{ + Port: port, + Ip: ip, + PublicUrl: publicUrl, + TaskManager: NewTaskManager(), + needleMapKind: needleMapKind, + } s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} @@ -106,11 +114,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { - rt, e := 
NewReplicaPlacementFromString(replicaPlacement) - if e != nil { - return e - } +func (s *Store) AddVolume(volumeListString string, collection string, ttlString string) error { ttl, e := ReadTTL(ttlString) if e != nil { return e @@ -122,7 +126,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) + e = s.addVolume(VolumeId(id), collection, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -134,7 +138,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, ttl); err != nil { e = err } } @@ -183,14 +187,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", - location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v", + location.Directory, vid, collection, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, s.needleMapKind, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -213,9 +217,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) + glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String()) } else { glog.V(0).Infof("new volume %s error %s", name, e) } @@ -234,7 +238,6 @@ func (s *Store) Status() []*VolumeInfo { Id: VolumeId(k), Size: v.ContentSize(), Collection: v.Collection, - ReplicaPlacement: v.ReplicaPlacement, Version: v.Version(), FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), @@ -281,7 +284,6 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), Version: proto.Uint32(uint32(v.Version())), Ttl: proto.Uint32(v.Ttl.ToUint32()), } @@ -314,7 +316,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S return "", "", err } - joinUrl := "http://" + masterNode + "/dir/join" + joinUrl := 
util.MkUrl(masterNode, "/dir/join", nil) glog.V(4).Infof("Connecting to %s ...", joinUrl) jsonBlob, err := util.PostBytes(joinUrl, data) @@ -387,3 +389,16 @@ func (s *Store) HasVolume(i VolumeId) bool { v := s.findVolume(i) return v != nil } + +type VolumeWalker func(v *Volume) (e error) + +func (s *Store) WalkVolume(walker VolumeWalker) error { + for _, location := range s.Locations { + for _, v := range location.volumes { + if e := walker(v); e != nil { + return e + } + } + } + return nil +} diff --git a/go/storage/store_task.go b/go/storage/store_task.go new file mode 100644 index 000000000..25e6f1fb4 --- /dev/null +++ b/go/storage/store_task.go @@ -0,0 +1,147 @@ +package storage + +import ( + "errors" + "net/url" + "time" + + "github.com/chrislusf/seaweedfs/go/glog" +) + +const ( + TaskVacuum = "vacuum" + TaskReplicate = "replicate" + TaskBalance = "balance" +) + +var ( + ErrTaskNotFinish = errors.New("TaskNotFinish") + ErrTaskNotFound = errors.New("TaskNotFound") + ErrTaskInvalid = errors.New("TaskInvalid") + ErrTaskExists = errors.New("TaskExists") +) + +type TaskWorker interface { + Run() error + Commit() error + Clean() error + Info() url.Values +} + +type Task struct { + Id string + startTime time.Time + worker TaskWorker + ch chan bool + result error + cleanWhenFinish bool +} + +type TaskManager struct { + TaskList map[string]*Task +} + +func NewTask(worker TaskWorker, id string) *Task { + t := &Task{ + Id: id, + worker: worker, + startTime: time.Now(), + result: ErrTaskNotFinish, + ch: make(chan bool, 1), + } + go func(t *Task) { + t.result = t.worker.Run() + if t.cleanWhenFinish { + glog.V(0).Infof("clean task (%s) when finish.", t.Id) + t.worker.Clean() + } + t.ch <- true + + }(t) + return t +} + +func (t *Task) QueryResult(waitDuration time.Duration) error { + if t.result == ErrTaskNotFinish && waitDuration > 0 { + select { + case <-time.After(waitDuration): + case <-t.ch: + } + } + return t.result +} + +func NewTaskManager() *TaskManager { + return &TaskManager{ + TaskList: make(map[string]*Task), + } +} + +func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) { + tt := args.Get("task") + vid := args.Get("volume") + tid = tt + "-" + vid + if _, ok := tm.TaskList[tid]; ok { + return tid, ErrTaskExists + } + + var tw TaskWorker + switch tt { + case TaskVacuum: + tw, e = NewVacuumTask(s, args) + case TaskReplicate: + tw, e = NewReplicaTask(s, args) + case TaskBalance: + } + if e != nil { + return + } + if tw == nil { + return "", ErrTaskInvalid + } + tm.TaskList[tid] = NewTask(tw, tid) + return tid, nil +} + +func (tm *TaskManager) QueryResult(tid string, waitDuration time.Duration) (e error) { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + return t.QueryResult(waitDuration) +} + +func (tm *TaskManager) Commit(tid string) (e error) { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + return ErrTaskNotFinish + } + delete(tm.TaskList, tid) + return t.worker.Commit() +} + +func (tm *TaskManager) Clean(tid string) error { + t, ok := tm.TaskList[tid] + if !ok { + return ErrTaskNotFound + } + delete(tm.TaskList, tid) + if t.QueryResult(time.Second*30) == ErrTaskNotFinish { + t.cleanWhenFinish = true + glog.V(0).Infof("task (%s) is not finish, clean it later.", tid) + } else { + t.worker.Clean() + } + return nil +} + +func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { + t, ok := tm.TaskList[tid] + if !ok { + return 0, ErrTaskNotFound + } 
+ return time.Since(t.startTime), nil +} diff --git a/go/storage/store_task_cli.go b/go/storage/store_task_cli.go new file mode 100644 index 000000000..90d22ce83 --- /dev/null +++ b/go/storage/store_task_cli.go @@ -0,0 +1,87 @@ +package storage + +import ( + "errors" + "fmt" + "net/url" + "time" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/util" +) + +type TaskParams map[string]string + +var ( + ErrTaskTimeout = errors.New("TaskTimeout") +) + +type TaskCli struct { + TID string + DataNode string +} + +func NewTaskCli(dataNode string, taskType string, params TaskParams) (*TaskCli, error) { + args := url.Values{} + args.Set("task", taskType) + for k, v := range params { + args.Set(k, v) + } + m, e := util.RemoteApiCall(dataNode, "/admin/task/new", args) + if e != nil { + return nil, e + } + tid := m["tid"].(string) + if tid == "" { + return nil, fmt.Errorf("Empty %s task", taskType) + } + return &TaskCli{ + TID: tid, + DataNode: dataNode, + }, nil +} + +func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { + startTime := time.Now() + args := url.Values{} + args.Set("tid", c.TID) + args.Set("timeout", time.Minute.String()) + tryTimes := 0 + for time.Since(startTime) < timeout { + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args) + if e == nil { + //task has finished with no error + return nil + } + if util.IsRemoteApiError(e) { + if e.Error() == ErrTaskNotFinish.Error() { + tryTimes = 0 + continue + } + return e + } else { + tryTimes++ + if tryTimes >= 10 { + return e + } + glog.V(0).Infof("query task (%s) error %v, wait 1 minute then retry %d times", c.TID, e, tryTimes) + time.Sleep(time.Minute) + } + + } + return ErrTaskTimeout +} + +func (c *TaskCli) Commit() error { + args := url.Values{} + args.Set("tid", c.TID) + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args) + return e +} + +func (c *TaskCli) Clean() error { + args := url.Values{} + args.Set("tid", c.TID) + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args) + return e +} diff --git a/go/storage/store_task_replication.go b/go/storage/store_task_replication.go new file mode 100644 index 000000000..0931c831e --- /dev/null +++ b/go/storage/store_task_replication.go @@ -0,0 +1,115 @@ +package storage + +import ( + "errors" + "fmt" + "net/url" + "os" + "path" + + "github.com/chrislusf/seaweedfs/go/util" +) + +type ReplicaTask struct { + VID VolumeId + Collection string + SrcDataNode string + s *Store + location *DiskLocation +} + +func NewReplicaTask(s *Store, args url.Values) (*ReplicaTask, error) { + volumeIdString := args.Get("volume") + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + source := args.Get("source") + if source == "" { + return nil, errors.New("Invalid source data node.") + + } + location := s.findFreeLocation() + if location == nil { + return nil, errors.New("No more free space left") + } + collection := args.Get("collection") + return &ReplicaTask{ + VID: vid, + Collection: collection, + SrcDataNode: source, + s: s, + location: location, + }, nil +} + +func (t *ReplicaTask) Run() error { + ch := make(chan error) + go func() { + idxUrl := util.MkUrl(t.SrcDataNode, "/admin/sync/index", url.Values{"volume": {t.VID.String()}}) + e := util.DownloadToFile(idxUrl, t.FileName()+".repx") + if e != nil { + e = fmt.Errorf("Replicate error: %s, %v", idxUrl, e) + } + ch <- e + }() + go func() { + datUrl := 
util.MkUrl(t.SrcDataNode, "/admin/sync/vol_data", url.Values{"volume": {t.VID.String()}}) + e := util.DownloadToFile(datUrl, t.FileName()+".repd") + if e != nil { + e = fmt.Errorf("Replicate error: %s, %v", datUrl, e) + } + ch <- e + }() + errs := make([]error, 0) + for i := 0; i < 2; i++ { + if e := <-ch; e != nil { + errs = append(errs, e) + } + } + if len(errs) == 0 { + return nil + } else { + return fmt.Errorf("%v", errs) + } +} + +func (t *ReplicaTask) Commit() error { + var ( + volume *Volume + e error + ) + + if e = os.Rename(t.FileName()+".repd", t.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(t.FileName()+".repx", t.FileName()+".idx"); e != nil { + return e + } + volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) + if e == nil { + t.location.volumes[t.VID] = volume + t.s.SendHeartbeatToMaster() + } + return e +} + +func (t *ReplicaTask) Clean() error { + os.Remove(t.FileName() + ".repx") + os.Remove(t.FileName() + ".repd") + return nil +} + +func (t *ReplicaTask) Info() url.Values { + //TODO + return url.Values{} +} + +func (t *ReplicaTask) FileName() (fileName string) { + if t.Collection == "" { + fileName = path.Join(t.location.Directory, t.VID.String()) + } else { + fileName = path.Join(t.location.Directory, t.Collection+"_"+t.VID.String()) + } + return +} diff --git a/go/storage/store_task_vacuum.go b/go/storage/store_task_vacuum.go new file mode 100644 index 000000000..1da789960 --- /dev/null +++ b/go/storage/store_task_vacuum.go @@ -0,0 +1,40 @@ +package storage + +import ( + "fmt" + "net/url" +) + +type VacuumTask struct { + V *Volume +} + +func NewVacuumTask(s *Store, args url.Values) (*VacuumTask, error) { + volumeIdString := args.Get("volume") + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return nil, fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + } + v := s.findVolume(vid) + if v == nil { + return nil, fmt.Errorf("volume id %d is not found", vid) + } + return &VacuumTask{V: v}, nil +} + +func (t *VacuumTask) Run() error { + return t.V.Compact() +} + +func (t *VacuumTask) Commit() error { + return t.V.commitCompact() +} + +func (t *VacuumTask) Clean() error { + return t.V.cleanCompact() +} + +func (t *VacuumTask) Info() url.Values { + //TODO + return url.Values{} +} diff --git a/go/storage/volume.go b/go/storage/volume.go index 5c6b12e9b..b7b492b9d 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -28,9 +28,9 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} + v.SuperBlock = SuperBlock{Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind) return } @@ -87,7 +87,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } - if v.ReplicaPlacement == nil { + if v.version == NoneVersion { e = v.readSuperBlock() } else { e = v.maybeWriteSuperBlock() } @@ -145,10 +145,6 @@ func (v *Volume) Close() { _ = v.dataFile.Close() } -func (v *Volume) NeedToReplicate() bool { - return v.ReplicaPlacement.GetCopyCount() > 1 -} - // isFileUnchanged checks whether this needle to write is same 
as last one. // It requires serialized access in the same volume. func (v *Volume) isFileUnchanged(n *Needle) bool { @@ -426,3 +422,17 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { } return false } + +func (v *Volume) SetReadOnly(isReadOnly bool) error { + if isReadOnly == false { + if fi, e := v.dataFile.Stat(); e != nil { + return e + } else { + if fi.Mode()&0200 == 0 { + return errors.New(v.FileName() + ".dat is READONLY") + } + } + } + v.readOnly = isReadOnly + return nil +} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index a2f139c89..659faf213 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -2,14 +2,14 @@ package storage import ( "fmt" - "github.com/chrislusf/seaweedfs/go/operation" "sort" + + "github.com/chrislusf/seaweedfs/go/operation" ) type VolumeInfo struct { Id VolumeId Size uint64 - ReplicaPlacement *ReplicaPlacement Ttl *TTL Collection string Version Version @@ -30,18 +30,13 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er ReadOnly: *m.ReadOnly, Version: Version(*m.Version), } - rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) - if e != nil { - return vi, e - } - vi.ReplicaPlacement = rp vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } func (vi VolumeInfo) String() string { - return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", - vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) + return fmt.Sprintf("Id:%d, Size:%d, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", + vi.Id, vi.Size, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) } /*VolumesInfo sorting*/ diff --git a/go/storage/volume_info_test.go b/go/storage/volume_info_test.go index 9a9c43ad2..c90ca2336 100644 --- a/go/storage/volume_info_test.go +++ b/go/storage/volume_info_test.go @@ -4,13 +4,13 @@ import "testing" func TestSortVolumeInfos(t *testing.T) { vis := []*VolumeInfo{ - &VolumeInfo{ + { Id: 2, }, - &VolumeInfo{ + { Id: 1, }, - &VolumeInfo{ + { Id: 3, }, } diff --git a/go/storage/volume_pure_reader.go b/go/storage/volume_pure_reader.go new file mode 100644 index 000000000..a03c88327 --- /dev/null +++ b/go/storage/volume_pure_reader.go @@ -0,0 +1,212 @@ +package storage + +import ( + "io" + "os" + "sort" + "sync" + + "github.com/chrislusf/seaweedfs/go/util" +) + +type DirtyData struct { + Offset int64 `comment:"Dirty data start offset"` + Size uint32 `comment:"Size of the dirty data"` +} + +type DirtyDatas []DirtyData + +func (s DirtyDatas) Len() int { return len(s) } +func (s DirtyDatas) Less(i, j int) bool { return s[i].Offset < s[j].Offset } +func (s DirtyDatas) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s DirtyDatas) Sort() { sort.Sort(s) } +func (s DirtyDatas) Search(offset int64) int { + return sort.Search(len(s), func(i int) bool { + v := &s[i] + return v.Offset+int64(v.Size) > offset + }) +} + +type PureReader struct { + Dirtys DirtyDatas + DataFile *os.File + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex +} + +func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { + m := NewCompactMap() + for i := 0; i+16 <= len(indexFileContent); i += 16 { + bytes := indexFileContent[i : i+16] + key := util.BytesToUint64(bytes[:8]) + offset := util.BytesToUint32(bytes[8:12]) + size := 
util.BytesToUint32(bytes[12:16]) + k := Key(key) + if offset != 0 && size != 0 { + m.Set(k, offset, size) + } else { + if nv, ok := m.Get(k); ok { + //mark old needle data as dirty data + if int64(nv.Size)-NeedleHeaderSize > 0 { + dirtys = append(dirtys, DirtyData{ + Offset: int64(nv.Offset)*8 + NeedleHeaderSize, + Size: nv.Size, + }) + } + } + m.Delete(k) + } + } + dirtys.Sort() + return dirtys +} + +func (cr *PureReader) Seek(offset int64, whence int) (int64, error) { + oldOff, e := cr.DataFile.Seek(0, 1) + if e != nil { + return 0, e + } + newOff, e := cr.DataFile.Seek(offset, whence) + if e != nil { + return 0, e + } + if oldOff != newOff { + cr.closePipe(true) + } + return newOff, nil +} + +func (cr *PureReader) Size() (int64, error) { + fi, e := cr.DataFile.Stat() + if e != nil { + return 0, e + } + return fi.Size(), nil +} + +func (cdr *PureReader) WriteTo(w io.Writer) (written int64, e error) { + off, e := cdr.DataFile.Seek(0, 1) + if e != nil { + return 0, e + } + const ZeroBufSize = 32 * 1024 + zeroBuf := make([]byte, ZeroBufSize) + dirtyIndex := cdr.Dirtys.Search(off) + var nextDirty *DirtyData + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] + } + for { + if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) { + sz := nextDirty.Offset + int64(nextDirty.Size) - off + for sz > 0 { + mn := int64(ZeroBufSize) + if mn > sz { + mn = sz + } + var n int + if n, e = w.Write(zeroBuf[:mn]); e != nil { + return + } + written += int64(n) + sz -= int64(n) + off += int64(n) + } + dirtyIndex++ + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] + } else { + nextDirty = nil + } + if _, e = cdr.DataFile.Seek(off, 0); e != nil { + return + } + } else { + var n, sz int64 + if nextDirty != nil { + sz = nextDirty.Offset - off + } + if sz <= 0 { + // copy until eof + n, e = io.Copy(w, cdr.DataFile) + written += n + return + } + if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil { + return + } + off += n + written += n + } + } + return +} + +func (cr *PureReader) ReadAt(p []byte, off int64) (n int, err error) { + cr.Seek(off, 0) + return cr.Read(p) +} + +func (cr *PureReader) Read(p []byte) (int, error) { + return cr.getPipeReader().Read(p) +} + +func (cr *PureReader) Close() (e error) { + cr.closePipe(true) + return cr.DataFile.Close() +} + +func (cr *PureReader) closePipe(lock bool) (e error) { + if lock { + cr.mutex.Lock() + defer cr.mutex.Unlock() + } + if cr.pr != nil { + if err := cr.pr.Close(); err != nil { + e = err + } + } + cr.pr = nil + if cr.pw != nil { + if err := cr.pw.Close(); err != nil { + e = err + } + } + cr.pw = nil + return e +} + +func (cr *PureReader) getPipeReader() io.Reader { + cr.mutex.Lock() + defer cr.mutex.Unlock() + if cr.pr != nil && cr.pw != nil { + return cr.pr + } + cr.closePipe(false) + cr.pr, cr.pw = io.Pipe() + go func(pw *io.PipeWriter) { + _, e := cr.WriteTo(pw) + pw.CloseWithError(e) + }(cr.pw) + return cr.pr +} + +func (v *Volume) GetVolumeCleanReader() (cr *PureReader, err error) { + var dirtys DirtyDatas + if indexData, e := v.nm.IndexFileContent(); e != nil { + return nil, e + } else { + dirtys = ScanDirtyData(indexData) + } + dataFile, e := os.Open(v.FileName() + ".dat") + + if e != nil { + return nil, e + } + cr = &PureReader{ + Dirtys: dirtys, + DataFile: dataFile, + } + return +} diff --git a/go/storage/volume_replicate_test.go b/go/storage/volume_replicate_test.go new file mode 100644 index 000000000..d1da211c3 --- /dev/null +++ 
b/go/storage/volume_replicate_test.go @@ -0,0 +1,22 @@ +package storage + +import "testing" + +func TestDirtyDataSearch(t *testing.T) { + testData := DirtyDatas{ + {30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50}, + } + testOffset := []int64{ + 0, 150, 480, 1024, + } + testData.Sort() + t.Logf("TestData = %v", testData) + for _, off := range testOffset { + i := testData.Search(off) + if i < testData.Len() { + t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i]) + } else { + t.Logf("Search %d return %d ", off, i) + } + } +} diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index e37360075..fc8c33900 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -15,16 +15,15 @@ const ( /* * Super block currently has 8 bytes allocated for each volume. * Byte 0: version, 1 or 2 -* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc +* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc (Deprecated!) * Byte 2 and byte 3: Time to live. See TTL for definition * Byte 4 and byte 5: The number of times the volume has been compacted. * Rest bytes: Reserved */ type SuperBlock struct { - version Version - ReplicaPlacement *ReplicaPlacement - Ttl *TTL - CompactRevision uint16 + version Version + Ttl *TTL + CompactRevision uint16 } func (s *SuperBlock) Version() Version { @@ -33,7 +32,7 @@ func (s *SuperBlock) Version() Version { func (s *SuperBlock) Bytes() []byte { header := make([]byte, SuperBlockSize) header[0] = byte(s.version) - header[1] = s.ReplicaPlacement.Byte() + header[1] = 0 s.Ttl.ToBytes(header[2:4]) util.Uint16toBytes(header[4:6], s.CompactRevision) return header @@ -59,6 +58,7 @@ func (v *Volume) maybeWriteSuperBlock() error { } return e } + func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) @@ -70,11 +70,18 @@ func (v *Volume) readSuperBlock() (err error) { v.SuperBlock, err = ParseSuperBlock(header) return err } + +func (v *Volume) writeSuperBlock() (err error) { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil { + return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e) + } + return nil +} + func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.version = Version(header[0]) - if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err.Error()) - } superBlock.Ttl = LoadTTLFromBytes(header[2:4]) superBlock.CompactRevision = util.BytesToUint16(header[4:6]) return diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go index 2c72d62f0..7d09c873d 100644 --- a/go/storage/volume_sync.go +++ b/go/storage/volume_sync.go @@ -169,7 +169,6 @@ func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse { syncStatus.IdxFileSize = v.nm.IndexFileSize() syncStatus.CompactRevision = v.SuperBlock.CompactRevision syncStatus.Ttl = v.SuperBlock.Ttl.String() - syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() return syncStatus } @@ -202,6 +201,9 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, if err != nil { return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) } + if needleValue.Size != uint32(len(b)) { + return fmt.Errorf("Reading from %s error: size incorrect", volumeDataContentHandlerUrl) + } offset, err := 
v.AppendBlob(b) if err != nil { return fmt.Errorf("Appending volume %d error: %v", v.Id, err) diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go index 4318bb048..676479cfb 100644 --- a/go/storage/volume_ttl.go +++ b/go/storage/volume_ttl.go @@ -114,6 +114,10 @@ func toStoredByte(readableUnitByte byte) byte { return 0 } +func (t *TTL) Equal(t1 *TTL) bool { + return t.count == t1.count && t.unit == t1.unit +} + func (t TTL) Minutes() uint32 { switch t.unit { case Empty: diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7377afdc9..3941a568f 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -30,6 +30,7 @@ func (v *Volume) commitCompact() error { glog.V(3).Infof("Got Committing lock...") _ = v.dataFile.Close() var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { return e } @@ -45,6 +46,12 @@ func (v *Volume) commitCompact() error { return nil } +func (v *Volume) cleanCompact() error { + os.Remove(v.FileName() + ".cpd") + os.Remove(v.FileName() + ".cpx") + return nil +} + func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { var ( dst, idx *os.File diff --git a/go/storage/volume_version.go b/go/storage/volume_version.go index 2e9f58aa2..8cd132c58 100644 --- a/go/storage/volume_version.go +++ b/go/storage/volume_version.go @@ -3,6 +3,7 @@ package storage type Version uint8 const ( + NoneVersion = Version(0) Version1 = Version(1) Version2 = Version(2) CurrentVersion = Version2 diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index f014c3527..e48f01495 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -18,9 +18,8 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption values := make(url.Values) values.Add("volume", vid.String()) values.Add("collection", option.Collection) - values.Add("replication", option.ReplicaPlacement.String()) values.Add("ttl", option.Ttl.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + jsonBlob, err := util.Post(dn.Url(), "/admin/assign_volume", values) if err != nil { return err } diff --git a/go/topology/batch_operation.go b/go/topology/batch_operation.go new file mode 100644 index 000000000..3cf791d1e --- /dev/null +++ b/go/topology/batch_operation.go @@ -0,0 +1,43 @@ +package topology + +import ( + "net/url" + "strconv" + "time" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/util" +) + +func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) { + ch := make(chan bool, locationList.Length()) + for _, dn := range locationList.list { + go func(url string, path string, values url.Values) { + _, e := util.RemoteApiCall(url, path, values) + if e != nil { + glog.V(0).Infoln("RemoteApiCall:", util.MkUrl(url, path, values), "error =", e) + } + ch <- e == nil + + }(dn.Url(), path, values) + } + isSuccess = true + for range locationList.list { + select { + case canVacuum := <-ch: + isSuccess = isSuccess && canVacuum + case <-time.After(30 * time.Minute): + isSuccess = false + break + } + } + return isSuccess +} + +func SetVolumeReadonly(locationList *VolumeLocationList, volume string, isReadonly bool) (isSuccess bool) { + forms := url.Values{} + forms.Set("key", "readonly") + forms.Set("value", strconv.FormatBool(isReadonly)) + forms.Set("volume", volume) + return BatchOperation(locationList, "/admin/setting", forms) +} diff --git 
a/go/topology/collection.go b/go/topology/collection.go index 376b62405..e5c7b0f0f 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -10,11 +10,12 @@ import ( type Collection struct { Name string volumeSizeLimit uint64 + rp *storage.ReplicaPlacement storageType2VolumeLayout *util.ConcurrentReadMap } -func NewCollection(name string, volumeSizeLimit uint64) *Collection { - c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} +func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection { + c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit, rp: rp} c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } @@ -23,18 +24,18 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { - keyString := rp.String() +func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout { + keyString := "" if ttl != nil { keyString += ttl.String() } vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { - return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit) }) return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { +func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList { for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { diff --git a/go/topology/data_node.go b/go/topology/data_node.go index fe0926e85..19f3870de 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -42,6 +42,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { } else { dn.volumes[v.Id] = v } + return } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { diff --git a/go/topology/node.go b/go/topology/node.go index 3b6d55ce9..655e496b1 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -3,7 +3,8 @@ package topology import ( "errors" "math/rand" - "strings" + + "sort" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" @@ -18,11 +19,13 @@ type Node interface { UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) UpAdjustVolumeCountDelta(volumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + UpAdjustPlannedVolumeCountDelta(delta int) UpAdjustMaxVolumeId(vid storage.VolumeId) GetVolumeCount() int GetActiveVolumeCount() int GetMaxVolumeCount() int + GetPlannedVolumeCount() int GetMaxVolumeId() storage.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -38,68 +41,80 @@ type Node interface { GetValue() interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { - id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int - parent Node - children map[NodeId]Node - maxVolumeId storage.VolumeId + id NodeId + volumeCount int + activeVolumeCount int + maxVolumeCount int + plannedVolumeCount int + parent Node + children map[NodeId]Node + maxVolumeId storage.VolumeId //for rack, data center, topology nodeType string value interface{} } +type NodePicker interface { + PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) +} + +var ErrFilterContinue = errors.New("continue") + +type FilterNodeFn 
func(dn Node) error +type PickNodesFn func(nodes []Node, count int) []Node + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { +func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) { candidates := make([]Node, 0, len(n.children)) var errs []string for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { + if err := filterNodeFn(node); err == nil { candidates = append(candidates, node) + } else if err == ErrFilterContinue { + continue } else { errs = append(errs, string(node.Id())+":"+err.Error()) } } - if len(candidates) == 0 { - return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes { + return nil, errors.New("Not enough data node found!") + // return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) } - firstNode = candidates[rand.Intn(len(candidates))] - glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) + return pickFn(candidates, numberOfNodes), nil +} - restNodes = make([]Node, numberOfNodes-1) - candidates = candidates[:0] - for _, node := range n.children { - if node.Id() == firstNode.Id() { - continue - } - if node.FreeSpace() <= 0 { - continue - } - glog.V(2).Infoln("select rest node candidate:", node.Id()) - candidates = append(candidates, node) - } - glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") - ret := len(restNodes) == 0 - for k, node := range candidates { - if k < len(restNodes) { - restNodes[k] = node - if k == len(restNodes)-1 { - ret = true - } - } else { - r := rand.Intn(k + 1) - if r < len(restNodes) { - restNodes[r] = node - } - } +func RandomlyPickNodeFn(nodes []Node, count int) []Node { + if len(nodes) < count { + return nil } - if !ret { - glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") - err = errors.New("Not enough data node found!") + for i := range nodes { + j := rand.Intn(i + 1) + nodes[i], nodes[j] = nodes[j], nodes[i] } - return + return nodes[:count] +} + +func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) { + return n.PickNodes(numberOfNodes, filterFirstNodeFn, RandomlyPickNodeFn) +} + +type nodeList []Node + +func (s nodeList) Len() int { return len(s) } +func (s nodeList) Less(i, j int) bool { return s[i].FreeSpace() < s[j].FreeSpace() } +func (s nodeList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func PickLowUsageNodeFn(nodes []Node, count int) []Node { + if len(nodes) < count { + return nil + } + sort.Sort(sort.Reverse(nodeList(nodes))) + return nodes[:count] +} + +func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) { + return n.PickNodes(numberOfNodes, filterFirstNodeFn, PickLowUsageNodeFn) } func (n *NodeImpl) IsDataNode() bool { @@ -121,7 +136,7 @@ func (n *NodeImpl) Id() NodeId { return n.id } func (n *NodeImpl) FreeSpace() int { - return n.maxVolumeCount - n.volumeCount + return n.maxVolumeCount - n.volumeCount - n.plannedVolumeCount } func (n *NodeImpl) SetParent(node Node) { n.parent = node @@ -146,7 +161,7 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { r -= freeSpace } else { if 
node.IsDataNode() && node.FreeSpace() > 0 { - // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + // fmt.Println("assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } assignedNode, err = node.ReserveOneVolume(r) @@ -176,6 +191,14 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } + +func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negative + n.plannedVolumeCount += delta + if n.parent != nil { + n.parent.UpAdjustPlannedVolumeCountDelta(delta) + } +} + func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid @@ -197,6 +220,10 @@ func (n *NodeImpl) GetMaxVolumeCount() int { return n.maxVolumeCount } +func (n *NodeImpl) GetPlannedVolumeCount() int { + return n.plannedVolumeCount +} + func (n *NodeImpl) LinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index dc26dade0..772bf8516 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -20,31 +20,25 @@ func ReplicatedWrite(masterNode string, s *storage.Store, jwt := security.GetJwt(r) ret, err := s.Write(volumeId, needle) - needToReplicate := !s.HasVolume(volumeId) if err != nil { errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() - } else { + } else if ret <= 0 { errorStatus = "Failed to write to local disk" } - if !needToReplicate && ret > 0 { - needToReplicate = s.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { - _, err := operation.Upload( - "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), - string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), - jwt) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } + //send to other replica locations + if r.FormValue("type") != "replicate" { + if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { + _, err := operation.Upload( + "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), + string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + jwt) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() } } + size = ret return } @@ -61,25 +55,19 @@ func ReplicatedDelete(masterNode string, store *storage.Store, glog.V(0).Infoln("delete error:", err) return } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) - }) { - ret = 0 - } + //send to other replica locations + if r.FormValue("type") != 
"replicate" { + if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) + }) { + ret = 0 } } return } func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { + if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String()); lookupErr == nil { length := 0 selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) results := make(chan bool) @@ -95,6 +83,14 @@ func distributedOperation(masterNode string, store *storage.Store, volumeId stor for i := 0; i < length; i++ { ret = ret && <-results } + // we shouldn't check ReplicaPlacement because the needle have been written in head volume + + // if volume := store.GetVolume(volumeId); volume != nil { + // if length+1 < volume.ReplicaPlacement.GetCopyCount() { + // glog.V(0).Infof("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) + // ret = false + // } + // } return ret } else { glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error()) diff --git a/go/topology/topology.go b/go/topology/topology.go index ee1477cd2..410a1c70e 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -28,12 +28,14 @@ type Topology struct { chanRecoveredDataNodes chan *DataNode chanFullVolumes chan storage.VolumeInfo + CollectionSettings *storage.CollectionSettings + configuration *Configuration RaftServer raft.Server } -func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { +func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -42,6 +44,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.CollectionSettings = cs t.Sequence = seq @@ -87,7 +90,7 @@ func (t *Topology) loadConfiguration(configurationFile string) error { return nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList { //maybe an issue if lots of collections? 
if collection == "" { for _, c := range t.collectionMap.Items { @@ -111,23 +114,23 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) - if err != nil || datanodes.Length() == 0 { + vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.Ttl).PickForWrite(count, option) + if err != nil || dataNodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) - }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) + return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit) + }).(*Collection).GetOrCreateVolumeLayout(ttl) } func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { @@ -140,11 +143,11 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { glog.Infof("removing volume info:%+v", v) - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn) } func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { @@ -167,6 +170,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) @@ -174,6 +178,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { @@ -187,3 +192,18 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { t.LinkChildNode(dc) return dc } + +type DataNodeWalker func(dn *DataNode) (e error) + +func (t *Topology) WalkDataNode(walker DataNodeWalker) error { + for _, c := range t.Children() { + for _, rack := range c.(*DataCenter).Children() { + for _, dn := range rack.(*Rack).Children() { + if e := walker(dn.(*DataNode)); e != nil { + return e + } + } + } + } + return nil +} diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 5f5faf04e..65792a08a 
100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -21,8 +21,9 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { go func(garbageThreshold string) { c := time.Tick(15 * time.Minute) if t.IsLeader() { - for _ = range c { + for range c { t.Vacuum(garbageThreshold) + // t.CheckReplicate() } } }(garbageThreshold) @@ -42,7 +43,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.Ttl) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -56,7 +57,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl := t.GetVolumeLayout(v.Collection, v.Ttl) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -66,7 +67,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl := t.GetVolumeLayout(v.Collection, v.Ttl) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go new file mode 100644 index 000000000..9ed262dd2 --- /dev/null +++ b/go/topology/topology_replicate.go @@ -0,0 +1,153 @@ +package topology + +import ( + "container/list" + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" +) + +var ( + isReplicateCheckerRunning = false +) + +const ReplicateTaskTimeout = time.Hour + +type ReplicateTask struct { + Vid storage.VolumeId + Collection string + SrcDN *DataNode + DstDN *DataNode +} + +func (t *ReplicateTask) Run(topo *Topology) error { + //is lookup thread safe? 
+ locationList := topo.Lookup(t.Collection, t.Vid) + rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection) + if locationList.CalcReplicaPlacement().Compare(rp) >= 0 { + glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String()) + return nil + } + if !SetVolumeReadonly(locationList, t.Vid.String(), true) { + return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid) + } + defer SetVolumeReadonly(locationList, t.Vid.String(), false) + tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplicate, storage.TaskParams{ + "volume": t.Vid.String(), + "source": t.SrcDN.Url(), + "collection": t.Collection, + }) + if e != nil { + return e + } + if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil { + tc.Clean() + return e + } + e = tc.Commit() + return e +} + +func (t *ReplicateTask) WorkingDataNodes() []*DataNode { + return []*DataNode{ + t.SrcDN, + t.DstDN, + } +} + +func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + glog.V(0).Infoln("checking replicate on collection:", c.Name) + growOption := &VolumeGrowOption{ReplicaPlacement: c.rp} + for _, vl := range c.storageType2VolumeLayout.Items { + if vl != nil { + volumeLayout := vl.(*VolumeLayout) + for vid, locationList := range volumeLayout.vid2location { + rp1 := locationList.CalcReplicaPlacement() + if rp1.Compare(volumeLayout.rp) >= 0 { + continue + } + if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil { + for _, s := range additionServers { + s.UpAdjustPlannedVolumeCountDelta(1) + rt := &ReplicateTask{ + Vid: vid, + Collection: c.Name, + SrcDN: locationList.PickForRead(), + DstDN: s, + } + tasks = append(tasks, rt) + glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url()) + } + } else { + glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e) + } + } + } + } + } + return +} + +func (topo *Topology) CheckReplicate() { + isReplicateCheckerRunning = true + defer func() { + isReplicateCheckerRunning = false + }() + glog.V(1).Infoln("Start replicate checker on demand") + busyDataNodes := make(map[*DataNode]int) + taskCount := 0 + taskQueue := list.New() + for _, t := range planReplicateTasks(topo) { + taskQueue.PushBack(t) + taskCount++ + } + taskChan := make(chan *ReplicateTask) + for taskCount > 0 { + TaskQueueLoop: + for e := taskQueue.Front(); e != nil; e = e.Next() { + task := e.Value.(*ReplicateTask) + //only one task will run on the data node + dns := task.WorkingDataNodes() + for _, dn := range dns { + if busyDataNodes[dn] > 0 { + continue TaskQueueLoop + } + } + for _, dn := range dns { + busyDataNodes[dn]++ + } + go func(t *ReplicateTask) { + if e := t.Run(topo); e != nil { + glog.V(0).Infof("ReplicateTask run error, vid: %v, dst: %s. 
%v", t.Vid, t.DstDN.Url(), e) + } else { + glog.V(2).Infof("ReplicateTask finished, vid: %v, dst: %s", t.Vid, t.DstDN.Url()) + + } + taskChan <- t + }(task) + taskQueue.Remove(e) + + } + + finishedTask := <-taskChan + for _, dn := range finishedTask.WorkingDataNodes() { + if busyDataNodes[dn] > 0 { + busyDataNodes[dn]-- + } + } + taskCount-- + finishedTask.DstDN.UpAdjustPlannedVolumeCountDelta(-1) + } + glog.V(0).Infoln("finish replicate check.") +} + +func (topo *Topology) StartCheckReplicate() { + if isReplicateCheckerRunning { + return + } + go topo.CheckReplicate() +} diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 48bc8311d..446eb0c1c 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -26,7 +26,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist }(index, dn.Url(), vid) } isCheckSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case canVacuum := <-ch: isCheckSuccess = isCheckSuccess && canVacuum @@ -53,7 +53,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli }(index, dn.Url(), vid) } isVacuumSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case _ = <-ch: case <-time.After(30 * time.Minute): @@ -87,11 +87,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int { for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { volumeLayout := vl.(*VolumeLayout) - for vid, locationlist := range volumeLayout.vid2location { + for vid, locationList := range volumeLayout.vid2location { glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) - if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { - batchVacuumVolumeCommit(volumeLayout, vid, locationlist) + if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationList) { + batchVacuumVolumeCommit(volumeLayout, vid, locationList) } } } @@ -110,7 +110,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho values := make(url.Values) values.Add("volume", vid.String()) values.Add("garbageThreshold", garbageThreshold) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/check", values) if err != nil { glog.V(0).Infoln("parameters:", values) return err, false @@ -127,7 +127,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/compact", values) if err != nil { return err } @@ -143,7 +143,7 @@ func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/commit", values) if err != nil { return err } diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index a25ba116b..f010894b9 100644 --- a/go/topology/volume_growth.go +++ 
b/go/topology/volume_growth.go @@ -76,7 +76,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp } func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { - servers, e := vg.findEmptySlotsForOneVolume(topo, option) + servers, e := FindEmptySlotsForOneVolume(topo, option, nil) if e != nil { return 0, e } @@ -85,127 +85,205 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i return len(servers), err } +func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { + for _, server := range servers { + if err := AllocateVolume(server, vid, option); err == nil { + vi := storage.VolumeInfo{ + Id: vid, + Size: 0, + Collection: option.Collection, + Ttl: option.Ttl, + Version: storage.CurrentVersion, + } + server.AddOrUpdateVolume(vi) + topo.RegisterVolumeLayout(vi, server) + glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String()) + } else { + glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err) + return fmt.Errorf("Failed to assign %d: %v", vid, err) + } + } + return nil +} + +func filterMainDataCenter(option *VolumeGrowOption, node Node) error { + if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { + return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) + } + rp := option.ReplicaPlacement + if len(node.Children()) < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) + } + if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + } + possibleRacksCount := 0 + for _, rack := range node.Children() { + possibleDataNodesCount := 0 + for _, n := range rack.Children() { + if n.FreeSpace() >= 1 { + possibleDataNodesCount++ + } + } + if possibleDataNodesCount >= rp.SameRackCount+1 { + possibleRacksCount++ + } + } + if possibleRacksCount < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) + } + return nil +} + +func filterMainRack(option *VolumeGrowOption, node Node) error { + if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { + return fmt.Errorf("Not matching preferred rack:%s", option.Rack) + } + rp := option.ReplicaPlacement + if node.FreeSpace() < rp.SameRackCount+1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) + } + if len(node.Children()) < rp.SameRackCount+1 { + // a bit faster way to test free racks + return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1) + } + possibleDataNodesCount := 0 + for _, n := range node.Children() { + if n.FreeSpace() >= 1 { + possibleDataNodesCount++ + } + } + if possibleDataNodesCount < rp.SameRackCount+1 { + return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) + } + return nil +} + +func makeExceptNodeFilter(nodes []Node) FilterNodeFn { + m := make(map[NodeId]bool) + for _, n := range nodes { + m[n.Id()] = true + } + return func(dn Node) error { + if dn.FreeSpace() <= 0 { + return ErrFilterContinue + } + if _, ok := m[dn.Id()]; ok { + return ErrFilterContinue + } + return nil + } +} + // 1. 
find the main data node // 1.1 collect all data nodes that have 1 slots // 2.2 collect all racks that have rp.SameRackCount+1 // 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1 // 2. find rest data nodes -func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { +func FindEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, existsServers *VolumeLocationList) (additionServers []*DataNode, err error) { //find main datacenter and other data centers + pickNodesFn := PickLowUsageNodeFn rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { - if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { - return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) - } - if len(node.Children()) < rp.DiffRackCount+1 { - return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) - } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + + pickMainAndRestNodes := func(np NodePicker, totalNodeCount int, filterFirstNodeFn FilterNodeFn, existsNodes []Node) (mainNode Node, restNodes []Node, e error) { + if len(existsNodes) > 0 { + mainNode = existsNodes[0] } - possibleRacksCount := 0 - for _, rack := range node.Children() { - possibleDataNodesCount := 0 - for _, n := range rack.Children() { - if n.FreeSpace() >= 1 { - possibleDataNodesCount++ - } - } - if possibleDataNodesCount >= rp.SameRackCount+1 { - possibleRacksCount++ + + if mainNode == nil { + mainNodes, err := np.PickNodes(1, filterFirstNodeFn, pickNodesFn) + if err != nil { + return nil, nil, err } + mainNode = mainNodes[0] + existsNodes = append(existsNodes, mainNode) } - if possibleRacksCount < rp.DiffRackCount+1 { - return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) + glog.V(3).Infoln(mainNode.Id(), "picked main node:", mainNode.Id()) + + restCount := totalNodeCount - len(existsNodes) + + if restCount > 0 { + restNodes, err = np.PickNodes(restCount, + makeExceptNodeFilter(existsNodes), pickNodesFn) + if err != nil { + return nil, nil, err + } } - return nil - }) + + return mainNode, restNodes, nil + } + var existsNode []Node + if existsServers != nil { + existsNode = existsServers.DiffDataCenters() + } + mainDataCenter, otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount+1, + func(node Node) error { + return filterMainDataCenter(option, node) + }, existsNode) if dc_err != nil { return nil, dc_err } - //find main rack and other racks - mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { - if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { - return fmt.Errorf("Not matching preferred rack:%s", option.Rack) - } - if node.FreeSpace() < rp.SameRackCount+1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) - } - if len(node.Children()) < rp.SameRackCount+1 { - // a bit faster way to test free racks - return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1) - } - possibleDataNodesCount := 0 - for _, n := range node.Children() { - if n.FreeSpace() >= 1 { 
- possibleDataNodesCount++ - } - } - if possibleDataNodesCount < rp.SameRackCount+1 { - return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) - } - return nil - }) + if existsServers != nil { + existsNode = existsServers.DiffRacks(mainDataCenter.(*DataCenter)) + } else { + existsNode = nil + } + mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount+1, + func(node Node) error { + return filterMainRack(option, node) + }, + existsNode, + ) if rack_err != nil { return nil, rack_err } - //find main rack and other racks - mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { - if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { - return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) - } - if node.FreeSpace() < 1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) - } - return nil - }) + //find main server and other servers + if existsServers != nil { + existsNode = existsServers.SameServers(mainRack.(*Rack)) + } else { + existsNode = nil + } + mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount+1, + func(node Node) error { + if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { + return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) + } + if node.FreeSpace() < 1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) + } + return nil + }, + existsNode, + ) + if server_err != nil { return nil, server_err } + if existsServers != nil && existsServers.ContainsDataNode(mainServer.(*DataNode)) { + } else { + additionServers = append(additionServers, mainServer.(*DataNode)) + } - servers = append(servers, mainServer.(*DataNode)) for _, server := range otherServers { - servers = append(servers, server.(*DataNode)) + additionServers = append(additionServers, server.(*DataNode)) } for _, rack := range otherRacks { r := rand.Intn(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { - servers = append(servers, server) + additionServers = append(additionServers, server) } else { - return servers, e + return additionServers, e } } - for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) - if server, e := datacenter.ReserveOneVolume(r); e == nil { - servers = append(servers, server) + for _, dc := range otherDataCenters { + r := rand.Intn(dc.FreeSpace()) + if server, e := dc.ReserveOneVolume(r); e == nil { + additionServers = append(additionServers, server) } else { - return servers, e + return additionServers, e } } return } - -func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { - for _, server := range servers { - if err := AllocateVolume(server, vid, option); err == nil { - vi := storage.VolumeInfo{ - Id: vid, - Size: 0, - Collection: option.Collection, - ReplicaPlacement: option.ReplicaPlacement, - Ttl: option.Ttl, - Version: storage.CurrentVersion, - } - server.AddOrUpdateVolume(vi) - topo.RegisterVolumeLayout(vi, server) - glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String()) - } else { - glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err) - return fmt.Errorf("Failed to assign %d: %v", vid, err) - } - } - return nil -} diff --git a/go/topology/volume_growth_test.go 
b/go/topology/volume_growth_test.go index 15abfcc73..04125e7f2 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -5,8 +5,11 @@ import ( "fmt" "testing" + "strings" + "github.com/chrislusf/seaweedfs/go/sequence" "github.com/chrislusf/seaweedfs/go/storage" + "github.com/syndtr/goleveldb/leveldb/errors" ) var topologyLayout = ` @@ -19,7 +22,7 @@ var topologyLayout = ` {"id":2, "size":12312}, {"id":3, "size":12312} ], - "limit":3 + "limit":15 }, "server112":{ "volumes":[ @@ -28,6 +31,18 @@ var topologyLayout = ` {"id":6, "size":12312} ], "limit":10 + }, + "server113":{ + "volumes":[ + {"id":7, "size":12312}, + {"id":8, "size":12312}, + {"id":9, "size":12312} + ], + "limit":8 + }, + "server114":{ + "volumes":[], + "limit":8 } }, "rack2":{ @@ -37,11 +52,15 @@ var topologyLayout = ` {"id":5, "size":12312}, {"id":6, "size":12312} ], - "limit":4 + "limit":8 }, "server122":{ "volumes":[], - "limit":4 + "limit":8 + }, + "server124":{ + "volumes":[], + "limit":8 }, "server123":{ "volumes":[ @@ -54,6 +73,16 @@ var topologyLayout = ` } }, "dc2":{ + "rack2":{ + "server221":{ + "volumes":[], + "limit":8 + }, + "server222":{ + "volumes":[], + "limit":8 + } + } }, "dc3":{ "rack2":{ @@ -63,13 +92,25 @@ var topologyLayout = ` {"id":3, "size":12312}, {"id":5, "size":12312} ], - "limit":4 + "limit":8 + }, + "server322":{ + "volumes":[], + "limit":7 } } } } ` +var testLocList = [][]string{ + {"server111", "server121"}, + {"server111", "server112"}, + {"server111", "server112", "server113"}, + {"server111", "server221", "server321"}, + {"server112"}, +} + func setup(topologyLayout string) *Topology { var data interface{} err := json.Unmarshal([]byte(topologyLayout), &data) @@ -80,6 +121,7 @@ func setup(topologyLayout string) *Topology { //need to connect all nodes first before server adding volumes topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf", + storage.NewCollectionSettings("000", "0.3"), sequence.NewMemorySequencer(), 32*1024, 5) if err != nil { panic("error: " + err.Error()) @@ -115,8 +157,7 @@ func setup(topologyLayout string) *Topology { func TestFindEmptySlotsForOneVolume(t *testing.T) { topo := setup(topologyLayout) - vg := NewDefaultVolumeGrowth() - rp, _ := storage.NewReplicaPlacementFromString("002") + rp, _ := storage.NewReplicaPlacementFromString("111") volumeGrowOption := &VolumeGrowOption{ Collection: "", ReplicaPlacement: rp, @@ -124,12 +165,72 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) { Rack: "", DataNode: "", } - servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + servers, err := FindEmptySlotsForOneVolume(topo, volumeGrowOption, nil) if err != nil { fmt.Println("finding empty slots error :", err) t.Fail() } for _, server := range servers { - fmt.Println("assigned node :", server.Id()) + fmt.Printf("assigned node: %s, free space: %d\n", server.Id(), server.FreeSpace()) + } + +} + +func getDataNodeFromId(topo *Topology, id string) (foundDn *DataNode) { + nid := NodeId(id) + topo.WalkDataNode(func(dn *DataNode) (e error) { + if dn.Id() == nid { + foundDn = dn + e = errors.New("Found.") + } + return + }) + return +} + +func setupTestLocationList(topo *Topology) (ret []*VolumeLocationList) { + + for _, ll := range testLocList { + vl := &VolumeLocationList{} + for _, nid := range ll { + if n := getDataNodeFromId(topo, nid); n != nil { + vl.list = append(vl.list, n) + } + } + ret = append(ret, vl) + } + return +} + +func joinNodeId(dns []*DataNode) string { + ss := []string{} + for _, dn := range dns { + ss = 
append(ss, string(dn.Id())) + } + return strings.Join(ss, ", ") +} + +func TestFindEmptySlotsWithExistsNodes(t *testing.T) { + topo := setup(topologyLayout) + rp, _ := storage.NewReplicaPlacementFromString("112") + volumeGrowOption := &VolumeGrowOption{ + Collection: "", + ReplicaPlacement: rp, + DataCenter: "dc1", + Rack: "", + DataNode: "", + } + testLocationList := setupTestLocationList(topo) + for _, locationList := range testLocationList { + lrp := locationList.CalcReplicaPlacement() + t.Logf("location list: [%s], replica placement = %s\n", joinNodeId(locationList.list), lrp.String()) + if lrp.Compare(rp) < 0 { + servers, err := FindEmptySlotsForOneVolume(topo, volumeGrowOption, locationList) + if err != nil { + t.Log("finding empty slots error :", err) + t.Fail() + } + t.Logf("assigned node: %s\n\n", joinNodeId(servers)) + } } } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 3c1dd9503..2a38fda99 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -25,7 +25,6 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL rp: rp, ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, } } @@ -42,7 +41,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewVolumeLocationList() } vl.vid2location[v.Id].Set(dn) - glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) + glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length()) + //TODO balancing data when have more replications if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { vl.AddToWritable(v.Id) } else { @@ -53,7 +53,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - + //TODO only delete data node from locations? 
vl.removeFromWritable(v.Id) delete(vl.vid2location, v.Id) } @@ -73,9 +73,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { !v.ReadOnly } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList { if location := vl.vid2location[vid]; location != nil { - return location.list + return location } return nil } diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index d5eaf5e92..929d0c8a3 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -2,6 +2,9 @@ package topology import ( "fmt" + + "github.com/chrislusf/seaweedfs/go/storage" + "math/rand" ) type VolumeLocationList struct { @@ -21,6 +24,14 @@ func (dnll *VolumeLocationList) Head() *DataNode { return dnll.list[0] } +func (dnll *VolumeLocationList) PickForRead() *DataNode { + return dnll.list[rand.Intn(len(dnll.list))] +} + +func (dnll *VolumeLocationList) AllDataNode() []*DataNode { + return dnll.list +} + func (dnll *VolumeLocationList) Length() int { return len(dnll.list) } @@ -63,3 +74,103 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { dnll.list = l } } + +// return all data centers, first is main data center +func (dnll *VolumeLocationList) DiffDataCenters() []Node { + m := make(map[*DataCenter]int) + maxCount := 0 + var mainDC *DataCenter + for _, dn := range dnll.list { + var dc *DataCenter + if dc = dn.GetDataCenter(); dc == nil { + continue + } + m[dc] = m[dc] + 1 + if m[dc] > maxCount { + mainDC = dc + maxCount = m[dc] + } + } + dataCenters := make([]Node, 0, len(m)) + if mainDC != nil { + dataCenters = append(dataCenters, mainDC) + } + for dc := range m { + if dc != mainDC { + dataCenters = append(dataCenters, dc) + } + } + return dataCenters +} + +// return all racks if data center set nil +func (dnll *VolumeLocationList) DiffRacks(mainDC *DataCenter) []Node { + m := make(map[*Rack]int) + maxCount := 0 + var mainRack *Rack + for _, dn := range dnll.list { + if mainDC != nil && dn.GetDataCenter() != mainDC { + continue + } + var rack *Rack + if rack = dn.GetRack(); rack == nil { + continue + } + m[rack] = m[rack] + 1 + if m[rack] > maxCount { + mainRack = rack + maxCount = m[rack] + } + } + racks := make([]Node, 0, len(m)) + if mainRack != nil { + racks = append(racks, mainRack) + } + for rack := range m { + if rack != mainRack { + racks = append(racks, rack) + } + } + return racks +} + +func (dnll *VolumeLocationList) SameServers(mainRack *Rack) (servers []Node) { + for _, dn := range dnll.list { + if mainRack != nil && dn.GetRack() != mainRack { + continue + } + var rack *Rack + if rack = dn.GetRack(); rack == nil { + continue + } + servers = append(servers, dn) + } + return servers +} + +func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) { + var dcs, rs, ss []Node + dcs = dnll.DiffDataCenters() + if len(dcs) > 0 { + rs = dnll.DiffRacks(dcs[0].(*DataCenter)) + if len(rs) > 0 { + ss = dnll.SameServers(rs[0].(*Rack)) + } + } + + rp = &storage.ReplicaPlacement{ + SameRackCount: len(ss) - 1, + DiffRackCount: len(rs) - 1, + DiffDataCenterCount: len(dcs) - 1, + } + return +} + +func (dnll *VolumeLocationList) ContainsDataNode(n *DataNode) bool { + for _, dn := range dnll.list { + if dn == n { + return true + } + } + return false +} diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go index 41cce8b82..9e9e7f438 100644 --- a/go/util/concurrent_read_map.go +++ 
b/go/util/concurrent_read_map.go @@ -7,9 +7,8 @@ import ( // A mostly for read map, which can thread-safely // initialize the map entries. type ConcurrentReadMap struct { - rmutex sync.RWMutex - mutex sync.Mutex - Items map[string]interface{} + rwmutex sync.RWMutex + Items map[string]interface{} } func NewConcurrentReadMap() *ConcurrentReadMap { @@ -17,8 +16,8 @@ func NewConcurrentReadMap() *ConcurrentReadMap { } func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { - m.mutex.Lock() - defer m.mutex.Unlock() + m.rwmutex.Lock() + defer m.rwmutex.Unlock() if value, ok := m.Items[key]; ok { return value } @@ -28,11 +27,11 @@ func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{} } func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { - m.rmutex.RLock() - if value, ok := m.Items[key]; ok { - m.rmutex.RUnlock() + m.rwmutex.RLock() + value, ok := m.Items[key] + m.rwmutex.RUnlock() + if ok { return value } - m.rmutex.RUnlock() return m.initMapEntry(key, newEntry) } diff --git a/go/util/http_util.go b/go/util/http_util.go index 29b2043ee..7a395aca6 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -11,7 +11,12 @@ import ( "net/url" "strings" + "os" + + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/security" + "github.com/pierrec/lz4" + "strconv" ) var ( @@ -26,6 +31,18 @@ func init() { client = &http.Client{Transport: Transport} } +func MkUrl(host, path string, args url.Values) string { + u := url.URL{ + Scheme: "http", + Host: host, + Path: path, + } + if args != nil { + u.RawQuery = args.Encode() + } + return u.String() +} + func PostBytes(url string, body []byte) ([]byte, error) { r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) if err != nil { @@ -39,20 +56,62 @@ func PostBytes(url string, body []byte) ([]byte, error) { return b, nil } -func Post(url string, values url.Values) ([]byte, error) { +func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) { + url := MkUrl(host, path, nil) + glog.V(4).Infoln("Post", url+"?"+values.Encode()) r, err := client.PostForm(url, values) if err != nil { - return nil, err + return nil, 0, err } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - return nil, err + return nil, r.StatusCode, err } - return b, nil + return b, r.StatusCode, nil +} + +func Post(host, path string, values url.Values) (content []byte, e error) { + content, _, e = PostEx(host, path, values) + return +} + +type RApiError struct { + E string +} + +func (e *RApiError) Error() string { + return e.E } -func Get(url string) ([]byte, error) { +func IsRemoteApiError(e error) bool { + switch e.(type) { + case *RApiError: + return true + } + return false +} + +func RemoteApiCall(host, path string, values url.Values) (result map[string]interface{}, e error) { + jsonBlob, code, e := PostEx(host, path, values) + if e != nil { + return nil, e + } + result = make(map[string]interface{}) + if e := json.Unmarshal(jsonBlob, &result); e != nil { + return nil, e + } + if err, ok := result["error"]; ok && err.(string) != "" { + return nil, &RApiError{E: err.(string)} + } + if code != http.StatusOK && code != http.StatusAccepted { + return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code) + } + return result, nil +} + +func Get(host, path string, values url.Values) ([]byte, error) { + url := MkUrl(host, path, values) r, err := client.Get(url) if err != nil { 
return nil, err @@ -90,7 +149,7 @@ func Delete(url string, jwt security.EncodedJwt) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -140,6 +199,10 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { if err != nil { return "", nil, err } + if response.StatusCode != http.StatusOK { + response.Body.Close() + return "", nil, fmt.Errorf("%s: %s", fileUrl, response.Status) + } contentDisposition := response.Header["Content-Disposition"] if len(contentDisposition) > 0 { if strings.HasPrefix(contentDisposition[0], "filename=") { @@ -151,6 +214,41 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) { return } +func DownloadToFile(fileUrl, savePath string) (e error) { + response, err := client.Get(fileUrl) + if err != nil { + return err + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return fmt.Errorf("%s: %s", fileUrl, response.Status) + } + var r io.Reader + content_encoding := strings.ToLower(response.Header.Get("Content-Encoding")) + size := response.ContentLength + if n, e := strconv.ParseInt(response.Header.Get("X-Content-Length"), 10, 64); e == nil { + size = n + } + switch content_encoding { + case "lz4": + r = lz4.NewReader(response.Body) + default: + r = response.Body + } + var f *os.File + if f, e = os.OpenFile(savePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); e != nil { + return + } + if size >= 0 { + _, e = io.CopyN(f, r, size) + } else { + _, e = io.Copy(f, r) + } + + f.Close() + return +} + func Do(req *http.Request) (resp *http.Response, err error) { return client.Do(req) } diff --git a/go/weed/backup.go b/go/weed/backup.go index 5e51a8b03..2f97751f8 100644 --- a/go/weed/backup.go +++ b/go/weed/backup.go @@ -57,7 +57,7 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations.Head().Url stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) if err != nil { @@ -69,13 +69,8 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) return true } - replication, err := storage.NewReplicaPlacementFromString(stats.Replication) - if err != nil { - fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) - return true - } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, ttl) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index b63f0008e..daa970788 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -254,15 +254,14 @@ func readFiles(fileIdLineChan chan string, s *stat) { println("!!!! 
volume id ", vid, " location not found!!!!!") continue } - server := ret.Locations[rand.Intn(len(ret.Locations))].Url - url := "http://" + server + "/" + fid - if bytesRead, err := util.Get(url); err == nil { + server := ret.Locations.PickForRead().Url + if bytesRead, err := util.Get(server, "/"+fid, nil); err == nil { s.completed++ s.transferred += int64(len(bytesRead)) readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - fmt.Printf("Failed to read %s error:%v\n", url, err) + fmt.Printf("Failed to read %s/%s error:%v\n", server, fid, err) } } } diff --git a/go/weed/compact.go b/go/weed/compact.go index 673b96901..b51879f97 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { vid := storage.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil) + storage.NeedleMapInMemory, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/download.go b/go/weed/download.go index 2e948a056..5e1fd30f8 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -51,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool { } func downloadToFile(server, fileId, saveDir string) error { - fileUrl, lookupError := operation.LookupFileId(server, fileId) + fileUrl, lookupError := operation.LookupFileId(server, fileId, true) if lookupError != nil { return lookupError } @@ -103,7 +103,7 @@ func downloadToFile(server, fileId, saveDir string) error { } func fetchContent(server string, fileId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(server, fileId) + fileUrl, lookupError := operation.LookupFileId(server, fileId, true) if lookupError != nil { return "", nil, lookupError } diff --git a/go/weed/shell.go b/go/weed/shell.go index 144621b09..feac5ddd4 100644 --- a/go/weed/shell.go +++ b/go/weed/shell.go @@ -20,8 +20,6 @@ var cmdShell = &Command{ `, } -var () - func runShell(command *Command, args []string) bool { r := bufio.NewReader(os.Stdin) o := bufio.NewWriter(os.Stdout) diff --git a/go/weed/signal_handling.go b/go/weed/signal_handling.go index 2004bb088..9c3908ce3 100644 --- a/go/weed/signal_handling.go +++ b/go/weed/signal_handling.go @@ -22,7 +22,7 @@ func OnInterrupt(fn func()) { syscall.SIGTERM, syscall.SIGQUIT) go func() { - for _ = range signalChan { + for range signalChan { fn() os.Exit(0) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a7fa2de53..89499af40 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -69,7 +69,12 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj } func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { m := make(map[string]interface{}) - m["error"] = err.Error() + if err == nil { + m["error"] = "" + } else { + m["error"] = err.Error() + + } writeJsonQuiet(w, r, httpStatus, m) } diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 1695296d4..28de0ea63 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -5,7 +5,6 @@ import ( "errors" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "strconv" @@ -91,7 +90,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.WriteHeader(http.StatusNotFound) return } - urlLocation := 
lookup.Locations[rand.Intn(len(lookup.Locations))].Url + urlLocation := lookup.Locations.PickForRead().Url urlString := "http://" + urlLocation + "/" + fileId if fs.redirectOnRead { http.Redirect(w, r, urlString, http.StatusFound) @@ -130,7 +129,11 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if replication == "" { replication = fs.defaultReplication } - assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl")) + collection := query.Get("collection") + if collection == "" { + collection = fs.collection + } + assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, http.StatusInternalServerError, ae) diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index db70ca6b1..62ec5c9aa 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/sequence" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/topology" "github.com/chrislusf/seaweedfs/go/util" "github.com/gorilla/mux" @@ -50,9 +51,10 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() + cs := storage.NewCollectionSettings(defaultReplicaPlacement, garbageThreshold) var e error - if ms.Topo, e = topology.NewTopology("topo", confFile, seq, - uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { + if ms.Topo, e = topology.NewTopology("topo", confFile, cs, + seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { glog.Fatalf("cannot create topology:%s", e) } ms.vg = topology.NewDefaultVolumeGrowth() @@ -71,6 +73,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/vol/check_replicate", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeCheckReplicateHandler))) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 2be5d9524..a61dd765d 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -23,10 +23,10 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume } volumeId, err := storage.NewVolumeId(vid) if err == nil { - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil { - var ret []operation.Location - for _, dn := range machines { + locationList := ms.Topo.Lookup(collection, volumeId) + if locationList != nil { + var ret operation.Locations + for _, dn := range locationList.AllDataNode() { ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) } volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret} diff --git a/go/weed/weed_server/master_server_handlers_admin.go 
b/go/weed/weed_server/master_server_handlers_admin.go index fb2b18983..38a68762d 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -5,11 +5,13 @@ import ( "errors" "fmt" "io/ioutil" - "math/rand" "net/http" "strconv" "strings" + "net/url" + "sync" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/storage" @@ -25,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) + _, err := util.Get(server.Ip+":"+strconv.Itoa(server.Port), "/admin/delete_collection", url.Values{"collection": r.Form["collection"]}) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -82,6 +84,11 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque ms.dirStatusHandler(w, r) } +func (ms *MasterServer) volumeCheckReplicateHandler(w http.ResponseWriter, r *http.Request) { + ms.Topo.StartCheckReplicate() + writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"status": "running"}) +} + func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { count := 0 option, err := ms.getVolumeGrowOption(r) @@ -122,8 +129,14 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) return } machines := ms.Topo.Lookup("", volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl)+r.URL.Path, http.StatusMovedPermanently) + if machines != nil && machines.Length() > 0 { + var url string + if r.URL.RawQuery != "" { + url = util.NormalizeUrl(machines.PickForRead().PublicUrl) + r.URL.Path + "?" 
+ r.URL.RawQuery + } else { + url = util.NormalizeUrl(machines.PickForRead().PublicUrl) + r.URL.Path + } + http.Redirect(w, r, url, http.StatusMovedPermanently) } else { writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId)) } @@ -157,7 +170,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -184,3 +197,47 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr } return volumeGrowOption, nil } + +func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, volumes, collections []string) (result map[string]interface{}) { + result = make(map[string]interface{}) + forms := url.Values{} + forms.Set("key", settingKey) + forms.Set("value", settingValue) + if len(volumes) == 0 && len(collections) == 0 { + forms.Set("all", "true") + } else { + forms["volume"] = volumes + forms["collection"] = collections + } + + var wg sync.WaitGroup + var mu sync.Mutex + ms.Topo.WalkDataNode(func(dn *topology.DataNode) (e error) { + wg.Add(1) + go func(server string, values url.Values) { + defer wg.Done() + jsonBlob, e := util.Post(server, "/admin/setting", values) + if e != nil { + mu.Lock() + result[server] = map[string]interface{}{ + "error": e.Error() + " " + string(jsonBlob), + } + mu.Unlock() + return + } + var ret interface{} + mu.Lock() + if e := json.Unmarshal(jsonBlob, &ret); e == nil { + result[server] = ret + } else { + result[server] = map[string]interface{}{ + "error": e.Error() + " " + string(jsonBlob), + } + } + mu.Unlock() + }(dn.Url(), forms) + return nil + }) + wg.Wait() + return +} diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 8becdd0f1..fbf0339e3 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -20,7 +20,6 @@ type VolumeServer struct { store *storage.Store guard *security.Guard - needleMapKind storage.NeedleMapType FixJpgOrientation bool ReadRedirect bool } @@ -38,12 +37,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, - needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, } vs.SetMasterNode(masterNode) - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind) vs.guard = security.NewGuard(whiteList, "") @@ -53,10 +51,17 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/setting", vs.guard.WhiteList(vs.setVolumeOptionHandler)) adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/sync/vol_data",
vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) + adminMux.HandleFunc("/admin/task/new", vs.guard.WhiteList(vs.newTaskHandler)) + adminMux.HandleFunc("/admin/task/query", vs.guard.WhiteList(vs.queryTaskHandler)) + adminMux.HandleFunc("/admin/task/commit", vs.guard.WhiteList(vs.commitTaskHandler)) + adminMux.HandleFunc("/admin/task/clean", vs.guard.WhiteList(vs.cleanTaskHandler)) + adminMux.HandleFunc("/admin/task/all", vs.guard.WhiteList(vs.allTaskHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 80aeb3f1d..779d6f99d 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -1,14 +1,23 @@ package weed_server import ( + "errors" "net/http" "path/filepath" + "strconv" + "strings" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/stats" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" ) +type VolumeOptError struct { + Volume string `json:"volume"` + Err string `json:"err"` +} + func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION @@ -17,7 +26,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { @@ -48,3 +57,65 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) m["DiskStatuses"] = ds writeJsonQuiet(w, r, http.StatusOK, m) } + +func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + errs := []VolumeOptError{} + var ( + setter storage.VolumeWalker + ) + + key := r.FormValue("key") + value := r.FormValue("value") + if key == "readonly" { + isReadOnly, e := strconv.ParseBool(value) + if e != nil { + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + setter = func(v *storage.Volume) error { + if e := v.SetReadOnly(isReadOnly); e != nil { + errs = append(errs, VolumeOptError{ + Volume: v.Id.String(), + Err: e.Error(), + }) + } + return nil + } + } else { + writeJsonError(w, r, http.StatusBadRequest, errors.New("Unknown setting: "+key)) + return + } + + all, _ := strconv.ParseBool(r.FormValue("all")) + if all { + vs.store.WalkVolume(setter) + } else { + volumesSet := make(map[string]bool) + for _, volume := range r.Form["volume"] { + volumesSet[strings.TrimSpace(volume)] = true + } + collectionsSet := make(map[string]bool) + for _, c := range r.Form["collection"] { + collectionsSet[strings.TrimSpace(c)] = true + } + if len(collectionsSet) > 0 || len(volumesSet) > 0 { + vs.store.WalkVolume(func(v *storage.Volume) (e error) { + if !collectionsSet[v.Collection] && !volumesSet[v.Id.String()] { + return nil + } + setter(v) + return nil + }) + } + + } + + result := make(map[string]interface{}) + if len(errs) > 0 { + result["error"]
= "set volume replica error." + result["errors"] = errs + } + + writeJson(w, r, http.StatusAccepted, result) +} diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index bfa6d6a4f..39613a07e 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -46,7 +46,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { - http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)+r.URL.Path, http.StatusMovedPermanently) + http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl)+r.URL.Path, http.StatusMovedPermanently) } else { glog.V(2).Infoln("lookup error:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index c650e5f53..6f0d5aa8b 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -2,11 +2,14 @@ package weed_server import ( "fmt" + "io" "net/http" + "strconv" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" + "github.com/pierrec/lz4" ) func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { @@ -84,3 +87,58 @@ func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) ( } return v, nil } + +func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { + v, e := vs.getVolume("volume", r) + if v == nil { + http.Error(w, e.Error(), http.StatusBadRequest) + return + } + cr, e := v.GetVolumeCleanReader() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError) + return + } + totalSize, e := cr.Size() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume size: %v", e), http.StatusInternalServerError) + return + } + w.Header().Set("Accept-Ranges", "bytes") + w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id)) + + rangeReq := r.Header.Get("Range") + if rangeReq == "" { + w.Header().Set("X-Content-Length", strconv.FormatInt(totalSize, 10)) + w.Header().Set("Content-Encoding", "lz4") + lz4w := lz4.NewWriter(w) + if _, e = io.Copy(lz4w, cr); e != nil { + glog.V(4).Infoln("response write error:", e) + } + lz4w.Close() + return + } + ranges, e := parseRange(rangeReq, totalSize) + if e != nil { + http.Error(w, e.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if len(ranges) != 1 { + http.Error(w, "Only support one range", http.StatusNotImplemented) + return + } + ra := ranges[0] + if _, e := cr.Seek(ra.start, 0); e != nil { + http.Error(w, e.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("X-Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.Header().Set("Content-Encoding", "lz4") + w.WriteHeader(http.StatusPartialContent) + lz4w := lz4.NewWriter(w) + if _, e = io.CopyN(lz4w, cr, ra.length); e != nil { + glog.V(2).Infoln("response write error:", e) + } + lz4w.Close() +} diff --git a/go/weed/weed_server/volume_server_handlers_task.go 
diff --git a/go/weed/weed_server/volume_server_handlers_task.go b/go/weed/weed_server/volume_server_handlers_task.go
new file mode 100644
index 000000000..9677e7d95
--- /dev/null
+++ b/go/weed/weed_server/volume_server_handlers_task.go
@@ -0,0 +1,66 @@
+package weed_server
+
+import (
+	"net/http"
+
+	"time"
+
+	"strings"
+
+	"github.com/chrislusf/seaweedfs/go/glog"
+	"github.com/chrislusf/seaweedfs/go/storage"
+)
+
+func (vs *VolumeServer) newTaskHandler(w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	tid, e := vs.store.TaskManager.NewTask(vs.store, r.Form)
+	if e == nil {
+		writeJsonQuiet(w, r, http.StatusOK, map[string]string{"tid": tid})
+	} else {
+		writeJsonError(w, r, http.StatusInternalServerError, e)
+	}
+	glog.V(2).Infoln("new store task =", tid, ", error =", e)
+}
+
+func (vs *VolumeServer) queryTaskHandler(w http.ResponseWriter, r *http.Request) {
+	tid := r.FormValue("tid")
+	timeoutStr := strings.TrimSpace(r.FormValue("timeout"))
+	d := time.Minute
+	if td, e := time.ParseDuration(timeoutStr); e == nil {
+		d = td
+	}
+	err := vs.store.TaskManager.QueryResult(tid, d)
+	if err == storage.ErrTaskNotFinish {
+		writeJsonError(w, r, http.StatusRequestTimeout, err)
+	} else if err == nil {
+		writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
+	} else {
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+	}
+	glog.V(2).Infoln("query task =", tid, ", error =", err)
+}
+func (vs *VolumeServer) commitTaskHandler(w http.ResponseWriter, r *http.Request) {
+	tid := r.FormValue("tid")
+	err := vs.store.TaskManager.Commit(tid)
+	if err == storage.ErrTaskNotFinish {
+		writeJsonError(w, r, http.StatusRequestTimeout, err)
+	} else if err == nil {
+		writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
+	}
+	glog.V(2).Infoln("commit task =", tid, ", error =", err)
+}
+func (vs *VolumeServer) cleanTaskHandler(w http.ResponseWriter, r *http.Request) {
+	tid := r.FormValue("tid")
+	err := vs.store.TaskManager.Clean(tid)
+	if err == storage.ErrTaskNotFinish {
+		writeJsonError(w, r, http.StatusRequestTimeout, err)
+	} else if err == nil {
+		writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
+	}
+	glog.V(2).Infoln("clean task =", tid, ", error =", err)
+}
+
+func (vs *VolumeServer) allTaskHandler(w http.ResponseWriter, r *http.Request) {
+	// TODO: get all tasks
+	glog.V(2).Infoln("TODO: get all tasks")
+}
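
Note for reviewers: below is a minimal client sketch of the new /admin/task/new -> /admin/task/query -> /admin/task/commit flow registered above. Only the paths, the `tid` and `timeout` parameters, and the 408 not-finished behaviour come from this patch; the server address, the `task=vacuum` form field, and the flat JSON response shape are assumptions for illustration, and the caller must be on the volume server's admin whitelist.

```
// Hypothetical client sketch, not part of this change.
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

// Assumed address of a volume server whose admin whitelist includes this client.
const volumeServer = "http://localhost:8080"

func post(path string, values url.Values) (map[string]string, error) {
	resp, err := http.PostForm(volumeServer+path, values)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	// Assumes flat JSON responses such as {"tid":"..."} or {"error":"..."}.
	m := map[string]string{}
	err = json.Unmarshal(body, &m)
	return m, err
}

func main() {
	// Start a task. The form fields understood by store.TaskManager.NewTask are
	// not visible in this diff; "task=vacuum" is a placeholder.
	created, err := post("/admin/task/new", url.Values{"task": {"vacuum"}})
	if err != nil {
		panic(err)
	}
	tid := created["tid"]
	fmt.Println("started task", tid)

	// Wait for the result; the handler answers 408 while the task has not
	// finished within the given timeout.
	res, err := post("/admin/task/query", url.Values{"tid": {tid}, "timeout": {"30s"}})
	fmt.Println("query:", res, err)

	// Commit the finished task, or call /admin/task/clean to discard it.
	res, err = post("/admin/task/commit", url.Values{"tid": {tid}})
	fmt.Println("commit:", res, err)
}
```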
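
A similarly hedged sketch for the new setVolumeOptionHandler. The form fields (`key`, `value`, `volume`, `collection`, `all`) are taken from the handler code in this patch, but the URL path it is mounted on is registered outside this excerpt, so `/admin/volume/option` and the server address below are placeholders only.

```
// Hypothetical sketch: mark volumes 3 and 4 read-only via the new option handler.
// "/admin/volume/option" is a placeholder path; the real route is registered
// elsewhere in volume_server.go.
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	values := url.Values{
		"key":    {"readonly"},   // currently the only supported key
		"value":  {"true"},       // parsed with strconv.ParseBool
		"volume": {"3", "4"},     // repeat "volume" (or "collection") to select targets
		// "all": {"true"},       // or apply the setting to every volume on the server
	}
	resp, err := http.PostForm("http://localhost:8080/admin/volume/option", values)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	// On partial failure the response carries "error" plus a per-volume "errors" list.
	fmt.Println(resp.StatusCode, string(body))
}
```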