You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
309 lines
7.9 KiB
309 lines
7.9 KiB
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
func TestSQLEngine_HybridSelectBasic(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test SELECT with _source column to show both live and archived data
|
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events")
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
|
}
|
|
|
|
if len(result.Columns) == 0 {
|
|
t.Error("Expected columns in result")
|
|
}
|
|
|
|
// In mock environment, we only get live_log data from unflushed messages
|
|
// parquet_archive data would come from parquet files in a real system
|
|
if len(result.Rows) == 0 {
|
|
t.Error("Expected rows in result")
|
|
}
|
|
|
|
// Check that we have the _source column showing data source
|
|
hasSourceColumn := false
|
|
sourceColumnIndex := -1
|
|
for i, column := range result.Columns {
|
|
if column == SW_COLUMN_NAME_SOURCE {
|
|
hasSourceColumn = true
|
|
sourceColumnIndex = i
|
|
break
|
|
}
|
|
}
|
|
|
|
if !hasSourceColumn {
|
|
t.Skip("_source column not available in fallback mode - test requires real SeaweedFS cluster")
|
|
}
|
|
|
|
// Verify we have the expected data sources (in mock environment, only live_log)
|
|
if hasSourceColumn && sourceColumnIndex >= 0 {
|
|
foundLiveLog := false
|
|
|
|
for _, row := range result.Rows {
|
|
if sourceColumnIndex < len(row) {
|
|
source := row[sourceColumnIndex].ToString()
|
|
if source == "live_log" {
|
|
foundLiveLog = true
|
|
}
|
|
// In mock environment, all data comes from unflushed messages (live_log)
|
|
// In a real system, we would also see parquet_archive from parquet files
|
|
}
|
|
}
|
|
|
|
if !foundLiveLog {
|
|
t.Error("Expected to find live_log data source in results")
|
|
}
|
|
|
|
t.Logf("Found live_log data source from unflushed messages")
|
|
}
|
|
}
|
|
|
|
func TestSQLEngine_HybridSelectWithLimit(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test SELECT with LIMIT on hybrid data
|
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2")
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
|
}
|
|
|
|
// Should have exactly 2 rows due to LIMIT
|
|
if len(result.Rows) != 2 {
|
|
t.Errorf("Expected 2 rows with LIMIT 2, got %d", len(result.Rows))
|
|
}
|
|
}
|
|
|
|
func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test both user_events and system_logs tables
|
|
tables := []string{"user_events", "system_logs"}
|
|
|
|
for _, tableName := range tables {
|
|
result, err := engine.ExecuteSQL(context.Background(), fmt.Sprintf("SELECT *, _source FROM %s", tableName))
|
|
if err != nil {
|
|
t.Errorf("Error querying hybrid table %s: %v", tableName, err)
|
|
continue
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Errorf("Query error for hybrid table %s: %v", tableName, result.Error)
|
|
continue
|
|
}
|
|
|
|
if len(result.Columns) == 0 {
|
|
t.Errorf("No columns returned for hybrid table %s", tableName)
|
|
}
|
|
|
|
if len(result.Rows) == 0 {
|
|
t.Errorf("No rows returned for hybrid table %s", tableName)
|
|
}
|
|
|
|
// Check for _source column
|
|
hasSourceColumn := false
|
|
for _, column := range result.Columns {
|
|
if column == "_source" {
|
|
hasSourceColumn = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !hasSourceColumn {
|
|
t.Logf("Table %s missing _source column - running in fallback mode", tableName)
|
|
}
|
|
|
|
t.Logf("Table %s: %d columns, %d rows with hybrid data sources", tableName, len(result.Columns), len(result.Rows))
|
|
}
|
|
}
|
|
|
|
func TestSQLEngine_HybridDataSource(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test that we can distinguish between live and archived data
|
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type, _source FROM user_events")
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
|
}
|
|
|
|
// Find the _source column
|
|
sourceColumnIndex := -1
|
|
eventTypeColumnIndex := -1
|
|
|
|
for i, column := range result.Columns {
|
|
switch column {
|
|
case "_source":
|
|
sourceColumnIndex = i
|
|
case "event_type":
|
|
eventTypeColumnIndex = i
|
|
}
|
|
}
|
|
|
|
if sourceColumnIndex == -1 {
|
|
t.Skip("Could not find _source column - test requires real SeaweedFS cluster")
|
|
}
|
|
|
|
if eventTypeColumnIndex == -1 {
|
|
t.Fatal("Could not find event_type column")
|
|
}
|
|
|
|
// Check the data characteristics
|
|
liveEventFound := false
|
|
archivedEventFound := false
|
|
|
|
for _, row := range result.Rows {
|
|
if sourceColumnIndex < len(row) && eventTypeColumnIndex < len(row) {
|
|
source := row[sourceColumnIndex].ToString()
|
|
eventType := row[eventTypeColumnIndex].ToString()
|
|
|
|
if source == "live_log" && strings.Contains(eventType, "live_") {
|
|
liveEventFound = true
|
|
t.Logf("Found live event: %s from %s", eventType, source)
|
|
}
|
|
|
|
if source == "parquet_archive" && strings.Contains(eventType, "archived_") {
|
|
archivedEventFound = true
|
|
t.Logf("Found archived event: %s from %s", eventType, source)
|
|
}
|
|
}
|
|
}
|
|
|
|
if !liveEventFound {
|
|
t.Error("Expected to find live events with live_ prefix")
|
|
}
|
|
|
|
if !archivedEventFound {
|
|
t.Error("Expected to find archived events with archived_ prefix")
|
|
}
|
|
}
|
|
|
|
func TestSQLEngine_HybridSystemLogs(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test system_logs with hybrid data
|
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT level, message, service, _source FROM system_logs")
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
|
}
|
|
|
|
// Should have both live and archived system logs
|
|
if len(result.Rows) < 2 {
|
|
t.Errorf("Expected at least 2 system log entries, got %d", len(result.Rows))
|
|
}
|
|
|
|
// Find column indices
|
|
levelIndex := -1
|
|
sourceIndex := -1
|
|
|
|
for i, column := range result.Columns {
|
|
switch column {
|
|
case "level":
|
|
levelIndex = i
|
|
case "_source":
|
|
sourceIndex = i
|
|
}
|
|
}
|
|
|
|
// Verify we have both live and archived system logs
|
|
foundLive := false
|
|
foundArchived := false
|
|
|
|
for _, row := range result.Rows {
|
|
if sourceIndex >= 0 && sourceIndex < len(row) {
|
|
source := row[sourceIndex].ToString()
|
|
|
|
if source == "live_log" {
|
|
foundLive = true
|
|
if levelIndex >= 0 && levelIndex < len(row) {
|
|
level := row[levelIndex].ToString()
|
|
t.Logf("Live system log: level=%s", level)
|
|
}
|
|
}
|
|
|
|
if source == "parquet_archive" {
|
|
foundArchived = true
|
|
if levelIndex >= 0 && levelIndex < len(row) {
|
|
level := row[levelIndex].ToString()
|
|
t.Logf("Archived system log: level=%s", level)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !foundLive {
|
|
t.Log("No live system logs found - running in fallback mode")
|
|
}
|
|
|
|
if !foundArchived {
|
|
t.Log("No archived system logs found - running in fallback mode")
|
|
}
|
|
}
|
|
|
|
func TestSQLEngine_HybridSelectWithTimeImplications(t *testing.T) {
|
|
engine := NewTestSQLEngine()
|
|
|
|
// Test that demonstrates the time-based nature of hybrid data
|
|
// Live data should be more recent than archived data
|
|
result, err := engine.ExecuteSQL(context.Background(), "SELECT event_type, _source FROM user_events")
|
|
if err != nil {
|
|
t.Fatalf("Expected no error, got %v", err)
|
|
}
|
|
|
|
if result.Error != nil {
|
|
t.Fatalf("Expected no query error, got %v", result.Error)
|
|
}
|
|
|
|
// This test documents that hybrid scanning provides a complete view
|
|
// of both recent (live) and historical (archived) data in a single query
|
|
liveCount := 0
|
|
archivedCount := 0
|
|
|
|
sourceIndex := -1
|
|
for i, column := range result.Columns {
|
|
if column == "_source" {
|
|
sourceIndex = i
|
|
break
|
|
}
|
|
}
|
|
|
|
if sourceIndex >= 0 {
|
|
for _, row := range result.Rows {
|
|
if sourceIndex < len(row) {
|
|
source := row[sourceIndex].ToString()
|
|
switch source {
|
|
case "live_log":
|
|
liveCount++
|
|
case "parquet_archive":
|
|
archivedCount++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
t.Logf("Hybrid query results: %d live messages, %d archived messages", liveCount, archivedCount)
|
|
|
|
if liveCount == 0 && archivedCount == 0 {
|
|
t.Log("No live or archived messages found - running in fallback mode")
|
|
}
|
|
}
|