Browse Source

fix

pull/7185/head
chrislu 1 month ago
parent
commit
d192536376
  1. 4
      SQL_FEATURE_PLAN.md
  2. 242
      test/postgres/coverage_test.go
  3. 71
      weed/query/engine/time_filter_test.go
  4. 113
      weed/server/postgres/protocol.go

4
SQL_FEATURE_PLAN.md

@ -1,12 +1,12 @@
# SQL Query Engine Feature, Dev, and Test Plan # SQL Query Engine Feature, Dev, and Test Plan
This document outlines the plan for adding comprehensive SQL support to SeaweedFS, focusing on schema-tized Message Queue (MQ) topics with full DDL and DML capabilities, plus S3 objects querying.
This document outlines the plan for adding comprehensive SQL support to SeaweedFS, focusing on schematized Message Queue (MQ) topics with full DDL and DML capabilities, plus S3 objects querying.
## Feature Plan ## Feature Plan
**1. Goal** **1. Goal**
To provide a full-featured SQL interface for SeaweedFS, treating schema-tized MQ topics as database tables with complete DDL/DML support. This enables:
To provide a full-featured SQL interface for SeaweedFS, treating schematized MQ topics as database tables with complete DDL/DML support. This enables:
- Database-like operations on MQ topics (CREATE TABLE, ALTER TABLE, DROP TABLE) - Database-like operations on MQ topics (CREATE TABLE, ALTER TABLE, DROP TABLE)
- Advanced querying with SELECT, WHERE, JOIN, aggregations - Advanced querying with SELECT, WHERE, JOIN, aggregations
- Schema management and metadata operations (SHOW DATABASES, SHOW TABLES) - Schema management and metadata operations (SHOW DATABASES, SHOW TABLES)

242
test/postgres/coverage_test.go

