You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

302 lines
8.8 KiB

package offset
import (
"database/sql"
"fmt"
"time"
)
// MigrationVersion describes one schema migration step: a version number,
// a human-readable summary, and the SQL script that performs it.
type MigrationVersion struct {
	// Version is compared against the highest version recorded in
	// schema_migrations to decide whether the step is still pending.
	Version int
	// Description is stored alongside the version when the step is recorded.
	Description string
	// SQL is the script executed inside the migration's transaction.
	SQL string
}
// GetMigrations returns the full, ordered list of offset-storage migrations.
//
// The slice is listed in ascending version order (1, 2, 3). ValidateSchema
// treats the last element as the latest version, so new migrations must be
// appended at the end. A fresh slice is built on every call.
func GetMigrations() []MigrationVersion {
	// The scripts are raw strings whose text is sent to the database as-is,
	// so their contents are deliberately kept flush-left.
	const (
		// Version 1: base tables, including the schema_migrations tracker.
		initialTablesSQL = `
-- Partition offset checkpoints table
-- TODO: Add _index as computed column when supported by database
CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
partition_key TEXT PRIMARY KEY,
ring_size INTEGER NOT NULL,
range_start INTEGER NOT NULL,
range_stop INTEGER NOT NULL,
unix_time_ns INTEGER NOT NULL,
checkpoint_offset INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
-- Offset mappings table for detailed tracking
-- TODO: Add _index as computed column when supported by database
CREATE TABLE IF NOT EXISTS offset_mappings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
partition_key TEXT NOT NULL,
kafka_offset INTEGER NOT NULL,
smq_timestamp INTEGER NOT NULL,
message_size INTEGER NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(partition_key, kafka_offset)
);
-- Schema migrations tracking table
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
applied_at INTEGER NOT NULL
);
`

		// Version 2: secondary indexes on the version 1 tables.
		performanceIndexesSQL = `
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
ON partition_offset_checkpoints(partition_key);
CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
ON offset_mappings(partition_key, kafka_offset);
CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
ON offset_mappings(partition_key, smq_timestamp);
CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at
ON offset_mappings(created_at);
`

		// Version 3: per-partition metadata table and its activity index.
		partitionMetadataSQL = `
-- Partition metadata table
CREATE TABLE IF NOT EXISTS partition_metadata (
partition_key TEXT PRIMARY KEY,
ring_size INTEGER NOT NULL,
range_start INTEGER NOT NULL,
range_stop INTEGER NOT NULL,
unix_time_ns INTEGER NOT NULL,
created_at INTEGER NOT NULL,
last_activity_at INTEGER NOT NULL,
record_count INTEGER DEFAULT 0,
total_size INTEGER DEFAULT 0
);
-- Index for partition metadata
CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity
ON partition_metadata(last_activity_at);
`
	)

	return []MigrationVersion{
		{Version: 1, Description: "Create initial offset storage tables", SQL: initialTablesSQL},
		{Version: 2, Description: "Add indexes for performance optimization", SQL: performanceIndexesSQL},
		{Version: 3, Description: "Add partition metadata table for enhanced tracking", SQL: partitionMetadataSQL},
	}
}
// MigrationManager applies and inspects schema migrations on a single
// database handle.
type MigrationManager struct {
	// db is the handle every query and migration transaction runs against.
	db *sql.DB
}
// NewMigrationManager returns a MigrationManager bound to db. The handle is
// stored as given; nothing is executed against it here.
func NewMigrationManager(db *sql.DB) *MigrationManager {
	manager := new(MigrationManager)
	manager.db = db
	return manager
}
// GetCurrentVersion reports the highest version recorded in the
// schema_migrations table.
//
// The table is created first if it does not exist, so calling this on an
// empty database has the side effect of creating it. When the table holds no
// rows the result is 0 with a nil error.
func (m *MigrationManager) GetCurrentVersion() (int, error) {
	// Create the tracking table on demand so the MAX query below has
	// something to read.
	const ensureTrackingTable = `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
applied_at INTEGER NOT NULL
)
`
	if _, err := m.db.Exec(ensureTrackingTable); err != nil {
		return 0, fmt.Errorf("failed to create migrations table: %w", err)
	}

	// MAX over zero rows yields NULL, hence the nullable destination.
	var highest sql.NullInt64
	row := m.db.QueryRow("SELECT MAX(version) FROM schema_migrations")
	if err := row.Scan(&highest); err != nil {
		return 0, fmt.Errorf("failed to get current version: %w", err)
	}

	if highest.Valid {
		return int(highest.Int64), nil
	}
	return 0, nil // No migrations applied yet
}
// ApplyMigrations applies every pending migration in the order returned by
// GetMigrations.
//
// A migration is pending when its version is greater than the value reported
// by GetCurrentVersion at the start of the call. Each pending migration runs
// in its own transaction, so a failure stops processing and returns an error
// while migrations committed earlier in the same call stay applied. Progress
// is written to standard output.
func (m *MigrationManager) ApplyMigrations() error {
	currentVersion, err := m.GetCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}

	for _, migration := range GetMigrations() {
		if migration.Version <= currentVersion {
			continue // Already applied
		}

		fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description)
		if err := m.applyMigration(migration); err != nil {
			return err
		}
		fmt.Printf("Successfully applied migration %d\n", migration.Version)
	}
	return nil
}

