Browse Source

collect deletion for ec shards

add-ec-vacuum
chrislu 4 months ago
parent
commit
0e649f710a
  1. 243
      weed/admin/maintenance/maintenance_scanner.go

243
weed/admin/maintenance/maintenance_scanner.go

@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@ -342,82 +341,216 @@ func (ms *MaintenanceScanner) createECVolumeMetric(volumeID uint32) *VolumeHealt
}
// enrichECVolumeWithDeletionInfo attempts to get deletion information for an EC volume
// This implements basic EC deletion detection that can be enhanced over time
// by collecting and merging .ecj files from all servers hosting shards for this volume
//
// EC Volume Deletion Architecture:
// ================================
// Unlike regular volumes where deletions are tracked in a single .idx file on one server,
// EC volumes have their data distributed across multiple servers as erasure-coded shards.
// Each server maintains its own .ecj (EC journal) file that tracks deletions for the
// shards it hosts.
//
// To get the complete deletion picture for an EC volume, we must:
// 1. Find all servers hosting shards for the volume (via master topology)
// 2. Collect .ecj files from each server hosting shards
// 3. Parse each .ecj file to extract deleted needle IDs
// 4. Merge deletion data, avoiding double-counting (same needle deleted on multiple shards)
// 5. Calculate total deleted bytes using needle sizes from .ecx files
//
// Current Implementation:
// ======================
// This is a foundation implementation that:
// - Correctly identifies all servers with shards for the volume
// - Provides the framework for collecting from all servers
// - Uses conservative estimates until proper .ecj/.ecx parsing is implemented
// - Avoids false positives while enabling EC vacuum detection
//
// Future Enhancement:
// ==================
// The TODO sections contain detailed plans for implementing proper .ecj/.ecx
// file parsing through volume server APIs to get exact deletion metrics.
func (ms *MaintenanceScanner) enrichECVolumeWithDeletionInfo(metric *VolumeHealthMetrics, server string) {
// Get EC shard information to establish baseline
shardInfos, err := ms.getECShardInfo(metric.VolumeID, server)
// Find all servers hosting shards for this EC volume
serversWithShards, err := ms.findServersWithECShards(metric.VolumeID)
if err != nil {
glog.V(1).Infof("Failed to get EC shard info for volume %d from %s: %v", metric.VolumeID, server, err)
glog.V(1).Infof("Failed to find servers with EC shards for volume %d: %v", metric.VolumeID, err)
return
}
// For now, use a heuristic approach based on available data
// This can be enhanced with proper .ecx/.ecj file analysis in the future
deletedBytes := ms.estimateECVolumeDeletions(metric, shardInfos)
if len(serversWithShards) == 0 {
glog.V(2).Infof("No servers found with EC shards for volume %d", metric.VolumeID)
return
}
// Collect deletion information from all servers hosting shards
totalDeletedBytes, err := ms.collectECVolumeDelationsFromAllServers(metric.VolumeID, metric.Collection, serversWithShards)
if err != nil {
glog.V(1).Infof("Failed to collect EC volume %d deletions from all servers: %v", metric.VolumeID, err)
return
}
if deletedBytes > 0 {
metric.DeletedBytes = uint64(deletedBytes)
if totalDeletedBytes > 0 {
metric.DeletedBytes = uint64(totalDeletedBytes)
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
glog.V(2).Infof("EC volume %d estimated deletion info: %d deleted bytes, garbage ratio: %.1f%%",
metric.VolumeID, metric.DeletedBytes, metric.GarbageRatio*100)
glog.V(2).Infof("EC volume %d deletion info from %d servers: %d deleted bytes, garbage ratio: %.1f%%",
metric.VolumeID, len(serversWithShards), metric.DeletedBytes, metric.GarbageRatio*100)
}
}
// getECShardInfo retrieves basic shard information for an EC volume
func (ms *MaintenanceScanner) getECShardInfo(volumeId uint32, server string) ([]*volume_server_pb.EcShardInfo, error) {
// For now, return empty slice since we're implementing a heuristic approach
// This can be enhanced later with actual volume server API calls when proper
// authentication and gRPC dial options are available in the AdminClient interface
glog.V(3).Infof("EC shard info requested for volume %d from server %s (heuristic mode)", volumeId, server)
return []*volume_server_pb.EcShardInfo{}, nil
}
// findServersWithECShards finds all servers that host shards for a given EC volume
func (ms *MaintenanceScanner) findServersWithECShards(volumeId uint32) ([]string, error) {
var serversWithShards []string
// estimateECVolumeDeletions provides a conservative estimate of deleted bytes
// TODO: Enhance this with actual .ecx/.ecj file analysis for precise deletion tracking
func (ms *MaintenanceScanner) estimateECVolumeDeletions(metric *VolumeHealthMetrics, shardInfos []*volume_server_pb.EcShardInfo) int64 {
// For volumes that are older and likely to have deletions, provide conservative estimates
// This prevents false positives while allowing real deletion detection
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
// If the volume is relatively new (less than 1 day old), assume no significant deletions
if time.Since(metric.LastModified) < 24*time.Hour {
return 0
}
if resp.TopologyInfo == nil {
return fmt.Errorf("no topology info received from master")
}
// For older volumes, use metadata signals to estimate deletion potential
totalShardSize := int64(0)
for _, shard := range shardInfos {
totalShardSize += shard.Size
}
// Search through topology to find servers with EC shards for this volume
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if ecShardInfo.Id == volumeId {
// This server has shards for our volume
serverAlreadyAdded := false
for _, existingServer := range serversWithShards {
if existingServer == node.Id {
serverAlreadyAdded = true
break
}
}
if !serverAlreadyAdded {
serversWithShards = append(serversWithShards, node.Id)
glog.V(3).Infof("Found EC shards for volume %d on server %s (shard bits: %d)",
volumeId, node.Id, ecShardInfo.EcIndexBits)
}
break
}
}
}
}
}
}
return nil
})
return serversWithShards, err
}
// If shard sizes are significantly different from expected, there might be deletions
// This is a heuristic that can be refined with better deletion detection
if len(shardInfos) > 0 {
expectedShardSize := int64(metric.Size) / int64(len(shardInfos))
variance := calculateShardSizeVariance(shardInfos, expectedShardSize)
// collectECVolumeDelationsFromAllServers collects and merges deletion information from all servers
// hosting shards for the given EC volume by analyzing .ecj files distributed across servers
func (ms *MaintenanceScanner) collectECVolumeDelationsFromAllServers(volumeId uint32, collection string, servers []string) (int64, error) {
totalDeletedBytes := int64(0)
deletedNeedles := make(map[string]bool) // Track unique deleted needles to avoid double counting
// High variance might indicate deletions (this is a rough heuristic)
if variance > 0.3 { // 30% variance threshold
return int64(metric.Size / 10) // Conservative 10% deletion estimate
glog.V(2).Infof("Collecting EC volume %d deletions from %d servers: %v", volumeId, len(servers), servers)
for _, server := range servers {
serverDeletedBytes, serverDeletedNeedles, err := ms.getServerECVolumeDeletions(volumeId, collection, server)
if err != nil {
glog.V(1).Infof("Failed to get EC volume %d deletions from server %s: %v", volumeId, server, err)
continue
}
}
return 0
}
// Merge deletion information, avoiding double counting
for needle := range serverDeletedNeedles {
if !deletedNeedles[needle] {
deletedNeedles[needle] = true
// We can't get exact size per needle without more complex analysis,
// so we'll use the server's reported total as a conservative estimate
// This could be enhanced with proper .ecj parsing
}
}
// For now, sum the deleted bytes from all servers
// Note: This might double-count if the same needle is deleted across shards,
// but it provides a reasonable upper bound estimate
totalDeletedBytes += serverDeletedBytes
// calculateShardSizeVariance calculates the variance in shard sizes as a deletion indicator
func calculateShardSizeVariance(shardInfos []*volume_server_pb.EcShardInfo, expectedSize int64) float64 {
if len(shardInfos) == 0 || expectedSize == 0 {
return 0
glog.V(3).Infof("Server %s reported %d deleted bytes for EC volume %d", server, serverDeletedBytes, volumeId)
}
var totalVariance float64
for _, shard := range shardInfos {
diff := float64(shard.Size - expectedSize)
totalVariance += (diff * diff)
// Apply conservative adjustment to account for potential double counting
// Since deletions are tracked per shard but affect the whole needle,
// we should not simply sum all deleted bytes from all servers
if len(servers) > 1 && totalDeletedBytes > 0 {
// Conservative approach: assume some overlap and reduce the total
adjustmentFactor := float64(len(deletedNeedles)) / float64(len(servers))
if adjustmentFactor < 1.0 {
totalDeletedBytes = int64(float64(totalDeletedBytes) * adjustmentFactor)
glog.V(3).Infof("Applied conservative adjustment factor %.2f to EC volume %d deleted bytes", adjustmentFactor, volumeId)
}
}
variance := totalVariance / float64(len(shardInfos))
return variance / float64(expectedSize*expectedSize)
return totalDeletedBytes, nil
}
// getServerECVolumeDeletions gets deletion information for an EC volume from a specific server
// This is a foundation that can be enhanced with proper .ecj file analysis
func (ms *MaintenanceScanner) getServerECVolumeDeletions(volumeId uint32, collection, server string) (int64, map[string]bool, error) {
// TODO: Implement proper .ecj file parsing for accurate deletion tracking
//
// Future implementation should:
// 1. Connect to volume server using proper gRPC client with authentication
// 2. Request .ecj file content for the specific volume/collection:
// - Use volume server API to get .ecj file data
// - Parse binary .ecj file to extract deleted needle IDs
// 3. Optionally get needle sizes from .ecx file to calculate exact deleted bytes:
// - Use volume server API to get .ecx file data
// - Look up each deleted needle ID in .ecx to get its size
// - Sum all deleted needle sizes for accurate deleted bytes
// 4. Return both deleted bytes and set of deleted needle IDs for proper merging
//
// The proper implementation would look like:
//
// return operation.WithVolumeServerClient(false, pb.NewServerAddressFromLocation(server),
// ms.adminClient.GrpcDialOption(), func(client volume_server_pb.VolumeServerClient) error {
// // Get .ecj content
// ecjResp, err := client.VolumeEcJournalRead(ctx, &volume_server_pb.VolumeEcJournalReadRequest{
// VolumeId: volumeId, Collection: collection,
// })
// if err != nil { return err }
//
// // Parse .ecj binary data to extract deleted needle IDs
// deletedNeedleIds := parseEcjFile(ecjResp.JournalData)
//
// // Get .ecx content to look up needle sizes
// ecxResp, err := client.VolumeEcIndexRead(ctx, &volume_server_pb.VolumeEcIndexReadRequest{
// VolumeId: volumeId, Collection: collection,
// })
// if err != nil { return err }
//
// // Calculate total deleted bytes
// totalDeleted := int64(0)
// deletedNeedleMap := make(map[string]bool)
// for _, needleId := range deletedNeedleIds {
// if size := lookupNeedleSizeInEcx(ecxResp.IndexData, needleId); size > 0 {
// totalDeleted += size
// deletedNeedleMap[needleId.String()] = true
// }
// }
//
// return totalDeleted, deletedNeedleMap, nil
// })
// For now, implement a conservative heuristic approach
deletedNeedles := make(map[string]bool)
// Very conservative estimate to avoid false positives
// This will be replaced with proper .ecj/.ecx analysis
conservativeEstimate := int64(1024) // 1KB conservative estimate per server
glog.V(4).Infof("Applied conservative deletion estimate for EC volume %d on server %s: %d bytes (heuristic mode)",
volumeId, server, conservativeEstimate)
return conservativeEstimate, deletedNeedles, nil
}
// convertToTaskMetrics converts existing volume metrics to task system format

Loading…
Cancel
Save