You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

296 lines
8.7 KiB

package dash
import (
"context"
"fmt"
"io"
"path/filepath"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
// TopicRetentionPurger removes expired topic data according to each topic's
// retention policy. All broker and filer access goes through the AdminServer
// it holds.
type TopicRetentionPurger struct {
	// adminServer provides the broker-leader lookup and the broker/filer client
	// helpers the purger calls.
	adminServer *AdminServer
}
// NewTopicRetentionPurger returns a purger bound to adminServer. The pointer
// is stored as given; no broker or filer is contacted here.
func NewTopicRetentionPurger(adminServer *AdminServer) *TopicRetentionPurger {
	purger := new(TopicRetentionPurger)
	purger.adminServer = adminServer
	return purger
}
// PurgeExpiredTopicData runs one purge pass over every topic that has
// retention enabled.
//
// An error is returned only when the list of topics cannot be obtained. A
// failure while purging an individual topic is logged and the pass continues
// with the next topic, so a nil return does not imply that every topic was
// purged successfully.
func (p *TopicRetentionPurger) PurgeExpiredTopicData() error {
	glog.V(1).Infof("Starting topic data purge based on retention policies")

	retained, listErr := p.getTopicsWithRetention()
	if listErr != nil {
		return fmt.Errorf("failed to get topics with retention: %v", listErr)
	}
	glog.V(1).Infof("Found %d topics with retention enabled", len(retained))

	for idx := range retained {
		cfg := retained[idx]
		if purgeErr := p.purgeTopicData(cfg); purgeErr != nil {
			// Best effort: one failing topic must not block the others.
			glog.Errorf("Failed to purge data for topic %s: %v", cfg.TopicName, purgeErr)
		}
	}

	glog.V(1).Infof("Completed topic data purge")
	return nil
}
// TopicRetentionConfig pairs a topic's identity with the retention period
// that applies to it.
type TopicRetentionConfig struct {
	// TopicName is the display form "<Namespace>.<Name>", used in log messages.
	TopicName string
	// Namespace is the namespace the topic belongs to.
	Namespace string
	// Name is the topic name within Namespace.
	Name string
	// RetentionSeconds is how long topic data is kept, in seconds.
	RetentionSeconds int64
}
// getTopicsWithRetention asks the broker leader for every topic and returns
// those whose configuration has retention enabled with a positive
// RetentionSeconds.
//
// A topic whose configuration cannot be fetched is logged and skipped. An
// error is returned when the broker leader cannot be found or when the broker
// call as a whole fails (for example the topic listing); the returned slice is
// nil in that case.
func (p *TopicRetentionPurger) getTopicsWithRetention() ([]TopicRetentionConfig, error) {
	// Deadline for each individual broker RPC. Every call gets its own budget:
	// with one shared deadline, a long topic list would exhaust it and the
	// remaining topics would be skipped with only a warning.
	const rpcTimeout = 10 * time.Second

	brokerLeader, err := p.adminServer.findBrokerLeader()
	if err != nil {
		return nil, fmt.Errorf("failed to find broker leader: %w", err)
	}

	var topicsWithRetention []TopicRetentionConfig
	err = p.adminServer.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
		listCtx, cancelList := context.WithTimeout(context.Background(), rpcTimeout)
		resp, err := client.ListTopics(listCtx, &mq_pb.ListTopicsRequest{})
		cancelList()
		if err != nil {
			return err
		}

		for _, pbTopic := range resp.Topics {
			if pbTopic == nil {
				// Nothing to query, and the field reads below would panic.
				continue
			}

			// cancel is called right after the unary call instead of deferred,
			// so contexts do not pile up until the whole loop has finished.
			cfgCtx, cancelCfg := context.WithTimeout(context.Background(), rpcTimeout)
			configResp, err := client.GetTopicConfiguration(cfgCtx, &mq_pb.GetTopicConfigurationRequest{
				Topic: pbTopic,
			})
			cancelCfg()
			if err != nil {
				glog.Warningf("Failed to get configuration for topic %s.%s: %v", pbTopic.Namespace, pbTopic.Name, err)
				continue
			}

			retention := configResp.Retention
			if retention == nil || !retention.Enabled || retention.RetentionSeconds <= 0 {
				continue
			}
			topicsWithRetention = append(topicsWithRetention, TopicRetentionConfig{
				TopicName:        fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
				Namespace:        pbTopic.Namespace,
				Name:             pbTopic.Name,
				RetentionSeconds: retention.RetentionSeconds,
			})
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return topicsWithRetention, nil
}
// purgeTopicData deletes the expired version directories of one topic.
//
// A version directory is a child directory of the topic directory whose name
// starts with "v" and is accepted by parseVersionTime. Version directories
// whose encoded time is older than now minus RetentionSeconds are deleted,
// except that the newest version directory is always kept, even when expired.
//
// A failure to delete an individual directory is logged and does not abort the
// pass. An error is returned only when the topic directory cannot be listed.
// A non-positive retention, or one too large to express as a time.Duration,
// purges nothing.
func (p *TopicRetentionPurger) purgeTopicData(topicRetention TopicRetentionConfig) error {
	glog.V(1).Infof("Purging expired data for topic %s with retention %d seconds", topicRetention.TopicName, topicRetention.RetentionSeconds)

	// time.Duration counts nanoseconds in an int64, so seconds*time.Second
	// silently wraps for retentions beyond roughly 292 years. A wrapped value can
	// put the cutoff in the future and make every version look expired, so such
	// retentions are treated as "nothing has expired".
	const maxRetentionSeconds = int64(time.Duration(1<<63-1) / time.Second)
	if topicRetention.RetentionSeconds <= 0 || topicRetention.RetentionSeconds > maxRetentionSeconds {
		glog.V(1).Infof("Topic %s has retention of %d seconds outside the purgeable range, nothing to purge", topicRetention.TopicName, topicRetention.RetentionSeconds)
		return nil
	}
	cutoffTime := time.Now().Add(-time.Duration(topicRetention.RetentionSeconds) * time.Second)

	topicObj := topic.NewTopic(topicRetention.Namespace, topicRetention.Name)
	topicDir := topicObj.Dir()

	var purgedDirs []string
	err := p.adminServer.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		versionDirs, err := p.listVersionDirs(client, topicDir)
		if err != nil {
			return err
		}

		// Keep at least the most recent version directory, even if it is expired.
		if len(versionDirs) <= 1 {
			glog.V(1).Infof("Topic %s has %d version directories, keeping all", topicRetention.TopicName, len(versionDirs))
			return nil
		}

		// versionDirs is sorted oldest first; the last element is the newest and
		// is never considered for deletion.
		for _, versionDir := range versionDirs[:len(versionDirs)-1] {
			if !versionDir.VersionTime.Before(cutoffTime) {
				// Everything after this one is at least as new.
				break
			}

			// filepath.Join uses the host OS separator; ToSlash keeps the path
			// slash-separated and is a no-op where the separator already is '/'.
			dirPath := filepath.ToSlash(filepath.Join(topicDir, versionDir.Name))
			if err := p.deleteDirectoryRecursively(client, dirPath); err != nil {
				glog.Errorf("Failed to delete expired directory %s: %v", dirPath, err)
				continue
			}
			purgedDirs = append(purgedDirs, dirPath)
			glog.V(1).Infof("Purged expired directory: %s (created: %s)", dirPath, versionDir.VersionTime.Format("2006-01-02 15:04:05"))
		}
		return nil
	})
	if err != nil {
		return err
	}

	if len(purgedDirs) > 0 {
		glog.V(0).Infof("Purged %d expired directories for topic %s", len(purgedDirs), topicRetention.TopicName)
	}
	return nil
}

