You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
311 lines
9.5 KiB
311 lines
9.5 KiB
package maintenance
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// PendingOperationType names the kind of work tracked by a PendingOperation.
// Its underlying string is the value that is serialized for the operation.
type PendingOperationType string

// Recognized PendingOperationType values.
const (
	// OpTypeVolumeMove is the "volume_move" operation kind.
	OpTypeVolumeMove PendingOperationType = "volume_move"
	// OpTypeVolumeBalance is the "volume_balance" operation kind.
	OpTypeVolumeBalance PendingOperationType = "volume_balance"
	// OpTypeErasureCoding is the "erasure_coding" operation kind.
	OpTypeErasureCoding PendingOperationType = "erasure_coding"
	// OpTypeVacuum is the "vacuum" operation kind.
	OpTypeVacuum PendingOperationType = "vacuum"
	// OpTypeReplication is the "replication" operation kind.
	OpTypeReplication PendingOperationType = "replication"
)
|
|
|
|
// PendingOperation represents a pending volume/shard operation
|
|
type PendingOperation struct {
|
|
VolumeID uint32 `json:"volume_id"`
|
|
OperationType PendingOperationType `json:"operation_type"`
|
|
SourceNode string `json:"source_node"`
|
|
DestNode string `json:"dest_node,omitempty"` // Empty for non-movement operations
|
|
TaskID string `json:"task_id"`
|
|
StartTime time.Time `json:"start_time"`
|
|
EstimatedSize uint64 `json:"estimated_size"` // Bytes
|
|
Collection string `json:"collection"`
|
|
Status string `json:"status"` // "assigned", "in_progress", "completing"
|
|
}
|
|
|
|
// PendingOperations tracks all pending volume/shard operations
|
|
type PendingOperations struct {
|
|
// Operations by volume ID for conflict detection
|
|
byVolumeID map[uint32]*PendingOperation
|
|
|
|
// Operations by task ID for updates
|
|
byTaskID map[string]*PendingOperation
|
|
|
|
// Operations by node for capacity calculations
|
|
bySourceNode map[string][]*PendingOperation
|
|
byDestNode map[string][]*PendingOperation
|
|
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewPendingOperations creates a new pending operations tracker
|
|
func NewPendingOperations() *PendingOperations {
|
|
return &PendingOperations{
|
|
byVolumeID: make(map[uint32]*PendingOperation),
|
|
byTaskID: make(map[string]*PendingOperation),
|
|
bySourceNode: make(map[string][]*PendingOperation),
|
|
byDestNode: make(map[string][]*PendingOperation),
|
|
}
|
|
}
|
|
|
|
// AddOperation adds a pending operation
|
|
func (po *PendingOperations) AddOperation(op *PendingOperation) {
|
|
po.mutex.Lock()
|
|
defer po.mutex.Unlock()
|
|
|
|
// Check for existing operation on this volume
|
|
if existing, exists := po.byVolumeID[op.VolumeID]; exists {
|
|
glog.V(1).Infof("Replacing existing pending operation on volume %d: %s -> %s",
|
|
op.VolumeID, existing.TaskID, op.TaskID)
|
|
po.removeOperationUnlocked(existing)
|
|
}
|
|
|
|
// Add new operation
|
|
po.byVolumeID[op.VolumeID] = op
|
|
po.byTaskID[op.TaskID] = op
|
|
|
|
// Add to node indexes
|
|
po.bySourceNode[op.SourceNode] = append(po.bySourceNode[op.SourceNode], op)
|
|
if op.DestNode != "" {
|
|
po.byDestNode[op.DestNode] = append(po.byDestNode[op.DestNode], op)
|
|
}
|
|
|
|
glog.V(2).Infof("Added pending operation: volume %d, type %s, task %s, %s -> %s",
|
|
op.VolumeID, op.OperationType, op.TaskID, op.SourceNode, op.DestNode)
|
|
}
|
|
|
|
// RemoveOperation removes a completed operation
|
|
func (po *PendingOperations) RemoveOperation(taskID string) {
|
|
po.mutex.Lock()
|
|
defer po.mutex.Unlock()
|
|
|
|
if op, exists := po.byTaskID[taskID]; exists {
|
|
po.removeOperationUnlocked(op)
|
|
glog.V(2).Infof("Removed completed operation: volume %d, task %s", op.VolumeID, taskID)
|
|
}
|
|
}
|
|
|
|
// removeOperationUnlocked removes an operation (must hold lock)
|
|
func (po *PendingOperations) removeOperationUnlocked(op *PendingOperation) {
|
|
delete(po.byVolumeID, op.VolumeID)
|
|
delete(po.byTaskID, op.TaskID)
|
|
|
|
// Remove from source node list
|
|
if ops, exists := po.bySourceNode[op.SourceNode]; exists {
|
|
for i, other := range ops {
|
|
if other.TaskID == op.TaskID {
|
|
po.bySourceNode[op.SourceNode] = append(ops[:i], ops[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove from dest node list
|
|
if op.DestNode != "" {
|
|
if ops, exists := po.byDestNode[op.DestNode]; exists {
|
|
for i, other := range ops {
|
|
if other.TaskID == op.TaskID {
|
|
po.byDestNode[op.DestNode] = append(ops[:i], ops[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// HasPendingOperationOnVolume checks if a volume has a pending operation
|
|
func (po *PendingOperations) HasPendingOperationOnVolume(volumeID uint32) bool {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
_, exists := po.byVolumeID[volumeID]
|
|
return exists
|
|
}
|
|
|
|
// GetPendingOperationOnVolume returns the pending operation on a volume
|
|
func (po *PendingOperations) GetPendingOperationOnVolume(volumeID uint32) *PendingOperation {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
return po.byVolumeID[volumeID]
|
|
}
|
|
|
|
// WouldConflictWithPending checks if a new operation would conflict with pending ones
|
|
func (po *PendingOperations) WouldConflictWithPending(volumeID uint32, opType PendingOperationType) bool {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
if existing, exists := po.byVolumeID[volumeID]; exists {
|
|
// Volume already has a pending operation
|
|
glog.V(3).Infof("Volume %d conflict: already has %s operation (task %s)",
|
|
volumeID, existing.OperationType, existing.TaskID)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetPendingCapacityImpactForNode calculates pending capacity changes for a node
|
|
func (po *PendingOperations) GetPendingCapacityImpactForNode(nodeID string) (incoming uint64, outgoing uint64) {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
// Calculate outgoing capacity (volumes leaving this node)
|
|
if ops, exists := po.bySourceNode[nodeID]; exists {
|
|
for _, op := range ops {
|
|
// Only count movement operations
|
|
if op.DestNode != "" {
|
|
outgoing += op.EstimatedSize
|
|
}
|
|
}
|
|
}
|
|
|
|
// Calculate incoming capacity (volumes coming to this node)
|
|
if ops, exists := po.byDestNode[nodeID]; exists {
|
|
for _, op := range ops {
|
|
incoming += op.EstimatedSize
|
|
}
|
|
}
|
|
|
|
return incoming, outgoing
|
|
}
|
|
|
|
// FilterVolumeMetricsExcludingPending filters out volumes with pending operations
|
|
func (po *PendingOperations) FilterVolumeMetricsExcludingPending(metrics []*types.VolumeHealthMetrics) []*types.VolumeHealthMetrics {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
var filtered []*types.VolumeHealthMetrics
|
|
excludedCount := 0
|
|
|
|
for _, metric := range metrics {
|
|
if _, hasPending := po.byVolumeID[metric.VolumeID]; !hasPending {
|
|
filtered = append(filtered, metric)
|
|
} else {
|
|
excludedCount++
|
|
glog.V(3).Infof("Excluding volume %d from scan due to pending operation", metric.VolumeID)
|
|
}
|
|
}
|
|
|
|
if excludedCount > 0 {
|
|
glog.V(1).Infof("Filtered out %d volumes with pending operations from %d total volumes",
|
|
excludedCount, len(metrics))
|
|
}
|
|
|
|
return filtered
|
|
}
|
|
|
|
// GetNodeCapacityProjection calculates projected capacity for a node
|
|
func (po *PendingOperations) GetNodeCapacityProjection(nodeID string, currentUsed uint64, totalCapacity uint64) NodeCapacityProjection {
|
|
incoming, outgoing := po.GetPendingCapacityImpactForNode(nodeID)
|
|
|
|
projectedUsed := currentUsed + incoming - outgoing
|
|
projectedFree := totalCapacity - projectedUsed
|
|
|
|
return NodeCapacityProjection{
|
|
NodeID: nodeID,
|
|
CurrentUsed: currentUsed,
|
|
TotalCapacity: totalCapacity,
|
|
PendingIncoming: incoming,
|
|
PendingOutgoing: outgoing,
|
|
ProjectedUsed: projectedUsed,
|
|
ProjectedFree: projectedFree,
|
|
}
|
|
}
|
|
|
|
// GetAllPendingOperations returns all pending operations
|
|
func (po *PendingOperations) GetAllPendingOperations() []*PendingOperation {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
var operations []*PendingOperation
|
|
for _, op := range po.byVolumeID {
|
|
operations = append(operations, op)
|
|
}
|
|
|
|
return operations
|
|
}
|
|
|
|
// UpdateOperationStatus updates the status of a pending operation
|
|
func (po *PendingOperations) UpdateOperationStatus(taskID string, status string) {
|
|
po.mutex.Lock()
|
|
defer po.mutex.Unlock()
|
|
|
|
if op, exists := po.byTaskID[taskID]; exists {
|
|
op.Status = status
|
|
glog.V(3).Infof("Updated operation status: task %s, volume %d -> %s", taskID, op.VolumeID, status)
|
|
}
|
|
}
|
|
|
|
// CleanupStaleOperations removes operations that have been running too long
|
|
func (po *PendingOperations) CleanupStaleOperations(maxAge time.Duration) int {
|
|
po.mutex.Lock()
|
|
defer po.mutex.Unlock()
|
|
|
|
cutoff := time.Now().Add(-maxAge)
|
|
var staleOps []*PendingOperation
|
|
|
|
for _, op := range po.byVolumeID {
|
|
if op.StartTime.Before(cutoff) {
|
|
staleOps = append(staleOps, op)
|
|
}
|
|
}
|
|
|
|
for _, op := range staleOps {
|
|
po.removeOperationUnlocked(op)
|
|
glog.Warningf("Removed stale pending operation: volume %d, task %s, age %v",
|
|
op.VolumeID, op.TaskID, time.Since(op.StartTime))
|
|
}
|
|
|
|
return len(staleOps)
|
|
}
|
|
|
|
// NodeCapacityProjection is a point-in-time view of a node's capacity: what
// it holds now, what pending work will move in or out, and the resulting
// projected usage and free space.
type NodeCapacityProjection struct {
	// NodeID identifies the node being projected.
	NodeID string `json:"node_id"`
	// CurrentUsed is the usage reported for the node right now.
	CurrentUsed uint64 `json:"current_used"`
	// TotalCapacity is the node's total capacity.
	TotalCapacity uint64 `json:"total_capacity"`
	// PendingIncoming is the estimated size of operations targeting the node.
	PendingIncoming uint64 `json:"pending_incoming"`
	// PendingOutgoing is the estimated size of movements leaving the node.
	PendingOutgoing uint64 `json:"pending_outgoing"`
	// ProjectedUsed is the expected usage once pending work completes.
	ProjectedUsed uint64 `json:"projected_used"`
	// ProjectedFree is TotalCapacity minus ProjectedUsed.
	ProjectedFree uint64 `json:"projected_free"`
}
|
|
|
|
// GetStats returns statistics about pending operations
|
|
func (po *PendingOperations) GetStats() PendingOperationsStats {
|
|
po.mutex.RLock()
|
|
defer po.mutex.RUnlock()
|
|
|
|
stats := PendingOperationsStats{
|
|
TotalOperations: len(po.byVolumeID),
|
|
ByType: make(map[PendingOperationType]int),
|
|
ByStatus: make(map[string]int),
|
|
}
|
|
|
|
var totalSize uint64
|
|
for _, op := range po.byVolumeID {
|
|
stats.ByType[op.OperationType]++
|
|
stats.ByStatus[op.Status]++
|
|
totalSize += op.EstimatedSize
|
|
}
|
|
|
|
stats.TotalEstimatedSize = totalSize
|
|
return stats
|
|
}
|
|
|
|
// PendingOperationsStats provides statistics about pending operations
|
|
type PendingOperationsStats struct {
|
|
TotalOperations int `json:"total_operations"`
|
|
ByType map[PendingOperationType]int `json:"by_type"`
|
|
ByStatus map[string]int `json:"by_status"`
|
|
TotalEstimatedSize uint64 `json:"total_estimated_size"`
|
|
}
|