@ -1,242 +0,0 @@
package main
import (
"database/sql"
"fmt"
"log"
"testing"
"time"
_ "github.com/lib/pq"
)
// TestSHOWTablesRecovery tests that SHOW TABLES doesn't crash with nil pointer dereference
func TestSHOWTablesRecovery(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Test SHOW TABLES (this was causing panics before the fix)
rows, err := db.Query("SHOW TABLES")
if err != nil {
t.Fatalf("SHOW TABLES failed: %v", err)
}
defer rows.Close()
tableCount := 0
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
t.Errorf("Error scanning table name: %v", err)
continue
}
tableCount++
log.Printf("Found table: %s", tableName)
}
if tableCount == 0 {
t.Error("Expected at least one table, got 0")
}
log.Printf("✓ SHOW TABLES recovery test passed - found %d tables", tableCount)
}
// TestSHOWTablesFromDatabase tests SHOW TABLES FROM database syntax
func TestSHOWTablesFromDatabase(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Test SHOW TABLES FROM specific database
rows, err := db.Query(`SHOW TABLES FROM "logs"`)
if err != nil {
t.Fatalf("SHOW TABLES FROM logs failed: %v", err)
}
defer rows.Close()
found := false
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
t.Errorf("Error scanning table name: %v", err)
continue
}
found = true
log.Printf("Found table in logs database: %s", tableName)
}
if !found {
t.Error("Expected tables in logs database, found none")
}
log.Printf("✓ SHOW TABLES FROM database test passed")
}
// TestSystemQueriesIndividualConnections tests system queries with fresh connections
func TestSystemQueriesIndividualConnections(t *testing.T) {
queries := []struct {
name string
query string
}{
{"Version", "SELECT version()"},
{"Current User", "SELECT current_user"},
{"Current Database", "SELECT current_database()"},
{"Server Encoding", "SELECT current_setting('server_encoding')"},
}
connStr := fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable",
getEnv("POSTGRES_HOST", "postgres-server"),
getEnv("POSTGRES_PORT", "5432"),
getEnv("POSTGRES_USER", "seaweedfs"),
getEnv("POSTGRES_DB", "logs"))
for _, q := range queries {
t.Run(q.name, func(t *testing.T) {
// Create fresh connection for each system query (this was the fix)
db, err := sql.Open("postgres", connStr)
if err != nil {
t.Fatalf("Failed to create connection for %s: %v", q.name, err)
}
defer db.Close()
var result string
err = db.QueryRow(q.query).Scan(&result)
if err != nil {
t.Errorf("System query %s failed: %v", q.name, err)
return
}
if result == "" {
t.Errorf("System query %s returned empty result", q.name)
return
}
log.Printf("✓ %s: %s", q.name, result)
})
}
}
// TestDatabaseConnectionSwitching tests connecting to different databases
func TestDatabaseConnectionSwitching(t *testing.T) {
databases := []string{"analytics", "ecommerce", "logs"}
host := getEnv("POSTGRES_HOST", "postgres-server")
port := getEnv("POSTGRES_PORT", "5432")
user := getEnv("POSTGRES_USER", "seaweedfs")
for _, dbName := range databases {
t.Run(fmt.Sprintf("Connect to %s", dbName), func(t *testing.T) {
// Create fresh connection to specific database (this was the fix instead of USE commands)
connStr := fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable",
host, port, user, dbName)
db, err := sql.Open("postgres", connStr)
if err != nil {
t.Fatalf("Failed to connect to database %s: %v", dbName, err)
}
defer db.Close()
var currentDB string
err = db.QueryRow("SELECT current_database()").Scan(&currentDB)
if err != nil {
t.Errorf("Failed to verify database connection to %s: %v", dbName, err)
return
}
log.Printf("✓ Successfully connected to database: %s", currentDB)
})
}
}
// TestCOUNTFunctionParsing tests COUNT(*) parsing that was fixed
func TestCOUNTFunctionParsing(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Test COUNT(*) on known table (this was fixed in the parser)
var count int
err := db.QueryRow("SELECT COUNT(*) FROM application_logs").Scan(&count)
if err != nil {
t.Fatalf("COUNT(*) query failed: %v", err)
}
if count <= 0 {
t.Error("Expected COUNT(*) to return positive number")
}
log.Printf("✓ COUNT(*) parsing test passed - found %d records", count)
}
// TestParquetLogicalTypesDisplay tests that Parquet logical types are displayed correctly
func TestParquetLogicalTypesDisplay(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Test that timestamp logical types are visible in query results
rows, err := db.Query("SELECT timestamp, id FROM application_logs LIMIT 2")
if err != nil {
t.Fatalf("Failed to query logical types: %v", err)
}
defer rows.Close()
count := 0
for rows.Next() {
var timestamp, id string
if err := rows.Scan(&timestamp, &id); err != nil {
t.Errorf("Error scanning logical type data: %v", err)
continue
}
// Check that timestamp contains the logical type structure
if !containsAny(timestamp, []string{"timestamp_value", "timestamp_mic"}) {
t.Errorf("Expected timestamp to contain logical type structure, got: %s", timestamp)
}
count++
log.Printf("Row %d - Timestamp: %s, ID: %s", count, timestamp, id)
}
if count == 0 {
t.Error("Expected to retrieve logical type data, got none")
}
log.Printf("✓ Parquet logical types display test passed - %d rows", count)
}
// Helper functions
func setupTestDB(t *testing.T) *sql.DB {
connStr := fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable",
getEnv("POSTGRES_HOST", "postgres-server"),
getEnv("POSTGRES_PORT", "5432"),
getEnv("POSTGRES_USER", "seaweedfs"),
getEnv("POSTGRES_DB", "logs"))
db, err := sql.Open("postgres", connStr)
if err != nil {
t.Fatalf("Failed to connect to database: %v", err)
}
// Wait for server to be ready
for i := 0; i < 30; i++ {
if err = db.Ping(); err == nil {
break
}
time.Sleep(500 * time.Millisecond)
}
if err != nil {
t.Fatalf("Database not ready after 15 seconds: %v", err)
}
return db
}
func containsAny(str string, substrings []string) bool {
for _, sub := range substrings {
if len(str) >= len(sub) {
for i := 0; i <= len(str)-len(sub); i++ {
if str[i:i+len(sub)] == sub {
return true
}
}
}
}
return false
}

71
weed/query/engine/time_filter_test.go

@ -5,68 +5,6 @@ import (
"testing" "testing"
) )
// TestTimeFilterExtraction tests the extraction of time filters from WHERE clauses
func TestTimeFilterExtraction(t *testing.T) {
_ = NewTestSQLEngine()
// Test data: use fixed timestamps for consistent testing
_ = []struct {
name string
whereClause string
expectedStartNs int64
expectedStopNs int64
description string
}{
{
name: "Greater Than Filter",
whereClause: "_timestamp_ns > 1672531200000000000", // Fixed timestamp
expectedStartNs: 1672531200000000000,
expectedStopNs: 0, // No upper bound
description: "Should extract start time from > comparison",
},
{
name: "Less Than Filter",
whereClause: "_timestamp_ns < 1672617600000000000", // Fixed timestamp
expectedStartNs: 0, // No lower bound
expectedStopNs: 1672617600000000000,
description: "Should extract stop time from < comparison",
},
{
name: "Range Filter (AND)",
whereClause: "_timestamp_ns >= 1672531200000000000 AND _timestamp_ns <= 1672617600000000000",
expectedStartNs: 1672531200000000000,
expectedStopNs: 1672617600000000000,
description: "Should extract both bounds from range query",
},
{
name: "Equal Filter",
whereClause: "_timestamp_ns = 1672531200000000000",
expectedStartNs: 1672531200000000000,
expectedStopNs: 1672531200000000000,
description: "Should set both bounds for exact match",
},
{
name: "Non-Time Filter",
whereClause: "user_id > 1000",
expectedStartNs: 0,
expectedStopNs: 0,
description: "Should ignore non-time comparisons",
},
{
name: "OR Filter (Skip)",
whereClause: "_timestamp_ns > 1672531200000000000 OR user_id = 123",
expectedStartNs: 0,
expectedStopNs: 0,
description: "Should skip time extraction for OR clauses (unsafe)",
},
}
// TODO: Rewrite this test to work with the PostgreSQL parser instead of sqlparser
// The test has been temporarily disabled while migrating from sqlparser to native PostgreSQL parser
t.Skip("Test disabled during sqlparser removal - needs rewrite for PostgreSQL parser")
}
// TestTimeColumnRecognition tests the recognition of time-related columns // TestTimeColumnRecognition tests the recognition of time-related columns
func TestTimeColumnRecognition(t *testing.T) { func TestTimeColumnRecognition(t *testing.T) {
engine := NewTestSQLEngine() engine := NewTestSQLEngine()
@ -111,15 +49,6 @@ func TestTimeColumnRecognition(t *testing.T) {
t.Log("Time column recognition working correctly") t.Log("Time column recognition working correctly")
} }
// TestTimeValueParsing tests parsing of different time value formats
func TestTimeValueParsing(t *testing.T) {
_ = NewTestSQLEngine()
// TODO: Rewrite this test to work without sqlparser types
// The test has been temporarily disabled while migrating from sqlparser to native PostgreSQL parser
t.Skip("Test disabled during sqlparser removal - needs rewrite for PostgreSQL parser")
}
// TestTimeFilterIntegration tests the full integration of time filters with SELECT queries // TestTimeFilterIntegration tests the full integration of time filters with SELECT queries
func TestTimeFilterIntegration(t *testing.T) { func TestTimeFilterIntegration(t *testing.T) {
engine := NewTestSQLEngine() engine := NewTestSQLEngine()

113
weed/server/postgres/protocol.go

@ -16,43 +16,116 @@ import (
) )
// splitSQLStatements splits a query string into individual SQL statements // splitSQLStatements splits a query string into individual SQL statements
// This is a simple implementation that splits on semicolons outside of quoted strings
// This robust implementation handles SQL comments, quoted strings, and escaped characters
func splitSQLStatements(query string) []string { func splitSQLStatements(query string) []string {
var statements []string var statements []string
var current strings.Builder var current strings.Builder
inSingleQuote := false
inDoubleQuote := false
query = strings.TrimSpace(query) query = strings.TrimSpace(query)
if query == "" { if query == "" {
return []string{} return []string{}
} }
for _, char := range query {
switch char {
case '\'':
if !inDoubleQuote {
inSingleQuote = !inSingleQuote
runes := []rune(query)
i := 0
for i < len(runes) {
char := runes[i]
// Handle single-line comments (-- comment)
if char == '-' && i+1 < len(runes) && runes[i+1] == '-' {
// Skip the entire comment without including it in any statement
for i < len(runes) && runes[i] != '\n' && runes[i] != '\r' {
i++
} }
current.WriteRune(char)
case '"':
if !inSingleQuote {
inDoubleQuote = !inDoubleQuote
// Skip the newline if present
if i < len(runes) {
i++
}
continue
}
// Handle multi-line comments (/* comment */)
if char == '/' && i+1 < len(runes) && runes[i+1] == '*' {
// Skip the /* opening
i++
i++
// Skip to end of comment or end of input without including content
for i < len(runes) {
if runes[i] == '*' && i+1 < len(runes) && runes[i+1] == '/' {
i++ // Skip the *
i++ // Skip the /
break
}
i++
} }
continue
}
// Handle single-quoted strings
if char == '\'' {
current.WriteRune(char) current.WriteRune(char)
case ';':
if !inSingleQuote && !inDoubleQuote {
stmt := strings.TrimSpace(current.String())
if stmt != "" {
statements = append(statements, stmt)
i++
for i < len(runes) {
char = runes[i]
current.WriteRune(char)
if char == '\'' {
// Check if it's an escaped quote
if i+1 < len(runes) && runes[i+1] == '\'' {
i++ // Skip the next quote (it's escaped)
if i < len(runes) {
current.WriteRune(runes[i])
}
} else {
break // End of string
}
} }
current.Reset()
} else {
i++
}
i++
continue
}
// Handle double-quoted identifiers
if char == '"' {
current.WriteRune(char)
i++
for i < len(runes) {
char = runes[i]
current.WriteRune(char) current.WriteRune(char)
if char == '"' {
// Check if it's an escaped quote
if i+1 < len(runes) && runes[i+1] == '"' {
i++ // Skip the next quote (it's escaped)
if i < len(runes) {
current.WriteRune(runes[i])
}
} else {
break // End of identifier
}
}
i++
} }
default:
i++
continue
}
// Handle semicolon (statement separator)
if char == ';' {
stmt := strings.TrimSpace(current.String())
if stmt != "" {
statements = append(statements, stmt)
}
current.Reset()
} else {
current.WriteRune(char) current.WriteRune(char)
} }
i++
} }
// Add any remaining statement // Add any remaining statement

Loading…
Cancel
Save