diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 4c8ff5700..4ae2db030 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -63,7 +63,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, - int(heartbeat.MaxVolumeCount)) + int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 95e55a497..4f0195084 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -68,8 +68,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request } if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) + if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { + err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) } else { count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) } diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..db70c9734 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -5,6 +5,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -14,16 +15,16 @@ type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int - ReserveOneVolume(r int) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + FreeSpace() int64 + ReserveOneVolume(r int64) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustMaxVolumeId(vid storage.VolumeId) - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int + GetVolumeCount() int64 + GetActiveVolumeCount() int64 + GetMaxVolumeCount() int64 GetMaxVolumeId() storage.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -40,9 +41,9 @@ type Node interface { } type NodeImpl struct { id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int + volumeCount int64 + activeVolumeCount int64 + maxVolumeCount int64 parent Node sync.RWMutex // lock children children map[NodeId]Node @@ -126,7 +127,7 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int { +func (n *NodeImpl) FreeSpace() int64 { return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { @@ -146,7 +147,7 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { @@ -171,20 +172,20 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } @@ -200,13 +201,13 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int { +func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } -func (n *NodeImpl) GetActiveVolumeCount() int { +func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } -func (n *NodeImpl) GetMaxVolumeCount() int { +func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } diff --git a/weed/topology/rack.go b/weed/topology/rack.go index f8f8ce34a..932c1a804 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index ef39a1c01..514033ca1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -105,7 +105,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 @@ -134,7 +134,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { @@ -175,7 +175,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Intn(rack.FreeSpace()) + r := rand.Int63n(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -183,7 +183,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } } for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) + r := rand.Int63n(datacenter.FreeSpace()) if server, e := datacenter.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else {