// applyMigration runs one migration's SQL and records it in
// schema_migrations inside a single transaction, so the script and its
// bookkeeping row are committed or rolled back together.
func (m *MigrationManager) applyMigration(migration MigrationVersion) error {
	tx, err := m.db.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err)
	}

	if _, err := tx.Exec(migration.SQL); err != nil {
		return rollbackMigrationTx(tx, fmt.Errorf("failed to execute migration %d: %w", migration.Version, err))
	}

	_, err = tx.Exec(
		"INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
		migration.Version,
		migration.Description,
		getCurrentTimestamp(),
	)
	if err != nil {
		return rollbackMigrationTx(tx, fmt.Errorf("failed to record migration %d: %w", migration.Version, err))
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err)
	}
	return nil
}

// rollbackMigrationTx rolls tx back and returns cause. If the rollback itself
// fails, that failure is appended to the message instead of being dropped;
// cause remains the wrapped error so errors.Is/As on the original failure
// keep working.
func rollbackMigrationTx(tx *sql.Tx, cause error) error {
	if rollbackErr := tx.Rollback(); rollbackErr != nil {
		return fmt.Errorf("%w (rollback also failed: %v)", cause, rollbackErr)
	}
	return cause
}
// RollbackMigration is a placeholder: it performs no database work and
// always returns an error, whatever version is passed.
func (m *MigrationManager) RollbackMigration(version int) error {
	// TODO: Implement rollback functionality
	// ASSUMPTION: For now, rollbacks are not supported as they require careful planning
	notImplemented := fmt.Errorf("migration rollbacks not implemented - manual intervention required")
	return notImplemented
}
// GetAppliedMigrations returns every row of schema_migrations, ordered by
// ascending version.
//
// The result is a nil slice when the table is empty. Unlike
// GetCurrentVersion, this method does not create the table, so the query
// fails if schema_migrations does not exist yet. An error is returned if the
// query, a row scan, or the row iteration itself fails.
func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) {
	rows, err := m.db.Query(`
SELECT version, description, applied_at
FROM schema_migrations
ORDER BY version
`)
	if err != nil {
		return nil, fmt.Errorf("failed to query applied migrations: %w", err)
	}
	defer rows.Close()

	var migrations []AppliedMigration
	for rows.Next() {
		var migration AppliedMigration
		if err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt); err != nil {
			return nil, fmt.Errorf("failed to scan migration: %w", err)
		}
		migrations = append(migrations, migration)
	}

	// rows.Next returns false both at the end of the result set and when
	// iteration fails; without this check a mid-stream failure would be
	// reported as a shorter, apparently successful list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate applied migrations: %w", err)
	}
	return migrations, nil
}
// ValidateSchema checks that the database has reached the newest migration.
//
// It returns an error when the version reported by GetCurrentVersion is
// lower than the version of the last entry in GetMigrations, and nil
// otherwise (including when no migrations are defined). Because it calls
// GetCurrentVersion, it may create the schema_migrations table.
func (m *MigrationManager) ValidateSchema() error {
	applied, err := m.GetCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}

	known := GetMigrations()
	count := len(known)
	if count == 0 {
		return nil
	}

	// The final entry is taken as the latest version, which relies on
	// GetMigrations listing versions in ascending order.
	if newest := known[count-1].Version; applied < newest {
		return fmt.Errorf("schema is outdated: current version %d, latest version %d", applied, newest)
	}
	return nil
}
// AppliedMigration is one row read back from the schema_migrations table.
type AppliedMigration struct {
	// Version is the migration's version number.
	Version int
	// Description is the summary stored when the migration was recorded.
	Description string
	// AppliedAt is the stored applied_at value; ApplyMigrations writes it as
	// Unix nanoseconds.
	AppliedAt int64
}
// getCurrentTimestamp returns the current wall-clock time as nanoseconds
// since the Unix epoch.
func getCurrentTimestamp() int64 {
	now := time.Now()
	return now.UnixNano()
}
// CreateDatabase opens the database at dbPath, applies a fixed set of SQLite
// PRAGMAs, and then runs all pending migrations.
//
// On success the caller owns the returned handle and must close it. If a
// PRAGMA or the migration run fails, the handle is closed here and a nil
// handle is returned with the error.
//
// The "sqlite3" driver name must be registered by a driver package; this
// file's import block does not pull one in.
func CreateDatabase(dbPath string) (*sql.DB, error) {
	// TODO: Support different database types (PostgreSQL, MySQL, etc.)
	// ASSUMPTION: Using SQLite for now, can be extended for other databases
	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	// abandon releases the handle before reporting a setup failure, so the
	// caller never receives a half-initialised database.
	abandon := func(cause error) (*sql.DB, error) {
		db.Close()
		return nil, cause
	}

	// Configure SQLite for better performance.
	// NOTE(review): these are issued through the pooled handle, so settings
	// that SQLite scopes to a connection may not reach connections the pool
	// opens later — confirm against the driver in use.
	for _, pragma := range [...]string{
		"PRAGMA journal_mode=WAL",   // Write-Ahead Logging for better concurrency
		"PRAGMA synchronous=NORMAL", // Balance between safety and performance
		"PRAGMA cache_size=10000",   // Increase cache size
		"PRAGMA foreign_keys=ON",    // Enable foreign key constraints
		"PRAGMA temp_store=MEMORY",  // Store temporary tables in memory
	} {
		if _, execErr := db.Exec(pragma); execErr != nil {
			return abandon(fmt.Errorf("failed to set pragma %s: %w", pragma, execErr))
		}
	}

	// Bring the schema up to the latest version.
	if migrateErr := NewMigrationManager(db).ApplyMigrations(); migrateErr != nil {
		return abandon(fmt.Errorf("failed to apply migrations: %w", migrateErr))
	}
	return db, nil
}
// BackupDatabase is a placeholder: it ignores both arguments, touches
// nothing, and always returns an error.
func BackupDatabase(sourceDB *sql.DB, backupPath string) error {
	// TODO: Implement database backup functionality
	// ASSUMPTION: This would use database-specific backup mechanisms
	notImplemented := fmt.Errorf("database backup not implemented yet")
	return notImplemented
}
// RestoreDatabase is a placeholder: it ignores both paths, touches nothing,
// and always returns an error.
func RestoreDatabase(backupPath, targetPath string) error {
	// TODO: Implement database restore functionality
	// ASSUMPTION: This would use database-specific restore mechanisms
	notImplemented := fmt.Errorf("database restore not implemented yet")
	return notImplemented
}