diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 49a3b090b..f61f68e08 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -70,6 +70,14 @@ var ( Help: "replica placement mismatch", }, []string{"collection", "id"}) + MasterVolumeLayout = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "master", + Name: "volume_layout_total", + Help: "Number of volumes in volume layouts", + }, []string{"collection", "replica", "type"}) + MasterLeaderChangeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -259,6 +267,7 @@ func init() { Gather.MustRegister(MasterReceivedHeartbeatCounter) Gather.MustRegister(MasterLeaderChangeCounter) Gather.MustRegister(MasterReplicaPlacementMismatch) + Gather.MustRegister(MasterVolumeLayout) Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerHandlerCounter) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index c3ba65d39..a3473c677 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -3,6 +3,7 @@ package topology import ( "encoding/json" "fmt" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "testing" "github.com/seaweedfs/seaweedfs/weed/sequence" @@ -88,19 +89,35 @@ func setup(topologyLayout string) *Topology { dcMap := dcValue.(map[string]interface{}) topo.LinkChildNode(dc) for rackKey, rackValue := range dcMap { - rack := NewRack(rackKey) + dcRack := NewRack(rackKey) rackMap := rackValue.(map[string]interface{}) - dc.LinkChildNode(rack) + dc.LinkChildNode(dcRack) for serverKey, serverValue := range rackMap { server := NewDataNode(serverKey) serverMap := serverValue.(map[string]interface{}) - rack.LinkChildNode(server) + if ip, ok := serverMap["ip"]; ok { + server.Ip = ip.(string) + } + dcRack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: needle.CurrentVersion} + Version: needle.CurrentVersion, + } + if mVal, ok := m["collection"]; ok { + vi.Collection = mVal.(string) + } + if mVal, ok := m["replication"]; ok { + rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string)) + vi.ReplicaPlacement = rp + } + if vi.ReplicaPlacement != nil { + vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType) + vl.RegisterVolume(&vi, server) + vl.setVolumeWritable(vi.Id) + } server.AddOrUpdateVolume(vi) } @@ -346,3 +363,88 @@ func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { fmt.Printf("%s : %d\n", k, v) } } + +var topologyLayout4 = ` +{ + "dc1":{ + "rack1":{ + "serverdc111":{ + "ip": "127.0.0.1", + "volumes":[ + {"id":1, "size":12312, "collection":"test", "replication":"001"}, + {"id":2, "size":12312, "collection":"test", "replication":"100"}, + {"id":4, "size":12312, "collection":"test", "replication":"100"}, + {"id":6, "size":12312, "collection":"test", "replication":"010"} + ], + "limit":100 + } + } + }, + "dc2":{ + "rack1":{ + "serverdc211":{ + "ip": "127.0.0.2", + "volumes":[ + {"id":2, "size":12312, "collection":"test", "replication":"100"}, + {"id":3, "size":12312, "collection":"test", "replication":"010"}, + {"id":5, "size":12312, "collection":"test", "replication":"001"}, + {"id":6, "size":12312, "collection":"test", "replication":"010"} + ], + "limit":100 + } + } + }, + "dc3":{ + "rack1":{ + "serverdc311":{ + "ip": "127.0.0.3", + "volumes":[ + {"id":1, "size":12312, "collection":"test", "replication":"001"}, + {"id":3, "size":12312, "collection":"test", "replication":"010"}, + {"id":4, "size":12312, "collection":"test", "replication":"100"}, + {"id":5, "size":12312, "collection":"test", "replication":"001"} + ], + "limit":100 + } + } + } +} +` + +func TestPickForWrite(t *testing.T) { + topo := setup(topologyLayout4) + volumeGrowOption := &VolumeGrowOption{ + Collection: "test", + DataCenter: "", + Rack: "", + DataNode: "", + } + for _, rpStr := range []string{"001", "010", "100"} { + rp, _ := super_block.NewReplicaPlacementFromString(rpStr) + vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType) + volumeGrowOption.ReplicaPlacement = rp + for _, dc := range []string{"", "dc1", "dc2", "dc3"} { + volumeGrowOption.DataCenter = dc + for _, r := range []string{""} { + volumeGrowOption.Rack = r + for _, dn := range []string{""} { + if dc == "" && dn != "" { + continue + } + volumeGrowOption.DataNode = dn + fileId, count, _, _, err := topo.PickForWrite(1, volumeGrowOption, vl) + if err != nil { + fmt.Println(dc, r, dn, "pick for write error :", err) + t.Fail() + } else if count == 0 { + fmt.Println(dc, r, dn, "pick for write count is zero") + t.Fail() + } else if len(fileId) == 0 { + fmt.Println(dc, r, dn, "pick for write file id is empty") + t.Fail() + } + } + } + } + } +} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 6b5d0b8da..278978292 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,6 +3,7 @@ package topology import ( "errors" "fmt" + "github.com/seaweedfs/seaweedfs/weed/stats" "math/rand" "sync" "sync/atomic" @@ -349,18 +350,21 @@ func (vl *VolumeLayout) DoneGrowRequest() { } func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool { - active, crowded := vl.GetActiveVolumeCount(option) + total, active, crowded := vl.GetActiveVolumeCount(option) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "total").Set(float64(total)) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "active").Set(float64(active)) + stats.MasterVolumeLayout.WithLabelValues(option.Collection, option.ReplicaPlacement.String(), "crowded").Set(float64(crowded)) //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high) return active <= crowded } -func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) { +func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, active, crowded int) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() - if option.DataCenter == "" { - return len(vl.writables), len(vl.crowded) + return len(vl.writables), len(vl.writables), len(vl.crowded) } + total = len(vl.writables) for _, v := range vl.writables { for _, dn := range vl.vid2location[v].list { if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {