From 7b2e53461fb27448a9a83cd9a935b7ce4634680a Mon Sep 17 00:00:00 2001 From: tnextday Date: Fri, 25 Dec 2015 12:30:23 +0800 Subject: [PATCH] Add `CollectionSetttings` util --- go/proto/system_message.proto | 4 +- go/storage/collection_settings.go | 68 ++++++++++++++++++++++++++++ go/storage/needle_read_write.go | 2 +- go/storage/replica_placement.go | 43 ------------------ go/topology/topology.go | 8 ++-- go/topology/topology_replicate.go | 4 +- go/topology/volume_growth_test.go | 2 +- go/weed/weed_server/master_server.go | 3 +- 8 files changed, 80 insertions(+), 54 deletions(-) create mode 100644 go/storage/collection_settings.go diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index dc429f0c6..b7a2456f6 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -29,8 +29,8 @@ message JoinMessage { message CollectionSetting { optional string collection = 1; - required string replica_placement = 2; - optional string vacuum_garbage_threshold = 3; + optional string replica_placement = 2; + optional float vacuum_garbage_threshold = 3; } message GlobalSetting { diff --git a/go/storage/collection_settings.go b/go/storage/collection_settings.go new file mode 100644 index 000000000..89fc89d28 --- /dev/null +++ b/go/storage/collection_settings.go @@ -0,0 +1,68 @@ +package storage + +type SettingKey int + +const ( + KeyReplicatePlacement SettingKey = iota + KeyGarbageThreshold +) + +type CollectionSettings struct { + settings map[string]map[SettingKey]interface{} +} + +func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings { + rp, e := NewReplicaPlacementFromString(defaultReplicatePlacement) + if e != nil { + rp, _ = NewReplicaPlacementFromString("000") + } + c := &CollectionSettings{ + settings: make(map[string]map[SettingKey]interface{}), + } + c.Set("", KeyReplicatePlacement, rp) + c.Set("", KeyGarbageThreshold, defaultGarbageThreshold) + return c +} + +func (c *CollectionSettings) Get(collection string, key SettingKey) interface{} { + if m, ok := c.settings[collection]; ok { + if v, ok := m[key]; ok { + return v + } + } + if m, ok := c.settings[""]; ok { + if v, ok := m[key]; ok { + return v + } + } + return nil +} + +func (c *CollectionSettings) Set(collection string, key SettingKey, value interface{}) { + if _, ok := c.settings[collection]; !ok { + c.settings[collection] = make(map[SettingKey]interface{}) + } + if value == nil { + delete(c.settings[collection], key) + } +} + +func (c *CollectionSettings) GetGarbageThreshold(collection string) float32 { + return c.Get(collection, KeyGarbageThreshold).(float32) +} + +func (c *CollectionSettings) SetGarbageThreshold(collection string, gt float32) { + c.Set(collection, KeyGarbageThreshold, gt) +} + +func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement { + return c.Get(collection, KeyReplicatePlacement).(*ReplicaPlacement) +} + +func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error { + rp, e := NewReplicaPlacementFromString(t) + if e == nil { + c.Set(collection, KeyReplicatePlacement, rp) + } + return e +} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index df4989329..073b2899a 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -16,7 +16,7 @@ const ( FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 FlagHasTtl = 0x10 - FlagIsExtendNeedle = 0x40 // TODO: Reserve flag, use extent file to save big needle + FlagIsExtendNeedle = 0x40 // TODO: Reserve flag, use extent file to save big needle FlagIsChunkManifest = 0x80 LastModifiedBytesLength = 5 TtlBytesLength = 2 diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index e53bffba8..31f8f464a 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -1,7 +1,6 @@ package storage import ( - "encoding/json" "errors" "fmt" ) @@ -12,10 +11,6 @@ type ReplicaPlacement struct { DiffDataCenterCount int } -type ReplicaPlacements struct { - settings map[string]*ReplicaPlacement -} - func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { rp := &ReplicaPlacement{} for i, c := range t { @@ -62,41 +57,3 @@ func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool { rp.DiffRackCount == rp1.DiffRackCount && rp.DiffDataCenterCount == rp1.DiffDataCenterCount } - -func NewReplicaPlacements(defaultRP string) *ReplicaPlacements { - rp, e := NewReplicaPlacementFromString(defaultRP) - if e != nil { - rp, _ = NewReplicaPlacementFromString("000") - } - rps := &ReplicaPlacements{settings: make(map[string]*ReplicaPlacement)} - rps.settings[""] = rp - return rps -} - -func NewReplicaPlacementsFromJson(s string) *ReplicaPlacements { - m := make(map[string]*ReplicaPlacement) - if json.Unmarshal([]byte(s), m) == nil { - m[""], _ = NewReplicaPlacementFromString("000") - } - return &ReplicaPlacements{settings: m} -} - -func (rps *ReplicaPlacements) Get(collection string) *ReplicaPlacement { - if rp, ok := rps.settings[collection]; ok { - return rp - } - return rps.settings[""] -} - -func (rps *ReplicaPlacements) Set(collection, t string) error { - rp, e := NewReplicaPlacementFromString(t) - if e == nil { - rps.settings[collection] = rp - } - return e -} - -func (rps *ReplicaPlacements) Marshal() string { - buf, _ := json.Marshal(rps.settings) - return string(buf) -} diff --git a/go/topology/topology.go b/go/topology/topology.go index b0324d73a..6cdd1e1fa 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -28,14 +28,14 @@ type Topology struct { chanRecoveredDataNodes chan *DataNode chanFullVolumes chan storage.VolumeInfo - ReplicaPlacements *storage.ReplicaPlacements + CollectionSettings *storage.CollectionSettings configuration *Configuration RaftServer raft.Server } -func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { +func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -44,7 +44,7 @@ func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit - t.ReplicaPlacements = rp + t.CollectionSettings = cs t.Sequence = seq @@ -129,7 +129,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.ReplicaPlacements.Get(collectionName), t.volumeSizeLimit) + return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit) }).(*Collection).GetOrCreateVolumeLayout(ttl) } diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index 8c9281390..dbb7d490a 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -2,8 +2,8 @@ package topology import "github.com/chrislusf/seaweedfs/go/glog" -func (t *Topology) Replicate(garbageThreshold string) int { - glog.V(0).Infoln("Start replicate on demand") +func (t *Topology) Replicate() int { + glog.V(0).Infoln("Start replicate checker on demand") for _, col := range t.collectionMap.Items { c := col.(*Collection) glog.V(0).Infoln("replicate on collection:", c.Name) diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go index df464e47e..08377b4fd 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -80,7 +80,7 @@ func setup(topologyLayout string) *Topology { //need to connect all nodes first before server adding volumes topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf", - storage.NewReplicaPlacements("000"), + storage.NewCollectionSettings("000", "0.3"), sequence.NewMemorySequencer(), 32*1024, 5) if err != nil { panic("error: " + err.Error()) diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 37fb44c74..1adb8820e 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -51,8 +51,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() + cs := storage.NewCollectionSettings(defaultReplicaPlacement, garbageThreshold) var e error - if ms.Topo, e = topology.NewTopology("topo", confFile, storage.NewReplicaPlacements(defaultReplicaPlacement), + if ms.Topo, e = topology.NewTopology("topo", confFile, cs, seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { glog.Fatalf("cannot create topology:%s", e) }