// listVersionDirs returns the version directories found directly under
// topicDir, sorted oldest first by the time encoded in their names. Entries
// that are not directories, do not start with "v", or whose name does not
// parse as a version time are ignored (the last case with a warning).
//
// NOTE(review): the listing asks for at most 1000 entries, so version
// directories beyond that are not seen in this pass.
func (p *TopicRetentionPurger) listVersionDirs(client filer_pb.SeaweedFilerClient, topicDir string) ([]VersionDirInfo, error) {
	stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
		Directory:          topicDir,
		Prefix:             "",
		StartFromFileName:  "",
		InclusiveStartFrom: false,
		Limit:              1000,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list topic directory %s: %w", topicDir, err)
	}

	var versionDirs []VersionDirInfo
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("failed to receive version entries: %w", err)
		}

		// Generated getters tolerate nil messages, so a response without an
		// entry or an entry without attributes cannot cause a nil dereference.
		entry := resp.GetEntry()
		if entry == nil || !entry.GetIsDirectory() || !strings.HasPrefix(entry.GetName(), "v") {
			continue
		}

		versionTime, err := p.parseVersionTime(entry.GetName())
		if err != nil {
			glog.Warningf("Failed to parse version time from %s: %v", entry.GetName(), err)
			continue
		}
		versionDirs = append(versionDirs, VersionDirInfo{
			Name:        entry.GetName(),
			VersionTime: versionTime,
			ModTime:     time.Unix(entry.GetAttributes().GetMtime(), 0),
		})
	}

	sort.Slice(versionDirs, func(i, j int) bool {
		return versionDirs[i].VersionTime.Before(versionDirs[j].VersionTime)
	})
	return versionDirs, nil
}
// VersionDirInfo describes one version directory found under a topic
// directory.
type VersionDirInfo struct {
	// Name is the directory's entry name, e.g. "v2025-01-10-05-44-34".
	Name string
	// VersionTime is the timestamp parsed out of Name.
	VersionTime time.Time
	// ModTime is built from the entry's Mtime attribute, read as Unix seconds.
	ModTime time.Time
}
// parseVersionTime extracts the timestamp encoded in a version directory name.
//
// The expected form is "v" followed by year-month-day-hour-minute-second, for
// example "v2025-01-10-05-44-34". The name carries no zone information, so
// time.Parse yields the instant in UTC.
//
// An error is returned when the "v" prefix is missing or the remainder does
// not match the layout. In the second case the underlying parse error is
// wrapped and remains reachable through errors.Is / errors.As.
func (p *TopicRetentionPurger) parseVersionTime(versionName string) (time.Time, error) {
	const versionTimeLayout = "2006-01-02-15-04-05"

	if !strings.HasPrefix(versionName, "v") {
		return time.Time{}, fmt.Errorf("invalid version format: %s", versionName)
	}
	timeStr := strings.TrimPrefix(versionName, "v")

	versionTime, err := time.Parse(versionTimeLayout, timeStr)
	if err != nil {
		return time.Time{}, fmt.Errorf("failed to parse version time %s: %w", timeStr, err)
	}
	return versionTime, nil
}
// deleteDirectoryRecursively removes dirPath and everything beneath it.
//
// Children are listed and deleted in batches until a listing comes back
// short, so directories holding more entries than one listing returns are
// emptied completely before the directory entry itself is deleted.
// Subdirectories are handled by recursion. The first failure aborts the
// operation and is returned with the affected path in the message; whatever
// was deleted up to that point stays deleted.
//
// NOTE(review): DeleteEntryRequest is sent with only Directory and Name set.
// Confirm that the filer also reclaims the stored data of a file deleted this
// way, and not just its directory entry.
func (p *TopicRetentionPurger) deleteDirectoryRecursively(client filer_pb.SeaweedFilerClient, dirPath string) error {
	for {
		more, err := p.deleteDirectoryBatch(client, dirPath)
		if err != nil {
			return err
		}
		if !more {
			break
		}
	}

	// filepath.Dir uses the host OS separator; ToSlash keeps the parent
	// slash-separated and is a no-op where the separator already is '/'.
	parentDir := filepath.ToSlash(filepath.Dir(dirPath))
	dirName := filepath.Base(dirPath)
	if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
		Directory: parentDir,
		Name:      dirName,
	}); err != nil {
		return fmt.Errorf("failed to delete directory %s: %w", dirPath, err)
	}
	return nil
}

