package topology

import (
	"errors"
	"fmt"
	"math/rand/v2"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

type NodeId string

// CapacityReservation represents a temporary reservation of capacity
type CapacityReservation struct {
	reservationId string
	diskType      types.DiskType
	count         int64
	createdAt     time.Time
}

// CapacityReservations manages capacity reservations for a node
type CapacityReservations struct {
	sync.RWMutex
	reservations   map[string]*CapacityReservation
	reservedCounts map[types.DiskType]int64
}

func newCapacityReservations() *CapacityReservations {
	return &CapacityReservations{
		reservations:   make(map[string]*CapacityReservation),
		reservedCounts: make(map[types.DiskType]int64),
	}
}

func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
	cr.Lock()
	defer cr.Unlock()
	return cr.doAddReservation(diskType, count)
}

func (cr *CapacityReservations) removeReservation(reservationId string) bool {
	cr.Lock()
	defer cr.Unlock()
	if reservation, exists := cr.reservations[reservationId]; exists {
		delete(cr.reservations, reservationId)
		cr.decrementCount(reservation.diskType, reservation.count)
		return true
	}
	return false
}

func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
	cr.RLock()
	defer cr.RUnlock()
	return cr.reservedCounts[diskType]
}

// decrementCount is a helper to decrement the reserved count and clean up zero entries
func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
	cr.reservedCounts[diskType] -= count
	// Clean up zero counts to prevent map growth
	if cr.reservedCounts[diskType] <= 0 {
		delete(cr.reservedCounts, diskType)
	}
}

// doAddReservation is a helper to add a reservation, assuming the lock is already held
func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
	now := time.Now()
	reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
	cr.reservations[reservationId] = &CapacityReservation{
		reservationId: reservationId,
		diskType:      diskType,
		count:         count,
		createdAt:     now,
	}
	cr.reservedCounts[diskType] += count
	return reservationId
}

// tryReserveAtomic atomically checks available space and reserves it if possible
func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
	cr.Lock()
	defer cr.Unlock()
	// Check available space under the lock
	currentReserved := cr.reservedCounts[diskType]
	availableSpace := availableSpaceFunc() - currentReserved
	if availableSpace >= count {
		// Create and add the reservation atomically
		return cr.doAddReservation(diskType, count), true
	}
	return "", false
}
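
// tryReserveAtomic takes the available space as a callback because it must be
// computed while cr's lock is held: a plain check followed by addReservation
// would let two goroutines both observe the same free slot and both reserve
// it. A minimal usage sketch (computeFreeSlots is a hypothetical helper):
//
//	id, ok := cr.tryReserveAtomic(types.HardDriveType, 1, func() int64 {
//		return computeFreeSlots() // evaluated under the reservation lock
//	})
//	if ok {
//		defer cr.removeReservation(id)
//	}

// cleanExpiredReservations drops reservations older than expirationDuration.
// Expiration is a safety net: if a caller reserved capacity and then crashed
// before releasing it, the leaked reservation would otherwise shrink the
// node's apparent capacity forever.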
func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
	cr.Lock()
	defer cr.Unlock()
	now := time.Now()
	for id, reservation := range cr.reservations {
		if now.Sub(reservation.createdAt) > expirationDuration {
			delete(cr.reservations, id)
			cr.decrementCount(reservation.diskType, reservation.count)
			glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
		}
	}
}

type Node interface {
	Id() NodeId
	String() string
	AvailableSpaceFor(option *VolumeGrowOption) int64
	ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
	ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
	UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
	UpAdjustMaxVolumeId(vid needle.VolumeId)
	GetDiskUsages() *DiskUsages

	// Capacity reservation methods for avoiding race conditions
	TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
	ReleaseReservedCapacity(reservationId string)
	AvailableSpaceForReservation(option *VolumeGrowOption) int64

	GetMaxVolumeId() needle.VolumeId
	SetParent(Node)
	LinkChildNode(node Node)
	UnlinkChildNode(nodeId NodeId)
	CollectDeadNodeAndFullVolumes(freshThreshold int64, volumeSizeLimit uint64, growThreshold float64)
	IsDataNode() bool
	IsRack() bool
	IsDataCenter() bool
	IsLocked() bool
	Children() []Node
	Parent() Node

	GetValue() interface{} // get a reference to the topology, dc, rack, or data node
}
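
// NodeImpl is the shared implementation behind Node: DataNode, Rack,
// DataCenter and Topology each embed it, which is why GetValue() hands back
// an interface{} holding the concrete type.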
type NodeImpl struct {
	diskUsages   *DiskUsages
	id           NodeId
	parent       Node
	sync.RWMutex // lock children
	children     map[NodeId]Node
	maxVolumeId  needle.VolumeId

	// for rack, data center, topology
	nodeType string
	value    interface{}

	// capacity reservations to prevent race conditions during volume creation
	capacityReservations *CapacityReservations
}

func (n *NodeImpl) GetDiskUsages() *DiskUsages {
	return n.diskUsages
}

// PickNodesByWeight picks numberOfNodes children: the first must satisfy
// filterFirstNodeFn(), and the rest only need a free volume slot.
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
	var totalWeights int64
	var errs []string
	n.RLock()
	candidates := make([]Node, 0, len(n.children))
	candidatesWeights := make([]int64, 0, len(n.children))
	// pick nodes that have free volume slots as candidates, using the free slot count as each node's weight
	for _, node := range n.children {
		if node.AvailableSpaceFor(option) <= 0 {
			continue
		}
		totalWeights += node.AvailableSpaceFor(option)
		candidates = append(candidates, node)
		candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
	}
	n.RUnlock()
	if len(candidates) < numberOfNodes {
		glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from", len(candidates), "node candidates")
		return nil, nil, errors.New("Not enough data nodes found!")
	}

	// pick nodes randomly by weight; nodes picked earlier tend to have higher weights
	sortedCandidates := make([]Node, 0, len(candidates))
	for i := 0; i < len(candidates); i++ {
		weightsInterval := rand.Int64N(totalWeights)
		lastWeights := int64(0)
		for k, weights := range candidatesWeights {
			if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
				sortedCandidates = append(sortedCandidates, candidates[k])
				candidatesWeights[k] = 0
				totalWeights -= weights
				break
			}
			lastWeights += weights
		}
	}

	restNodes = make([]Node, 0, numberOfNodes-1)
	ret := false
	n.RLock()
	for k, node := range sortedCandidates {
		if err := filterFirstNodeFn(node); err == nil {
			firstNode = node
			if k >= numberOfNodes-1 {
				restNodes = sortedCandidates[:numberOfNodes-1]
			} else {
				restNodes = append(restNodes, sortedCandidates[:k]...)
				restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
			}
			ret = true
			break
		} else {
			errs = append(errs, string(node.Id())+":"+err.Error())
		}
	}
	n.RUnlock()
	if !ret {
		return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
	}
	return
}
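
// The selection loop in PickNodesByWeight is weighted sampling without
// replacement. For example, with candidate weights [3, 1] and totalWeights = 4,
// the first draw lands on the heavier candidate with probability 3/4; the
// chosen candidate's weight is then zeroed and subtracted from totalWeights,
// so later draws range only over the remaining candidates. Nodes with more
// free slots therefore tend to appear earlier in sortedCandidates.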

func (n *NodeImpl) IsDataNode() bool {
	return n.nodeType == "DataNode"
}

func (n *NodeImpl) IsRack() bool {
	return n.nodeType == "Rack"
}

func (n *NodeImpl) IsDataCenter() bool {
	return n.nodeType == "DataCenter"
}

// IsLocked reports whether the node's lock is currently held for writing,
// by probing with TryRLock and immediately releasing it on success.
func (n *NodeImpl) IsLocked() (isTryLock bool) {
	if isTryLock = n.TryRLock(); isTryLock {
		n.RUnlock()
	}
	return !isTryLock
}

func (n *NodeImpl) String() string {
	if n.parent != nil {
		return n.parent.String() + ":" + string(n.id)
	}
	return string(n.id)
}

func (n *NodeImpl) Id() NodeId {
	return n.id
}

func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
	return n.diskUsages.getOrCreateDisk(diskType)
}

func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
	t := n.getOrCreateDisk(option.DiskType)
	freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
	ecShardCount := atomic.LoadInt64(&t.ecShardCount)
	if ecShardCount > 0 {
		freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
	}
	return freeVolumeSlotCount
}
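
// Worked example for the erasure-coding adjustment in AvailableSpaceFor:
// assuming erasure_coding.DataShardsCount is 10, a disk carrying 25 EC shards
// gives 25/10 + 1 = 3, so three volume slots are treated as consumed by
// erasure-coded data on top of the normal volume count.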

// AvailableSpaceForReservation returns available space considering existing reservations
func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
	baseAvailable := n.AvailableSpaceFor(option)
	reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
	return baseAvailable - reservedCount
}

// TryReserveCapacity attempts to atomically reserve capacity for volume creation
func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
	const reservationTimeout = 5 * time.Minute // TODO: make this configurable

	// Clean up any expired reservations first
	n.capacityReservations.cleanExpiredReservations(reservationTimeout)

	// Atomically check and reserve space
	option := &VolumeGrowOption{DiskType: diskType}
	reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
		return n.AvailableSpaceFor(option)
	})
	if success {
		glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
	}
	return reservationId, success
}

// ReleaseReservedCapacity releases a previously reserved capacity
func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
	if n.capacityReservations.removeReservation(reservationId) {
		glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
	} else {
		glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
	}
}
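
// A minimal usage sketch for the pair above (dn is a hypothetical *DataNode):
// reserve before creating a volume, then release once the attempt finishes,
// whether or not it succeeded; a successfully created volume is accounted for
// through the normal disk-usage counters, not the reservation.
//
//	if id, ok := dn.TryReserveCapacity(types.HardDriveType, 1); ok {
//		defer dn.ReleaseReservedCapacity(id)
//		// ... create the volume while the slot is held ...
//	}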

func (n *NodeImpl) SetParent(node Node) {
	n.parent = node
}

func (n *NodeImpl) Children() (ret []Node) {
	n.RLock()
	defer n.RUnlock()
	for _, c := range n.children {
		ret = append(ret, c)
	}
	return ret
}

func (n *NodeImpl) Parent() Node {
	return n.parent
}

func (n *NodeImpl) GetValue() interface{} {
	return n.value
}

func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
	return n.reserveOneVolumeInternal(r, option, false)
}

// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
	return n.reserveOneVolumeInternal(r, option, true)
}

func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
	n.RLock()
	defer n.RUnlock()
	for _, node := range n.children {
		var freeSpace int64
		if useReservations {
			freeSpace = node.AvailableSpaceForReservation(option)
		} else {
			freeSpace = node.AvailableSpaceFor(option)
		}
		if freeSpace <= 0 {
			continue
		}
		if r >= freeSpace {
			r -= freeSpace
		} else {
			var hasSpace bool
			if useReservations {
				hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
			} else {
				hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
			}
			if hasSpace {
				dn := node.(*DataNode)
				if dn.IsTerminating {
					continue
				}
				return dn, nil
			}
			if useReservations {
				assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
			} else {
				assignedNode, err = node.ReserveOneVolume(r, option)
			}
			if err == nil {
				return
			}
		}
	}
	return nil, errors.New("No free volume slot found!")
}
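
// reserveOneVolumeInternal treats r as a random offset into the concatenated
// free slots of all children. For example, with three children exposing 2, 3
// and 5 free slots and r = 4: the first child is skipped (r drops to 2), the
// second child covers offsets 0..2, so the walk stops there and either returns
// that child (if it is a usable *DataNode) or recurses into it with r = 2.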

func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { // the delta can be negative
	existingDisk := n.getOrCreateDisk(diskType)
	existingDisk.addDiskUsageCounts(diskUsage)
	if n.parent != nil {
		n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage)
	}
}

func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) {
	if n.maxVolumeId < vid {
		n.maxVolumeId = vid
		if n.parent != nil {
			n.parent.UpAdjustMaxVolumeId(vid)
		}
	}
}

func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
	return n.maxVolumeId
}

func (n *NodeImpl) LinkChildNode(node Node) {
	n.Lock()
	defer n.Unlock()
	n.doLinkChildNode(node)
}

func (n *NodeImpl) doLinkChildNode(node Node) {
	if n.children[node.Id()] == nil {
		n.children[node.Id()] = node
		for dt, du := range node.GetDiskUsages().usages {
			n.UpAdjustDiskUsageDelta(dt, du)
		}
		n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
		node.SetParent(n)
		glog.V(0).Infoln(n, "adds child", node.Id())
	}
}

func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
	n.Lock()
	defer n.Unlock()
	node := n.children[nodeId]
	if node != nil {
		node.SetParent(nil)
		delete(n.children, node.Id())
		for dt, du := range node.GetDiskUsages().negative().usages {
			n.UpAdjustDiskUsageDelta(dt, du)
		}
		glog.V(0).Infoln(n, "removes", node.Id())
	}
}
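
// Linking a child adds its per-disk usage counts to every ancestor via
// UpAdjustDiskUsageDelta; unlinking feeds the negated counts through the same
// path, so aggregate totals drop by exactly what the subtree contributed.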

func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThresholdUnixTime int64, volumeSizeLimit uint64, growThreshold float64) {
	if n.IsRack() {
		for _, c := range n.Children() {
			dn := c.(*DataNode) // n itself cannot be cast to *DataNode; its children can
			for _, v := range dn.GetVolumes() {
				topo := n.GetTopology()
				diskType := types.ToDiskType(v.DiskType)
				vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
				if v.Size >= volumeSizeLimit {
					vl.accessLock.RLock()
					vacuumTime, ok := vl.vacuumedVolumes[v.Id]
					vl.accessLock.RUnlock()
					// If a volume was vacuumed within the past 20 seconds, skip the full-capacity check.
					// After 20s (the gRPC timeout), all volume server heartbeats should have reached the
					// master, so the reported size is the post-vacuum size rather than the stale one.
					if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
						topo.chanFullVolumes <- v
					}
				} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
					topo.chanCrowdedVolumes <- v
				}
				copyCount := v.ReplicaPlacement.GetCopyCount()
				if copyCount > 1 {
					if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
						stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
					} else {
						stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
					}
				}
			}
		}
	} else {
		for _, c := range n.Children() {
			c.CollectDeadNodeAndFullVolumes(freshThresholdUnixTime, volumeSizeLimit, growThreshold)
		}
	}
}
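
// GetTopology walks the parent chain up to the root of the tree, which is
// always the *Topology itself, and unwraps it from the root's stored value.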
func (n *NodeImpl) GetTopology() *Topology {
	var p Node
	p = n
	for p.Parent() != nil {
		p = p.Parent()
	}
	return p.GetValue().(*Topology)
}