Browse Source

Add `CollectionSetttings` util

pull/279/head
tnextday 10 years ago
parent
commit
7b2e53461f
  1. 4
      go/proto/system_message.proto
  2. 68
      go/storage/collection_settings.go
  3. 2
      go/storage/needle_read_write.go
  4. 43
      go/storage/replica_placement.go
  5. 8
      go/topology/topology.go
  6. 4
      go/topology/topology_replicate.go
  7. 2
      go/topology/volume_growth_test.go
  8. 3
      go/weed/weed_server/master_server.go

4
go/proto/system_message.proto

@ -29,8 +29,8 @@ message JoinMessage {
message CollectionSetting {
optional string collection = 1;
required string replica_placement = 2;
optional string vacuum_garbage_threshold = 3;
optional string replica_placement = 2;
optional float vacuum_garbage_threshold = 3;
}
message GlobalSetting {

68
go/storage/collection_settings.go

@ -0,0 +1,68 @@
package storage
type SettingKey int
const (
KeyReplicatePlacement SettingKey = iota
KeyGarbageThreshold
)
type CollectionSettings struct {
settings map[string]map[SettingKey]interface{}
}
func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings {
rp, e := NewReplicaPlacementFromString(defaultReplicatePlacement)
if e != nil {
rp, _ = NewReplicaPlacementFromString("000")
}
c := &CollectionSettings{
settings: make(map[string]map[SettingKey]interface{}),
}
c.Set("", KeyReplicatePlacement, rp)
c.Set("", KeyGarbageThreshold, defaultGarbageThreshold)
return c
}
func (c *CollectionSettings) Get(collection string, key SettingKey) interface{} {
if m, ok := c.settings[collection]; ok {
if v, ok := m[key]; ok {
return v
}
}
if m, ok := c.settings[""]; ok {
if v, ok := m[key]; ok {
return v
}
}
return nil
}
func (c *CollectionSettings) Set(collection string, key SettingKey, value interface{}) {
if _, ok := c.settings[collection]; !ok {
c.settings[collection] = make(map[SettingKey]interface{})
}
if value == nil {
delete(c.settings[collection], key)
}
}
func (c *CollectionSettings) GetGarbageThreshold(collection string) float32 {
return c.Get(collection, KeyGarbageThreshold).(float32)
}
func (c *CollectionSettings) SetGarbageThreshold(collection string, gt float32) {
c.Set(collection, KeyGarbageThreshold, gt)
}
func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement {
return c.Get(collection, KeyReplicatePlacement).(*ReplicaPlacement)
}
func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error {
rp, e := NewReplicaPlacementFromString(t)
if e == nil {
c.Set(collection, KeyReplicatePlacement, rp)
}
return e
}

2
go/storage/needle_read_write.go

@ -16,7 +16,7 @@ const (
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10
FlagIsExtendNeedle = 0x40 // TODO: Reserve flag, use extent file to save big needle
FlagIsExtendNeedle = 0x40 // TODO: Reserve flag, use extent file to save big needle
FlagIsChunkManifest = 0x80
LastModifiedBytesLength = 5
TtlBytesLength = 2

43
go/storage/replica_placement.go

@ -1,7 +1,6 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
)
@ -12,10 +11,6 @@ type ReplicaPlacement struct {
DiffDataCenterCount int
}
type ReplicaPlacements struct {
settings map[string]*ReplicaPlacement
}
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
rp := &ReplicaPlacement{}
for i, c := range t {
@ -62,41 +57,3 @@ func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool {
rp.DiffRackCount == rp1.DiffRackCount &&
rp.DiffDataCenterCount == rp1.DiffDataCenterCount
}
func NewReplicaPlacements(defaultRP string) *ReplicaPlacements {
rp, e := NewReplicaPlacementFromString(defaultRP)
if e != nil {
rp, _ = NewReplicaPlacementFromString("000")
}
rps := &ReplicaPlacements{settings: make(map[string]*ReplicaPlacement)}
rps.settings[""] = rp
return rps
}
func NewReplicaPlacementsFromJson(s string) *ReplicaPlacements {
m := make(map[string]*ReplicaPlacement)
if json.Unmarshal([]byte(s), m) == nil {
m[""], _ = NewReplicaPlacementFromString("000")
}
return &ReplicaPlacements{settings: m}
}
func (rps *ReplicaPlacements) Get(collection string) *ReplicaPlacement {
if rp, ok := rps.settings[collection]; ok {
return rp
}
return rps.settings[""]
}
func (rps *ReplicaPlacements) Set(collection, t string) error {
rp, e := NewReplicaPlacementFromString(t)
if e == nil {
rps.settings[collection] = rp
}
return e
}
func (rps *ReplicaPlacements) Marshal() string {
buf, _ := json.Marshal(rps.settings)
return string(buf)
}

8
go/topology/topology.go

@ -28,14 +28,14 @@ type Topology struct {
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
ReplicaPlacements *storage.ReplicaPlacements
CollectionSettings *storage.CollectionSettings
configuration *Configuration
RaftServer raft.Server
}
func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@ -44,7 +44,7 @@ func NewTopology(id string, confFile string, rp *storage.ReplicaPlacements, seq
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.ReplicaPlacements = rp
t.CollectionSettings = cs
t.Sequence = seq
@ -129,7 +129,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.ReplicaPlacements.Get(collectionName), t.volumeSizeLimit)
return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(ttl)
}

4
go/topology/topology_replicate.go

@ -2,8 +2,8 @@ package topology
import "github.com/chrislusf/seaweedfs/go/glog"
func (t *Topology) Replicate(garbageThreshold string) int {
glog.V(0).Infoln("Start replicate on demand")
func (t *Topology) Replicate() int {
glog.V(0).Infoln("Start replicate checker on demand")
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
glog.V(0).Infoln("replicate on collection:", c.Name)

2
go/topology/volume_growth_test.go

@ -80,7 +80,7 @@ func setup(topologyLayout string) *Topology {
//need to connect all nodes first before server adding volumes
topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
storage.NewReplicaPlacements("000"),
storage.NewCollectionSettings("000", "0.3"),
sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())

3
go/weed/weed_server/master_server.go

@ -51,8 +51,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
cs := storage.NewCollectionSettings(defaultReplicaPlacement, garbageThreshold)
var e error
if ms.Topo, e = topology.NewTopology("topo", confFile, storage.NewReplicaPlacements(defaultReplicaPlacement),
if ms.Topo, e = topology.NewTopology("topo", confFile, cs,
seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}

Loading…
Cancel
Save