You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
302 lines
8.8 KiB
302 lines
8.8 KiB
package offset
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// MigrationVersion represents a database migration version
|
|
type MigrationVersion struct {
|
|
Version int
|
|
Description string
|
|
SQL string
|
|
}
|
|
|
|
// GetMigrations returns all available migrations for offset storage
|
|
func GetMigrations() []MigrationVersion {
|
|
return []MigrationVersion{
|
|
{
|
|
Version: 1,
|
|
Description: "Create initial offset storage tables",
|
|
SQL: `
|
|
-- Partition offset checkpoints table
|
|
-- TODO: Add _index as computed column when supported by database
|
|
CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
|
|
partition_key TEXT PRIMARY KEY,
|
|
ring_size INTEGER NOT NULL,
|
|
range_start INTEGER NOT NULL,
|
|
range_stop INTEGER NOT NULL,
|
|
unix_time_ns INTEGER NOT NULL,
|
|
checkpoint_offset INTEGER NOT NULL,
|
|
updated_at INTEGER NOT NULL
|
|
);
|
|
|
|
-- Offset mappings table for detailed tracking
|
|
-- TODO: Add _index as computed column when supported by database
|
|
CREATE TABLE IF NOT EXISTS offset_mappings (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
partition_key TEXT NOT NULL,
|
|
kafka_offset INTEGER NOT NULL,
|
|
smq_timestamp INTEGER NOT NULL,
|
|
message_size INTEGER NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
UNIQUE(partition_key, kafka_offset)
|
|
);
|
|
|
|
-- Schema migrations tracking table
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version INTEGER PRIMARY KEY,
|
|
description TEXT NOT NULL,
|
|
applied_at INTEGER NOT NULL
|
|
);
|
|
`,
|
|
},
|
|
{
|
|
Version: 2,
|
|
Description: "Add indexes for performance optimization",
|
|
SQL: `
|
|
-- Indexes for performance
|
|
CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
|
|
ON partition_offset_checkpoints(partition_key);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
|
|
ON offset_mappings(partition_key, kafka_offset);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
|
|
ON offset_mappings(partition_key, smq_timestamp);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at
|
|
ON offset_mappings(created_at);
|
|
`,
|
|
},
|
|
{
|
|
Version: 3,
|
|
Description: "Add partition metadata table for enhanced tracking",
|
|
SQL: `
|
|
-- Partition metadata table
|
|
CREATE TABLE IF NOT EXISTS partition_metadata (
|
|
partition_key TEXT PRIMARY KEY,
|
|
ring_size INTEGER NOT NULL,
|
|
range_start INTEGER NOT NULL,
|
|
range_stop INTEGER NOT NULL,
|
|
unix_time_ns INTEGER NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
last_activity_at INTEGER NOT NULL,
|
|
record_count INTEGER DEFAULT 0,
|
|
total_size INTEGER DEFAULT 0
|
|
);
|
|
|
|
-- Index for partition metadata
|
|
CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity
|
|
ON partition_metadata(last_activity_at);
|
|
`,
|
|
},
|
|
}
|
|
}
|
|
|
|
// MigrationManager handles database schema migrations
|
|
type MigrationManager struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// NewMigrationManager creates a new migration manager
|
|
func NewMigrationManager(db *sql.DB) *MigrationManager {
|
|
return &MigrationManager{db: db}
|
|
}
|
|
|
|
// GetCurrentVersion returns the current schema version
|
|
func (m *MigrationManager) GetCurrentVersion() (int, error) {
|
|
// First, ensure the migrations table exists
|
|
_, err := m.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version INTEGER PRIMARY KEY,
|
|
description TEXT NOT NULL,
|
|
applied_at INTEGER NOT NULL
|
|
)
|
|
`)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to create migrations table: %w", err)
|
|
}
|
|
|
|
var version sql.NullInt64
|
|
err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get current version: %w", err)
|
|
}
|
|
|
|
if !version.Valid {
|
|
return 0, nil // No migrations applied yet
|
|
}
|
|
|
|
return int(version.Int64), nil
|
|
}
|
|
|
|
// ApplyMigrations applies all pending migrations
|
|
func (m *MigrationManager) ApplyMigrations() error {
|
|
currentVersion, err := m.GetCurrentVersion()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current version: %w", err)
|
|
}
|
|
|
|
migrations := GetMigrations()
|
|
|
|
for _, migration := range migrations {
|
|
if migration.Version <= currentVersion {
|
|
continue // Already applied
|
|
}
|
|
|
|
fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description)
|
|
|
|
// Begin transaction
|
|
tx, err := m.db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err)
|
|
}
|
|
|
|
// Execute migration SQL
|
|
_, err = tx.Exec(migration.SQL)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err)
|
|
}
|
|
|
|
// Record migration as applied
|
|
_, err = tx.Exec(
|
|
"INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
|
|
migration.Version,
|
|
migration.Description,
|
|
getCurrentTimestamp(),
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("failed to record migration %d: %w", migration.Version, err)
|
|
}
|
|
|
|
// Commit transaction
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err)
|
|
}
|
|
|
|
fmt.Printf("Successfully applied migration %d\n", migration.Version)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RollbackMigration rolls back a specific migration (if supported)
|
|
func (m *MigrationManager) RollbackMigration(version int) error {
|
|
// TODO: Implement rollback functionality
|
|
// ASSUMPTION: For now, rollbacks are not supported as they require careful planning
|
|
return fmt.Errorf("migration rollbacks not implemented - manual intervention required")
|
|
}
|
|
|
|
// GetAppliedMigrations returns a list of all applied migrations
|
|
func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) {
|
|
rows, err := m.db.Query(`
|
|
SELECT version, description, applied_at
|
|
FROM schema_migrations
|
|
ORDER BY version
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query applied migrations: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var migrations []AppliedMigration
|
|
for rows.Next() {
|
|
var migration AppliedMigration
|
|
err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan migration: %w", err)
|
|
}
|
|
migrations = append(migrations, migration)
|
|
}
|
|
|
|
return migrations, nil
|
|
}
|
|
|
|
// ValidateSchema validates that the database schema is up to date
|
|
func (m *MigrationManager) ValidateSchema() error {
|
|
currentVersion, err := m.GetCurrentVersion()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current version: %w", err)
|
|
}
|
|
|
|
migrations := GetMigrations()
|
|
if len(migrations) == 0 {
|
|
return nil
|
|
}
|
|
|
|
latestVersion := migrations[len(migrations)-1].Version
|
|
if currentVersion < latestVersion {
|
|
return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AppliedMigration represents a migration that has been applied
|
|
type AppliedMigration struct {
|
|
Version int
|
|
Description string
|
|
AppliedAt int64
|
|
}
|
|
|
|
// getCurrentTimestamp returns the current timestamp in nanoseconds
|
|
func getCurrentTimestamp() int64 {
|
|
return time.Now().UnixNano()
|
|
}
|
|
|
|
// CreateDatabase creates and initializes a new offset storage database
|
|
func CreateDatabase(dbPath string) (*sql.DB, error) {
|
|
// TODO: Support different database types (PostgreSQL, MySQL, etc.)
|
|
// ASSUMPTION: Using SQLite for now, can be extended for other databases
|
|
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
// Configure SQLite for better performance
|
|
pragmas := []string{
|
|
"PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency
|
|
"PRAGMA synchronous=NORMAL", // Balance between safety and performance
|
|
"PRAGMA cache_size=10000", // Increase cache size
|
|
"PRAGMA foreign_keys=ON", // Enable foreign key constraints
|
|
"PRAGMA temp_store=MEMORY", // Store temporary tables in memory
|
|
}
|
|
|
|
for _, pragma := range pragmas {
|
|
_, err := db.Exec(pragma)
|
|
if err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err)
|
|
}
|
|
}
|
|
|
|
// Apply migrations
|
|
migrationManager := NewMigrationManager(db)
|
|
err = migrationManager.ApplyMigrations()
|
|
if err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("failed to apply migrations: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// BackupDatabase creates a backup of the offset storage database
|
|
func BackupDatabase(sourceDB *sql.DB, backupPath string) error {
|
|
// TODO: Implement database backup functionality
|
|
// ASSUMPTION: This would use database-specific backup mechanisms
|
|
return fmt.Errorf("database backup not implemented yet")
|
|
}
|
|
|
|
// RestoreDatabase restores a database from a backup
|
|
func RestoreDatabase(backupPath, targetPath string) error {
|
|
// TODO: Implement database restore functionality
|
|
// ASSUMPTION: This would use database-specific restore mechanisms
|
|
return fmt.Errorf("database restore not implemented yet")
|
|
}
|