// deleteDirectoryBatch lists one batch of children of dirPath and deletes each
// of them. It reports more=true when the batch was full, meaning further
// children may remain and the caller should invoke it again.
func (p *TopicRetentionPurger) deleteDirectoryBatch(client filer_pb.SeaweedFilerClient, dirPath string) (more bool, err error) {
	const listBatchSize = 1000

	// Cancelling on return releases the listing stream even when the function
	// leaves early because a delete failed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
		Directory:          dirPath,
		Prefix:             "",
		StartFromFileName:  "",
		InclusiveStartFrom: false,
		Limit:              listBatchSize,
	})
	if err != nil {
		return false, fmt.Errorf("failed to list directory %s: %w", dirPath, err)
	}

	removed := 0
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			// A full batch means the listing may have been truncated. Every
			// counted entry has been deleted, so asking again makes progress.
			return removed == listBatchSize, nil
		}
		if err != nil {
			return false, fmt.Errorf("failed to receive entries: %w", err)
		}

		entry := resp.GetEntry()
		if entry == nil {
			continue
		}
		entryName := entry.GetName()
		entryPath := filepath.ToSlash(filepath.Join(dirPath, entryName))

		if entry.GetIsDirectory() {
			if err := p.deleteDirectoryRecursively(client, entryPath); err != nil {
				return false, fmt.Errorf("failed to delete subdirectory %s: %w", entryPath, err)
			}
		} else if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
			Directory: dirPath,
			Name:      entryName,
		}); err != nil {
			return false, fmt.Errorf("failed to delete file %s: %w", entryPath, err)
		}
		removed++
	}
}