Browse Source

Replica pacement now set by global or collection settting, we can change it anytime, then the cluster will automate replica the volume(developing).

pull/279/head
tnextday 10 years ago
parent
commit
83c8bd4e8b
  1. 1
      go/operation/sync_volume.go
  2. 2
      go/proto/system_message.proto
  3. 43
      go/storage/replica_placement.go
  4. 24
      go/storage/store.go
  5. 21
      go/storage/volume.go
  6. 10
      go/storage/volume_info.go
  7. 8
      go/storage/volume_super_block.go
  8. 1
      go/storage/volume_sync.go
  9. 1
      go/storage/volume_version.go
  10. 1
      go/topology/allocate_volume.go
  11. 9
      go/topology/collection.go
  12. 14
      go/topology/data_node.go
  13. 50
      go/topology/store_replicate.go
  14. 29
      go/topology/topology.go
  15. 6
      go/topology/topology_event_handling.go
  16. 1
      go/topology/volume_growth.go
  17. 1
      go/topology/volume_growth_test.go
  18. 5
      go/topology/volume_layout.go
  19. 7
      go/topology/volume_location_list.go
  20. 17
      go/util/concurrent_read_map.go
  21. 7
      go/weed/backup.go
  22. 2
      go/weed/compact.go
  23. 6
      go/weed/weed_server/master_server.go
  24. 19
      go/weed/weed_server/master_server_handlers_admin.go
  25. 17
      go/weed/weed_server/volume_server_handlers_admin.go

1
go/operation/sync_volume.go

@ -10,7 +10,6 @@ import (
)
type SyncVolumeResponse struct {
Replication string `json:"Replication,omitempty"`
Ttl string `json:"Ttl,omitempty"`
TailOffset uint64 `json:"TailOffset,omitempty"`
CompactRevision uint16 `json:"CompactRevision,omitempty"`

2
go/proto/system_message.proto

@ -8,7 +8,7 @@ message VolumeInformationMessage {
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
required uint32 replica_placement = 8;
// required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
}

43
go/storage/replica_placement.go

@ -1,6 +1,7 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
)
@ -11,6 +12,10 @@ type ReplicaPlacement struct {
DiffDataCenterCount int
}
type ReplicaPlacements struct {
settings map[string]*ReplicaPlacement
}
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
rp := &ReplicaPlacement{}
for i, c := range t {
@ -57,3 +62,41 @@ func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool {
rp.DiffRackCount == rp1.DiffRackCount &&
rp.DiffDataCenterCount == rp1.DiffDataCenterCount
}
func NewReplicaPlacements(defaultRP string) *ReplicaPlacements {
rp, e := NewReplicaPlacementFromString(defaultRP)
if e != nil {
rp, _ = NewReplicaPlacementFromString("000")
}
rps := &ReplicaPlacements{settings: make(map[string]*ReplicaPlacement)}
rps.settings[""] = rp
return rps
}
func NewReplicaPlacementsFromJson(s string) *ReplicaPlacements {
m := make(map[string]*ReplicaPlacement)
if json.Unmarshal([]byte(s), m) == nil {
m[""], _ = NewReplicaPlacementFromString("000")
}
return &ReplicaPlacements{settings: m}
}
func (rps *ReplicaPlacements) Get(collection string) *ReplicaPlacement {
if rp, ok := rps.settings[collection]; ok {
return rp
}
return rps.settings[""]
}
func (rps *ReplicaPlacements) Set(collection, t string) error {
rp, e := NewReplicaPlacementFromString(t)
if e == nil {
rps.settings[collection] = rp
}
return e
}
func (rps *ReplicaPlacements) Marshal() string {
buf, _ := json.Marshal(rps.settings)
return string(buf)
}

24
go/storage/store.go

@ -106,11 +106,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, ttlString string) error {
ttl, e := ReadTTL(ttlString)
if e != nil {
return e
@ -122,7 +118,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
e = s.addVolume(VolumeId(id), collection, needleMapKind, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -134,7 +130,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, ttl); err != nil {
e = err
}
}
@ -183,14 +179,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s ttl:%v",
location.Directory, vid, collection, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@ -213,9 +209,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil); e == nil {
l.volumes[vid] = v
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
glog.V(0).Infof("data file %s, v=%d size=%d ttl=%s", l.Directory+"/"+name, v.Version(), v.Size(), v.Ttl.String())
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
@ -234,7 +230,6 @@ func (s *Store) Status() []*VolumeInfo {
Id: VolumeId(k),
Size: v.ContentSize(),
Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
@ -281,7 +276,6 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
ReadOnly: proto.Bool(v.readOnly),
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
Version: proto.Uint32(uint32(v.Version())),
Ttl: proto.Uint32(v.Ttl.ToUint32()),
}

21
go/storage/volume.go

@ -28,9 +28,9 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.SuperBlock = SuperBlock{Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind)
return
@ -87,7 +87,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}
if v.ReplicaPlacement == nil {
if v.version == NoneVersion {
e = v.readSuperBlock()
} else {
e = v.maybeWriteSuperBlock()
@ -145,10 +145,6 @@ func (v *Volume) Close() {
_ = v.dataFile.Close()
}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
func (v *Volume) isFileUnchanged(n *Needle) bool {
@ -427,17 +423,6 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
return false
}
func (v *Volume) SetReplica(replica *ReplicaPlacement) error {
if replica == nil {
replica, _ = NewReplicaPlacementFromString("000")
}
if v.ReplicaPlacement.String() == replica.String() {
return nil
}
v.ReplicaPlacement = replica
return v.writeSuperBlock()
}
func (v *Volume) SetReadOnly(isReadOnly bool) error {
if isReadOnly == false {
if fi, e := v.dataFile.Stat(); e != nil {

10
go/storage/volume_info.go

@ -10,7 +10,6 @@ import (
type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
Collection string
Version Version
@ -31,18 +30,13 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
ReadOnly: *m.ReadOnly,
Version: Version(*m.Version),
}
rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
if e != nil {
return vi, e
}
vi.ReplicaPlacement = rp
vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
func (vi VolumeInfo) String() string {
return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
return fmt.Sprintf("Id:%d, Size:%d, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
}
/*VolumesInfo sorting*/

8
go/storage/volume_super_block.go

@ -15,14 +15,13 @@ const (
/*
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc (Deprecated!)
* Byte 2 and byte 3: Time to live. See TTL for definition
* Byte 4 and byte 5: The number of times the volume has been compacted.
* Rest bytes: Reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
Ttl *TTL
CompactRevision uint16
}
@ -33,7 +32,7 @@ func (s *SuperBlock) Version() Version {
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
header[1] = 0
s.Ttl.ToBytes(header[2:4])
util.Uint16toBytes(header[4:6], s.CompactRevision)
return header
@ -83,9 +82,6 @@ func (v *Volume) writeSuperBlock() (err error) {
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
superBlock.Ttl = LoadTTLFromBytes(header[2:4])
superBlock.CompactRevision = util.BytesToUint16(header[4:6])
return

1
go/storage/volume_sync.go

@ -169,7 +169,6 @@ func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.CompactRevision = v.SuperBlock.CompactRevision
syncStatus.Ttl = v.SuperBlock.Ttl.String()
syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
return syncStatus
}

1
go/storage/volume_version.go

@ -3,6 +3,7 @@ package storage
type Version uint8
const (
NoneVersion = Version(0)
Version1 = Version(1)
Version2 = Version(2)
CurrentVersion = Version2

1
go/topology/allocate_volume.go

@ -18,7 +18,6 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {

9
go/topology/collection.go

@ -10,10 +10,11 @@ import (
type Collection struct {
Name string
volumeSizeLimit uint64
rp *storage.ReplicaPlacement
storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
@ -23,13 +24,13 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
keyString := rp.String()
func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
keyString := ""
if ttl != nil {
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
}

14
go/topology/data_node.go

@ -31,9 +31,8 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (optionChanged bool) {
optionChanged = false
if v1, ok := dn.volumes[v.Id]; !ok {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo){
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
if !v.ReadOnly {
@ -41,13 +40,12 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (optionChanged bool)
}
dn.UpAdjustMaxVolumeId(v.Id)
} else {
optionChanged = !v1.Ttl.Equal(v.Ttl) || v1.Collection != v.Collection || !v1.ReplicaPlacement.Equal(v.ReplicaPlacement)
dn.volumes[v.Id] = v
}
return
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (needToDeleteVolumes []storage.VolumeInfo) {
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
@ -56,15 +54,13 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (needToDel
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
needToDeleteVolumes = append(needToDeleteVolumes, v)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
} //TODO: adjust max volume id, if need to reclaim volume ids
for _, v := range actualVolumes {
if dn.AddOrUpdateVolume(v) {
needToDeleteVolumes = append(needToDeleteVolumes, v)
}
dn.AddOrUpdateVolume(v)
}
return
}

50
go/topology/store_replicate.go

@ -20,31 +20,25 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId)
if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
} else if ret > 0 {
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
} else {
} else if ret <= 0 {
errorStatus = "Failed to write to local disk"
}
if !needToReplicate && ret > 0 {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
_, err := operation.Upload(
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err == nil
}) {
ret = 0
errorStatus = "Failed to write to replicas for volume " + volumeId.String()
}
//send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
_, err := operation.Upload(
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err == nil
}) {
ret = 0
errorStatus = "Failed to write to replicas for volume " + volumeId.String()
}
}
size = ret
return
}
@ -61,18 +55,12 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
glog.V(0).Infoln("delete error:", err)
return
}
needToReplicate := !store.HasVolume(volumeId)
if !needToReplicate && ret > 0 {
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) {
ret = 0
}
//send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) {
ret = 0
}
}
return

29
go/topology/topology.go

@ -28,12 +28,14 @@ type Topology struct {
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
ReplicaPlacements *storage.ReplicaPlacements
configuration *Configuration
RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@ -42,6 +44,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.ReplicaPlacements = rp
t.Sequence = seq
@ -111,12 +114,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := t.GetVolumeLayout(option.Collection, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
vid, count, dataNodes, err := t.GetVolumeLayout(option.Collection, option.Ttl).PickForWrite(count, option)
if err != nil || dataNodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes available!")
}
@ -124,10 +127,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, dataNodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
return NewCollection(collectionName, t.ReplicaPlacements.Get(collectionName), t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(ttl)
}
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
@ -140,11 +143,11 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info:%+v", v)
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn)
}
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
@ -168,15 +171,13 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
}
}
// If volume options(replica placement, ttl or collection) have changed,
// we need update its volume layout.
needToDeleteVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range needToDeleteVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}

6
go/topology/topology_event_handling.go

@ -42,7 +42,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@ -56,7 +56,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl := t.GetVolumeLayout(v.Collection, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@ -66,7 +66,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl := t.GetVolumeLayout(v.Collection, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}

1
go/topology/volume_growth.go

@ -195,7 +195,6 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
Version: storage.CurrentVersion,
}

1
go/topology/volume_growth_test.go

@ -80,6 +80,7 @@ func setup(topologyLayout string) *Topology {
//need to connect all nodes first before server adding volumes
topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
storage.NewReplicaPlacements("000"),
sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())

5
go/topology/volume_layout.go

@ -42,7 +42,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
//TODO
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.AddToWritable(v.Id)
} else {
@ -53,7 +54,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
//TODO only delete data node from locations?
vl.removeFromWritable(v.Id)
delete(vl.vid2location, v.Id)
}

7
go/topology/volume_location_list.go

@ -2,6 +2,8 @@ package topology
import (
"fmt"
"github.com/chrislusf/seaweedfs/go/storage"
)
type VolumeLocationList struct {
@ -25,6 +27,11 @@ func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) {
//TODO CalcReplicaPlacement
return nil
}
func (dnll *VolumeLocationList) Set(loc *DataNode) {
for i := 0; i < len(dnll.list); i++ {
if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {

17
go/util/concurrent_read_map.go

@ -7,9 +7,8 @@ import (
// A mostly for read map, which can thread-safely
// initialize the map entries.
type ConcurrentReadMap struct {
rmutex sync.RWMutex
mutex sync.Mutex
Items map[string]interface{}
rwmutex sync.RWMutex
Items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
@ -17,8 +16,8 @@ func NewConcurrentReadMap() *ConcurrentReadMap {
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
if value, ok := m.Items[key]; ok {
return value
}
@ -28,11 +27,11 @@ func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rmutex.RLock()
if value, ok := m.Items[key]; ok {
m.rmutex.RUnlock()
m.rwmutex.RLock()
value, ok := m.Items[key]
m.rwmutex.RUnlock()
if ok {
return value
}
m.rmutex.RUnlock()
return m.initMapEntry(key, newEntry)
}

7
go/weed/backup.go

@ -69,13 +69,8 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
return true
}
replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
if err != nil {
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
return true
}
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl)
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, ttl)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true

2
go/weed/compact.go

@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil)
storage.NeedleMapInMemory, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}

6
go/weed/weed_server/master_server.go

@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/sequence"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/topology"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/gorilla/mux"
@ -51,8 +52,8 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
var e error
if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
if ms.Topo, e = topology.NewTopology("topo", confFile, storage.NewReplicaPlacements(defaultReplicaPlacement),
seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}
ms.vg = topology.NewDefaultVolumeGrowth()
@ -71,7 +72,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/replica/set", ms.proxyToLeader(ms.guard.WhiteList(ms.setReplicaHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))

19
go/weed/weed_server/master_server_handlers_admin.go

@ -160,7 +160,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@ -188,23 +188,6 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
return volumeGrowOption, nil
}
//only proxy to each volume server
func (ms *MasterServer) setReplicaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
replicationValue := r.FormValue("replication")
if _, e := storage.NewReplicaPlacementFromString(replicationValue); e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
all, _ := strconv.ParseBool(r.FormValue("all"))
if !all && len(r.Form["volume"]) == 0 && len(r.Form["collection"]) == 0 {
writeJsonError(w, r, http.StatusBadRequest, errors.New("No available agrs found."))
return
}
result := ms.batchSetVolumeOption("replication", replicationValue, r.Form["volume"], r.Form["collection"])
writeJson(w, r, http.StatusOK, result)
}
func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, volumes, collections []string) (result map[string]interface{}) {
forms := url.Values{}
forms.Set("key", settingKey)

17
go/weed/weed_server/volume_server_handlers_admin.go

@ -26,7 +26,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl"))
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
@ -82,21 +82,6 @@ func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Re
}
return nil
}
} else if key == "replication" {
replica, e := storage.NewReplicaPlacementFromString(r.FormValue(value))
if e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
setter = func(v *storage.Volume) error {
if e := v.SetReplica(replica); e != nil {
errs = append(errs, VolumeOptError{
Volume: v.Id.String(),
Err: e.Error(),
})
}
return nil
}
} else {
writeJsonError(w, r, http.StatusBadRequest, errors.New("Unkonw setting: "+key))
return

Loading…
Cancel
Save