package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// EcVolumeGenerationKey is the unique key for an EC volume at a specific generation.
type EcVolumeGenerationKey struct {
	VolumeId   needle.VolumeId
	Generation uint32
}

func (k EcVolumeGenerationKey) String() string {
	return fmt.Sprintf("v%d-g%d", k.VolumeId, k.Generation)
}

// EcShardLocations tracks, for one EC volume generation, which data nodes hold each shard.
type EcShardLocations struct {
	Collection string
	Generation uint32 // generation of this set of shard locations
	Locations  [erasure_coding.TotalShardsCount][]*DataNode
}
// SyncDataNodeEcShards replaces the data node's known EC shards with the full report
// from a heartbeat and registers/unregisters the resulting deltas in the topology.
func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
	// convert the heartbeat messages into in-memory erasure_coding.EcVolumeInfo structs
	var shards []*erasure_coding.EcVolumeInfo
	for _, shardInfo := range shardInfos {
		// Create EcVolumeInfo directly with optimized format
		ecVolumeInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:    needle.VolumeId(shardInfo.Id),
			Collection:  shardInfo.Collection,
			ShardBits:   erasure_coding.ShardBits(shardInfo.EcIndexBits),
			DiskType:    shardInfo.DiskType,
			DiskId:      shardInfo.DiskId,
			ExpireAtSec: shardInfo.ExpireAtSec,
			ShardSizes:  shardInfo.ShardSizes,
			Generation:  shardInfo.Generation, // extract generation from heartbeat
		}
		shards = append(shards, ecVolumeInfo)
	}

	// find out the delta volumes
	newShards, deletedShards = dn.UpdateEcShards(shards)
	for _, v := range newShards {
		t.RegisterEcShards(v, dn)
	}
	for _, v := range deletedShards {
		t.UnRegisterEcShards(v, dn)
	}
	return
}
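
// A minimal usage sketch (not part of the original file): how a master-side heartbeat
// handler might feed a full EC shard report from one data node into the topology. The
// function name syncEcShardsFromHeartbeatSketch and this wiring are illustrative only;
// the Heartbeat.EcShards field is assumed from the master heartbeat protocol.
func (t *Topology) syncEcShardsFromHeartbeatSketch(heartbeat *master_pb.Heartbeat, dn *DataNode) {
	// full sync: anything missing from the report is unregistered, anything new is registered
	newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
	glog.V(1).Infof("ec shard sync for %s: %d new, %d deleted", dn.Id(), len(newShards), len(deletedShards))
}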
// IncrementalSyncDataNodeEcShards applies an incremental heartbeat delta of new and
// deleted EC shards to the data node and to the topology's shard maps.
func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
	// convert the heartbeat messages into in-memory erasure_coding.EcVolumeInfo structs
	var newShards, deletedShards []*erasure_coding.EcVolumeInfo
	for _, shardInfo := range newEcShards {
		// Create EcVolumeInfo directly with optimized format
		ecVolumeInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:    needle.VolumeId(shardInfo.Id),
			Collection:  shardInfo.Collection,
			ShardBits:   erasure_coding.ShardBits(shardInfo.EcIndexBits),
			DiskType:    shardInfo.DiskType,
			DiskId:      shardInfo.DiskId,
			ExpireAtSec: shardInfo.ExpireAtSec,
			ShardSizes:  shardInfo.ShardSizes,
			Generation:  shardInfo.Generation, // extract generation from incremental heartbeat
		}
		newShards = append(newShards, ecVolumeInfo)
	}
	for _, shardInfo := range deletedEcShards {
		// Create EcVolumeInfo directly with optimized format
		ecVolumeInfo := &erasure_coding.EcVolumeInfo{
			VolumeId:    needle.VolumeId(shardInfo.Id),
			Collection:  shardInfo.Collection,
			ShardBits:   erasure_coding.ShardBits(shardInfo.EcIndexBits),
			DiskType:    shardInfo.DiskType,
			DiskId:      shardInfo.DiskId,
			ExpireAtSec: shardInfo.ExpireAtSec,
			ShardSizes:  shardInfo.ShardSizes,
			Generation:  shardInfo.Generation, // extract generation from incremental heartbeat
		}
		deletedShards = append(deletedShards, ecVolumeInfo)
	}
	dn.DeltaUpdateEcShards(newShards, deletedShards)

	for _, v := range newShards {
		t.RegisterEcShards(v, dn)
	}
	for _, v := range deletedShards {
		t.UnRegisterEcShards(v, dn)
	}
}
// NewEcShardLocations creates an empty shard location set for one collection and generation.
func NewEcShardLocations(collection string, generation uint32) *EcShardLocations {
	return &EcShardLocations{
		Collection: collection,
		Generation: generation,
	}
}

// AddShard records that the given data node holds the given shard; it returns false
// if the node was already registered for that shard.
func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
	dataNodes := loc.Locations[shardId]
	for _, n := range dataNodes {
		if n.Id() == dn.Id() {
			return false
		}
	}
	loc.Locations[shardId] = append(dataNodes, dn)
	return true
}
// DeleteShard removes the given data node from the given shard's location list; it
// returns false if the node was not registered for that shard.
func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
	dataNodes := loc.Locations[shardId]
	foundIndex := -1
	for index, n := range dataNodes {
		if n.Id() == dn.Id() {
			foundIndex = index
		}
	}
	if foundIndex < 0 {
		return false
	}
	loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
	return true
}
// RegisterEcShards adds a data node's shards for one EC volume generation to the
// topology and promotes that generation to active if it is the newest one seen.
func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
	t.ecShardMapLock.Lock()
	defer t.ecShardMapLock.Unlock()

	key := EcVolumeGenerationKey{
		VolumeId:   ecShardInfos.VolumeId,
		Generation: ecShardInfos.Generation,
	}
	locations, found := t.ecShardMap[key]
	if !found {
		locations = NewEcShardLocations(ecShardInfos.Collection, ecShardInfos.Generation)
		t.ecShardMap[key] = locations
	}
	for _, shardId := range ecShardInfos.ShardIds() {
		locations.AddShard(shardId, dn)
	}

	// Update active generation if this is newer or first time seeing this volume
	t.ecActiveGenerationMapLock.Lock()
	currentActive, exists := t.ecActiveGenerationMap[ecShardInfos.VolumeId]
	if !exists || ecShardInfos.Generation >= currentActive {
		t.ecActiveGenerationMap[ecShardInfos.VolumeId] = ecShardInfos.Generation
		glog.V(2).Infof("Updated active generation for EC volume %d to %d", ecShardInfos.VolumeId, ecShardInfos.Generation)
	}
	t.ecActiveGenerationMapLock.Unlock()
}
// UnRegisterEcShards removes a data node's shards for one EC volume generation; if the
// generation becomes empty it is dropped and the active generation is re-derived.
func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
	glog.Infof("removing ec shard info volume %d generation %d shards %v", ecShardInfos.VolumeId, ecShardInfos.Generation, ecShardInfos.ShardIds())
	t.ecShardMapLock.Lock()
	defer t.ecShardMapLock.Unlock()

	key := EcVolumeGenerationKey{
		VolumeId:   ecShardInfos.VolumeId,
		Generation: ecShardInfos.Generation,
	}
	locations, found := t.ecShardMap[key]
	if !found {
		return
	}
	for _, shardId := range ecShardInfos.ShardIds() {
		locations.DeleteShard(shardId, dn)
	}

	// Check if this generation is now empty and clean up if needed
	isEmpty := true
	for _, shardLocations := range locations.Locations {
		if len(shardLocations) > 0 {
			isEmpty = false
			break
		}
	}

	if isEmpty {
		// Remove empty generation from map
		delete(t.ecShardMap, key)
		glog.V(2).Infof("Removed empty EC volume generation %d:%d", ecShardInfos.VolumeId, ecShardInfos.Generation)

		// Check if this was the active generation and update if needed
		t.ecActiveGenerationMapLock.Lock()
		if activeGen, exists := t.ecActiveGenerationMap[ecShardInfos.VolumeId]; exists && activeGen == ecShardInfos.Generation {
			// Find the highest remaining generation for this volume
			maxGeneration := uint32(0)
			hasRemaining := false
			for otherKey := range t.ecShardMap {
				if otherKey.VolumeId == ecShardInfos.VolumeId && otherKey.Generation > maxGeneration {
					maxGeneration = otherKey.Generation
					hasRemaining = true
				}
			}
			if hasRemaining {
				t.ecActiveGenerationMap[ecShardInfos.VolumeId] = maxGeneration
				glog.V(1).Infof("Updated active generation for EC volume %d to %d after cleanup", ecShardInfos.VolumeId, maxGeneration)
			} else {
				delete(t.ecActiveGenerationMap, ecShardInfos.VolumeId)
				glog.V(1).Infof("Removed active generation tracking for EC volume %d (no generations remain)", ecShardInfos.VolumeId)
			}
		}
		t.ecActiveGenerationMapLock.Unlock()
	}
}
// LookupEcShards returns the shard locations for one EC volume at a specific generation.
func (t *Topology) LookupEcShards(vid needle.VolumeId, generation uint32) (locations *EcShardLocations, found bool) {
	t.ecShardMapLock.RLock()
	defer t.ecShardMapLock.RUnlock()

	key := EcVolumeGenerationKey{
		VolumeId:   vid,
		Generation: generation,
	}
	locations, found = t.ecShardMap[key]
	return
}
// ListEcServersByCollection returns the addresses of all data nodes that hold EC shards
// for the given collection.
func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
	t.ecShardMapLock.RLock()
	defer t.ecShardMapLock.RUnlock()

	dataNodeMap := make(map[pb.ServerAddress]bool)
	for _, ecVolumeLocation := range t.ecShardMap {
		if ecVolumeLocation.Collection == collection {
			for _, locations := range ecVolumeLocation.Locations {
				for _, loc := range locations {
					dataNodeMap[loc.ServerAddress()] = true
				}
			}
		}
	}
	for k := range dataNodeMap {
		dataNodes = append(dataNodes, k)
	}
	return
}
// DeleteEcCollection removes all EC shard locations and active-generation tracking for
// the given collection.
func (t *Topology) DeleteEcCollection(collection string) {
	t.ecShardMapLock.Lock()
	defer t.ecShardMapLock.Unlock()

	var keysToDelete []EcVolumeGenerationKey
	var volumeIdsToDelete []needle.VolumeId
	for key, ecVolumeLocation := range t.ecShardMap {
		if ecVolumeLocation.Collection == collection {
			keysToDelete = append(keysToDelete, key)
			volumeIdsToDelete = append(volumeIdsToDelete, key.VolumeId)
		}
	}
	for _, key := range keysToDelete {
		delete(t.ecShardMap, key)
	}

	// Also clean up active generation tracking
	t.ecActiveGenerationMapLock.Lock()
	for _, vid := range volumeIdsToDelete {
		delete(t.ecActiveGenerationMap, vid)
	}
	t.ecActiveGenerationMapLock.Unlock()
}
// GetEcActiveGeneration returns the current active generation for an EC volume
func (t *Topology) GetEcActiveGeneration(vid needle.VolumeId) (uint32, bool) {
	t.ecActiveGenerationMapLock.RLock()
	defer t.ecActiveGenerationMapLock.RUnlock()

	generation, found := t.ecActiveGenerationMap[vid]
	return generation, found
}

// SetEcActiveGeneration sets the active generation for an EC volume
func (t *Topology) SetEcActiveGeneration(vid needle.VolumeId, generation uint32) {
	t.ecActiveGenerationMapLock.Lock()
	defer t.ecActiveGenerationMapLock.Unlock()

	t.ecActiveGenerationMap[vid] = generation
	glog.V(1).Infof("Set active generation for EC volume %d to %d", vid, generation)
}
// ListEcVolumesWithActiveGeneration returns all EC volumes and their active generations
func (t *Topology) ListEcVolumesWithActiveGeneration() map[needle.VolumeId]uint32 {
	t.ecActiveGenerationMapLock.RLock()
	defer t.ecActiveGenerationMapLock.RUnlock()

	result := make(map[needle.VolumeId]uint32)
	for vid, generation := range t.ecActiveGenerationMap {
		result[vid] = generation
	}
	return result
}
// LookupEcShardsWithFallback looks up EC shards for a volume with intelligent fallback.
// It provides mixed-version cluster compatibility by falling back gracefully:
// if no specific generation is requested (generation == 0), it uses the active generation;
// if the requested/active generation is not found, it falls back to generation 0.
func (t *Topology) LookupEcShardsWithFallback(vid needle.VolumeId, requestedGeneration uint32) (locations *EcShardLocations, actualGeneration uint32, found bool) {
	// Determine the target generation
	targetGeneration := requestedGeneration
	if requestedGeneration == 0 {
		// Use the active generation if available (new behavior)
		if activeGen, exists := t.GetEcActiveGeneration(vid); exists {
			targetGeneration = activeGen
			glog.V(4).Infof("LookupEcShardsWithFallback: using active generation %d for volume %d", activeGen, vid)
		}
	}

	// Try the target generation first
	if locations, found = t.LookupEcShards(vid, targetGeneration); found {
		if targetGeneration != requestedGeneration {
			glog.V(3).Infof("LookupEcShardsWithFallback: found volume %d generation %d (requested %d)", vid, targetGeneration, requestedGeneration)
		}
		return locations, targetGeneration, true
	}

	// If a specific generation was requested and not found, do not fall back for strict clients
	if requestedGeneration != 0 {
		glog.V(2).Infof("LookupEcShardsWithFallback: volume %d generation %d not found, no fallback for specific request", vid, requestedGeneration)
		return nil, 0, false
	}

	// Mixed-version compatibility: fall back to generation 0 if the target generation wasn't found.
	// This helps during rolling upgrades when some shards might not have generation info yet.
	if targetGeneration != 0 {
		if locations, found = t.LookupEcShards(vid, 0); found {
			glog.V(2).Infof("LookupEcShardsWithFallback: falling back to generation 0 for volume %d (target generation %d not found)", vid, targetGeneration)
			return locations, 0, true
		}
	}

	glog.V(2).Infof("LookupEcShardsWithFallback: volume %d not found in any generation", vid)
	return nil, 0, false
}
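
// A minimal usage sketch (not part of the original file): how a lookup path might resolve
// the distinct shard servers for a read request that does not pin a generation, using the
// fallback above. The method name lookupEcShardServersSketch is illustrative only.
func (t *Topology) lookupEcShardServersSketch(vid needle.VolumeId) ([]pb.ServerAddress, uint32, bool) {
	locations, generation, found := t.LookupEcShardsWithFallback(vid, 0)
	if !found {
		return nil, 0, false
	}
	// collect each server once, regardless of how many shards it holds
	seen := make(map[pb.ServerAddress]bool)
	var servers []pb.ServerAddress
	for _, dataNodes := range locations.Locations {
		for _, dn := range dataNodes {
			addr := dn.ServerAddress()
			if !seen[addr] {
				seen[addr] = true
				servers = append(servers, addr)
			}
		}
	}
	return servers, generation, true
}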
// UpdateEcGenerationMetrics updates prometheus metrics with current EC volume generation information
func (t *Topology) UpdateEcGenerationMetrics() {
	t.ecShardMapLock.RLock()
	defer t.ecShardMapLock.RUnlock()
	t.ecActiveGenerationMapLock.RLock()
	defer t.ecActiveGenerationMapLock.RUnlock()

	// Count volumes and shards by collection, generation, and active status
	volumeCountsByCollection := make(map[string]map[uint32]map[bool]int)
	shardCountsByCollection := make(map[string]map[uint32]map[bool]int)

	// Initialize counting maps
	for key, ecShardLocs := range t.ecShardMap {
		collection := ecShardLocs.Collection
		generation := key.Generation

		if volumeCountsByCollection[collection] == nil {
			volumeCountsByCollection[collection] = make(map[uint32]map[bool]int)
		}
		if volumeCountsByCollection[collection][generation] == nil {
			volumeCountsByCollection[collection][generation] = make(map[bool]int)
		}
		if shardCountsByCollection[collection] == nil {
			shardCountsByCollection[collection] = make(map[uint32]map[bool]int)
		}
		if shardCountsByCollection[collection][generation] == nil {
			shardCountsByCollection[collection][generation] = make(map[bool]int)
		}

		// Check if this generation is active for this volume
		activeGeneration, hasActiveGen := t.ecActiveGenerationMap[key.VolumeId]
		isActive := hasActiveGen && activeGeneration == generation

		// Count this volume
		volumeCountsByCollection[collection][generation][isActive]++
		// Count the shards of this volume that have at least one registered location
		// (len(ecShardLocs.Locations) is the fixed array size, not the number of shards present)
		shardCount := 0
		for _, shardLocations := range ecShardLocs.Locations {
			if len(shardLocations) > 0 {
				shardCount++
			}
		}
		shardCountsByCollection[collection][generation][isActive] += shardCount
	}
	// Update volume metrics
	for collection, generationMap := range volumeCountsByCollection {
		for generation, activeMap := range generationMap {
			generationLabel := fmt.Sprintf("%d", generation)
			for isActive, count := range activeMap {
				activeLabel := fmt.Sprintf("%t", isActive)
				stats.MasterEcVolumeGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count))
			}
		}
	}

	// Update shard metrics
	for collection, generationMap := range shardCountsByCollection {
		for generation, activeMap := range generationMap {
			generationLabel := fmt.Sprintf("%d", generation)
			for isActive, count := range activeMap {
				activeLabel := fmt.Sprintf("%t", isActive)
				stats.MasterEcShardGenerationGauge.WithLabelValues(collection, generationLabel, activeLabel).Set(float64(count))
			}
		}
	}
}
// ValidateEcGenerationReadiness checks if an EC generation has sufficient shards for activation.
// Returns true if the generation has at least erasure_coding.DataShardsCount shards available.
func (t *Topology) ValidateEcGenerationReadiness(vid needle.VolumeId, generation uint32) (ready bool, availableShards int, err error) {
	t.ecShardMapLock.RLock()
	defer t.ecShardMapLock.RUnlock()

	key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation}
	ecLocations, found := t.ecShardMap[key]
	if !found {
		return false, 0, fmt.Errorf("generation %d not found for EC volume %d", generation, vid)
	}

	// Count available shards
	availableShards = 0
	for _, locations := range ecLocations.Locations {
		if len(locations) > 0 {
			availableShards++
		}
	}

	// Need at least DataShardsCount shards to reconstruct data
	ready = availableShards >= erasure_coding.DataShardsCount
	return ready, availableShards, nil
}
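
// A minimal activation sketch (not part of the original file): how a caller might gate
// promotion of a freshly encoded generation on shard availability before marking it
// active. The method name activateEcGenerationSketch is illustrative only.
func (t *Topology) activateEcGenerationSketch(vid needle.VolumeId, generation uint32) error {
	ready, availableShards, err := t.ValidateEcGenerationReadiness(vid, generation)
	if err != nil {
		return err
	}
	if !ready {
		return fmt.Errorf("EC volume %d generation %d has only %d of %d required data shards",
			vid, generation, availableShards, erasure_coding.DataShardsCount)
	}
	// enough shards are registered; make this generation the one served to readers
	t.SetEcActiveGeneration(vid, generation)
	return nil
}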