package offset

import (
	"database/sql"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// OffsetEntry represents a mapping between a Kafka offset and an SMQ timestamp
type OffsetEntry struct {
	KafkaOffset  int64 // sequential Kafka offset within the partition
	SMQTimestamp int64 // SMQ message timestamp in nanoseconds
	MessageSize  int32 // message size in bytes
}

// SQLOffsetStorage implements OffsetStorage using a SQL database with an _index column
type SQLOffsetStorage struct {
	db *sql.DB
}

// NewSQLOffsetStorage creates a new SQL-based offset storage
func NewSQLOffsetStorage(db *sql.DB) (*SQLOffsetStorage, error) {
	storage := &SQLOffsetStorage{db: db}

	// Initialize database schema
	if err := storage.initializeSchema(); err != nil {
		return nil, fmt.Errorf("failed to initialize schema: %w", err)
	}

	return storage, nil
}

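// A minimal usage sketch (the sqlite3 driver name and DSN are assumptions;
// any database/sql driver with compatible SQL syntax should work):
//
//	db, err := sql.Open("sqlite3", "offsets.db")
//	if err != nil { /* handle */ }
//	storage, err := NewSQLOffsetStorage(db)
//	if err != nil { /* handle */ }
//	defer storage.Close()
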
// initializeSchema creates the necessary tables for offset storage
func (s *SQLOffsetStorage) initializeSchema() error {
	// TODO: Create offset storage tables with _index as hidden column
	// ASSUMPTION: Using SQLite-compatible syntax, may need adaptation for other databases

	queries := []string{
		// Partition offset checkpoints table
		// TODO: Add _index as computed column when supported by database
		// ASSUMPTION: Using regular columns for now, _index concept preserved for future enhancement
		`CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
			partition_key TEXT PRIMARY KEY,
			ring_size INTEGER NOT NULL,
			range_start INTEGER NOT NULL,
			range_stop INTEGER NOT NULL,
			unix_time_ns INTEGER NOT NULL,
			checkpoint_offset INTEGER NOT NULL,
			updated_at INTEGER NOT NULL
		)`,

		// Offset mappings table for detailed tracking
		// TODO: Add _index as computed column when supported by database
		`CREATE TABLE IF NOT EXISTS offset_mappings (
			id INTEGER PRIMARY KEY AUTOINCREMENT,
			partition_key TEXT NOT NULL,
			kafka_offset INTEGER NOT NULL,
			smq_timestamp INTEGER NOT NULL,
			message_size INTEGER NOT NULL,
			created_at INTEGER NOT NULL,
			UNIQUE(partition_key, kafka_offset)
		)`,

		// Indexes for performance
		// NOTE: partition_key is already the primary key of
		// partition_offset_checkpoints, so this index is effectively redundant
		`CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
			ON partition_offset_checkpoints(partition_key)`,

		`CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
			ON offset_mappings(partition_key, kafka_offset)`,

		`CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
			ON offset_mappings(partition_key, smq_timestamp)`,
	}

	for _, query := range queries {
		if _, err := s.db.Exec(query); err != nil {
			return fmt.Errorf("failed to execute schema query: %w", err)
		}
	}

	return nil
}

// SaveCheckpoint saves the checkpoint for a partition
func (s *SQLOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
	// Use a distinct local name to avoid shadowing the partitionKey function
	key := partitionKey(partition)
	now := time.Now().UnixNano()

	// TODO: Use UPSERT for better performance
	// ASSUMPTION: SQLite REPLACE syntax, may need adaptation for other databases
	query := `
		REPLACE INTO partition_offset_checkpoints
		(partition_key, ring_size, range_start, range_stop, unix_time_ns, checkpoint_offset, updated_at)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`

	_, err := s.db.Exec(query,
		key,
		partition.RingSize,
		partition.RangeStart,
		partition.RangeStop,
		partition.UnixTimeNs,
		offset,
		now,
	)

	if err != nil {
		return fmt.Errorf("failed to save checkpoint: %w", err)
	}

	return nil
}

// LoadCheckpoint loads the checkpoint for a partition
func (s *SQLOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
	key := partitionKey(partition)

	query := `
		SELECT checkpoint_offset
		FROM partition_offset_checkpoints
		WHERE partition_key = ?
	`

	var checkpointOffset int64
	err := s.db.QueryRow(query, key).Scan(&checkpointOffset)

	if err == sql.ErrNoRows {
		return -1, fmt.Errorf("no checkpoint found for partition %s", key)
	}

	if err != nil {
		return -1, fmt.Errorf("failed to load checkpoint: %w", err)
	}

	return checkpointOffset, nil
}

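// Checkpoint round-trip sketch (all field values are illustrative):
//
//	p := &schema_pb.Partition{RingSize: 1024, RangeStart: 0, RangeStop: 31, UnixTimeNs: time.Now().UnixNano()}
//	if err := storage.SaveCheckpoint("ns", "topic", p, 42); err != nil { /* handle */ }
//	offset, err := storage.LoadCheckpoint("ns", "topic", p) // offset == 42
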
// GetHighestOffset finds the highest offset in storage for a partition
func (s *SQLOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
	key := partitionKey(partition)

	// TODO: Use _index column for efficient querying
	// ASSUMPTION: kafka_offset represents the sequential offset we're tracking
	query := `
		SELECT MAX(kafka_offset)
		FROM offset_mappings
		WHERE partition_key = ?
	`

	var highestOffset sql.NullInt64
	err := s.db.QueryRow(query, key).Scan(&highestOffset)

	if err != nil {
		return -1, fmt.Errorf("failed to get highest offset: %w", err)
	}

	if !highestOffset.Valid {
		return -1, fmt.Errorf("no records found for partition %s", key)
	}

	return highestOffset.Int64, nil
}

// SaveOffsetMapping stores an offset mapping (extends OffsetStorage interface)
func (s *SQLOffsetStorage) SaveOffsetMapping(partitionKey string, kafkaOffset, smqTimestamp int64, size int32) error {
	now := time.Now().UnixNano()

	// TODO: Handle duplicate key conflicts gracefully
	// ASSUMPTION: Using INSERT OR REPLACE for conflict resolution
	query := `
		INSERT OR REPLACE INTO offset_mappings
		(partition_key, kafka_offset, smq_timestamp, message_size, created_at)
		VALUES (?, ?, ?, ?, ?)
	`

	_, err := s.db.Exec(query, partitionKey, kafkaOffset, smqTimestamp, size, now)
	if err != nil {
		return fmt.Errorf("failed to save offset mapping: %w", err)
	}

	return nil
}

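// Write sketch: record a message at Kafka offset 100 with a size of 512 bytes
// (the partition key string is an illustrative placeholder):
//
//	err := storage.SaveOffsetMapping("ring1024-0-31", 100, time.Now().UnixNano(), 512)
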
// LoadOffsetMappings retrieves all offset mappings for a partition
func (s *SQLOffsetStorage) LoadOffsetMappings(partitionKey string) ([]OffsetEntry, error) {
	// TODO: Add pagination for large result sets
	// ASSUMPTION: Loading all mappings for now, should be paginated in production
	query := `
		SELECT kafka_offset, smq_timestamp, message_size
		FROM offset_mappings
		WHERE partition_key = ?
		ORDER BY kafka_offset ASC
	`

	rows, err := s.db.Query(query, partitionKey)
	if err != nil {
		return nil, fmt.Errorf("failed to query offset mappings: %w", err)
	}
	defer rows.Close()

	var entries []OffsetEntry
	for rows.Next() {
		var entry OffsetEntry
		err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
		if err != nil {
			return nil, fmt.Errorf("failed to scan offset entry: %w", err)
		}
		entries = append(entries, entry)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating offset mappings: %w", err)
	}

	return entries, nil
}

// GetOffsetMappingsByRange retrieves offset mappings within a specific range
// (both bounds inclusive)
func (s *SQLOffsetStorage) GetOffsetMappingsByRange(partitionKey string, startOffset, endOffset int64) ([]OffsetEntry, error) {
	// TODO: Use _index column for efficient range queries
	query := `
		SELECT kafka_offset, smq_timestamp, message_size
		FROM offset_mappings
		WHERE partition_key = ? AND kafka_offset >= ? AND kafka_offset <= ?
		ORDER BY kafka_offset ASC
	`

	rows, err := s.db.Query(query, partitionKey, startOffset, endOffset)
	if err != nil {
		return nil, fmt.Errorf("failed to query offset range: %w", err)
	}
	defer rows.Close()

	var entries []OffsetEntry
	for rows.Next() {
		var entry OffsetEntry
		err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
		if err != nil {
			return nil, fmt.Errorf("failed to scan offset entry: %w", err)
		}
		entries = append(entries, entry)
	}

	// Check for errors encountered during iteration (was missing)
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating offset range: %w", err)
	}

	return entries, nil
}

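// Range fetch sketch: both bounds are inclusive, so this returns up to 100
// entries covering offsets 100 through 199:
//
//	entries, err := storage.GetOffsetMappingsByRange(key, 100, 199)
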
// GetPartitionStats returns statistics about a partition's offset usage
func (s *SQLOffsetStorage) GetPartitionStats(partitionKey string) (*PartitionStats, error) {
	query := `
		SELECT
			COUNT(*) as record_count,
			MIN(kafka_offset) as earliest_offset,
			MAX(kafka_offset) as latest_offset,
			SUM(message_size) as total_size,
			MIN(created_at) as first_record_time,
			MAX(created_at) as last_record_time
		FROM offset_mappings
		WHERE partition_key = ?
	`

	var stats PartitionStats
	var earliestOffset, latestOffset sql.NullInt64
	var totalSize sql.NullInt64
	var firstRecordTime, lastRecordTime sql.NullInt64

	err := s.db.QueryRow(query, partitionKey).Scan(
		&stats.RecordCount,
		&earliestOffset,
		&latestOffset,
		&totalSize,
		&firstRecordTime,
		&lastRecordTime,
	)

	if err != nil {
		return nil, fmt.Errorf("failed to get partition stats: %w", err)
	}

	stats.PartitionKey = partitionKey

	if earliestOffset.Valid {
		stats.EarliestOffset = earliestOffset.Int64
	} else {
		stats.EarliestOffset = -1
	}

	if latestOffset.Valid {
		stats.LatestOffset = latestOffset.Int64
		stats.HighWaterMark = latestOffset.Int64 + 1
	} else {
		stats.LatestOffset = -1
		stats.HighWaterMark = 0
	}

	if firstRecordTime.Valid {
		stats.FirstRecordTime = firstRecordTime.Int64
	}

	if lastRecordTime.Valid {
		stats.LastRecordTime = lastRecordTime.Int64
	}

	if totalSize.Valid {
		stats.TotalSize = totalSize.Int64
	}

	return &stats, nil
}

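// Stats sketch: HighWaterMark is the next offset to assign (LatestOffset + 1),
// mirroring Kafka's high-water-mark semantics:
//
//	stats, err := storage.GetPartitionStats(key)
//	next := stats.HighWaterMark // e.g. 101 when LatestOffset == 100
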
// CleanupOldMappings removes offset mappings older than the specified time
func (s *SQLOffsetStorage) CleanupOldMappings(olderThanNs int64) error {
	// TODO: Add configurable cleanup policies
	// ASSUMPTION: Simple time-based cleanup, could be enhanced with retention policies
	query := `
		DELETE FROM offset_mappings
		WHERE created_at < ?
	`

	result, err := s.db.Exec(query, olderThanNs)
	if err != nil {
		return fmt.Errorf("failed to cleanup old mappings: %w", err)
	}

	rowsAffected, _ := result.RowsAffected()
	if rowsAffected > 0 {
		// Log cleanup activity
		fmt.Printf("Cleaned up %d old offset mappings\n", rowsAffected)
	}

	return nil
}

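// Cleanup sketch: drop mappings older than seven days (the cutoff is a
// UnixNano timestamp, matching the created_at column):
//
//	cutoff := time.Now().Add(-7 * 24 * time.Hour).UnixNano()
//	err := storage.CleanupOldMappings(cutoff)
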
// Close closes the database connection
func (s *SQLOffsetStorage) Close() error {
	if s.db != nil {
		return s.db.Close()
	}
	return nil
}

// PartitionStats provides statistics about a partition's offset usage
type PartitionStats struct {
	PartitionKey    string
	RecordCount     int64
	EarliestOffset  int64 // -1 when the partition has no records
	LatestOffset    int64 // -1 when the partition has no records
	HighWaterMark   int64 // LatestOffset + 1, or 0 when empty
	TotalSize       int64 // sum of message sizes in bytes
	FirstRecordTime int64 // earliest created_at, in nanoseconds
	LastRecordTime  int64 // latest created_at, in nanoseconds
}

// GetAllPartitions returns a list of all partitions with offset data
func (s *SQLOffsetStorage) GetAllPartitions() ([]string, error) {
	query := `
		SELECT DISTINCT partition_key
		FROM offset_mappings
		ORDER BY partition_key
	`

	rows, err := s.db.Query(query)
	if err != nil {
		return nil, fmt.Errorf("failed to get all partitions: %w", err)
	}
	defer rows.Close()

	var partitions []string
	for rows.Next() {
		var partitionKey string
		if err := rows.Scan(&partitionKey); err != nil {
			return nil, fmt.Errorf("failed to scan partition key: %w", err)
		}
		partitions = append(partitions, partitionKey)
	}

	// Check for errors encountered during iteration (was missing)
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating partition keys: %w", err)
	}

	return partitions, nil
}

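// Combined sketch: enumerate partitions and report stats for each:
//
//	keys, err := storage.GetAllPartitions()
//	for _, k := range keys {
//		stats, _ := storage.GetPartitionStats(k)
//		fmt.Println(k, stats.RecordCount)
//	}
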
// Vacuum performs database maintenance operations
func (s *SQLOffsetStorage) Vacuum() error {
	// TODO: Add database-specific optimization commands
	// ASSUMPTION: SQLite VACUUM command, may need adaptation for other databases
	_, err := s.db.Exec("VACUUM")
	if err != nil {
		return fmt.Errorf("failed to vacuum database: %w", err)
	}

	return nil
}

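// Maintenance sketch: reclaim space after a large cleanup (SQLite-specific,
// per the assumption above):
//
//	if err := storage.CleanupOldMappings(cutoff); err == nil {
//		_ = storage.Vacuum()
//	}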