package engine
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
// TestFastPathCountFixRealistic tests the specific scenario mentioned in the bug report:
|
|
// Fast path returning 0 for COUNT(*) when slow path returns 1803
|
|
func TestFastPathCountFixRealistic(t *testing.T) {
|
|
engine := NewMockSQLEngine()
|
|
|
|
// Set up debug mode to see our new logging
|
|
ctx := context.WithValue(context.Background(), "debug", true)
|
|
|
|
// Create realistic data sources that mimic a scenario with 1803 rows
|
|
dataSources := &TopicDataSources{
|
|
ParquetFiles: map[string][]*ParquetFileStats{
|
|
"/topics/test/large-topic/0000-1023": {
|
|
{
|
|
RowCount: 800,
|
|
ColumnStats: map[string]*ParquetColumnStats{
|
|
"id": {
|
|
ColumnName: "id",
|
|
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1}},
|
|
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 800}},
|
|
NullCount: 0,
|
|
RowCount: 800,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
RowCount: 500,
|
|
ColumnStats: map[string]*ParquetColumnStats{
|
|
"id": {
|
|
ColumnName: "id",
|
|
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 801}},
|
|
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1300}},
|
|
NullCount: 0,
|
|
RowCount: 500,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"/topics/test/large-topic/1024-2047": {
|
|
{
|
|
RowCount: 300,
|
|
ColumnStats: map[string]*ParquetColumnStats{
|
|
"id": {
|
|
ColumnName: "id",
|
|
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1301}},
|
|
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1600}},
|
|
NullCount: 0,
|
|
RowCount: 300,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
ParquetRowCount: 1600, // 800 + 500 + 300
|
|
LiveLogRowCount: 203, // Additional live log data
|
|
PartitionsCount: 2,
|
|
LiveLogFilesCount: 15,
|
|
}
|
|
|
|
partitions := []string{
|
|
"/topics/test/large-topic/0000-1023",
|
|
"/topics/test/large-topic/1024-2047",
|
|
}
|
|
|
|
t.Run("COUNT(*) should return correct total (1803)", func(t *testing.T) {
|
|
computer := NewAggregationComputer(engine.SQLEngine)
|
|
|
|
aggregations := []AggregationSpec{
|
|
{Function: FuncCOUNT, Column: "*", Alias: "COUNT(*)"},
|
|
}
|
|
|
|
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
|
|
|
|
assert.NoError(t, err, "Fast path aggregation should not error")
|
|
assert.Len(t, results, 1, "Should return one result")
|
|
|
|
// This is the key test - before our fix, this was returning 0
|
|
expectedCount := int64(1803) // 1600 (parquet) + 203 (live log)
|
|
actualCount := results[0].Count
|
|
|
|
assert.Equal(t, expectedCount, actualCount,
|
|
"COUNT(*) should return %d (1600 parquet + 203 live log), but got %d",
|
|
expectedCount, actualCount)
|
|
})
|
|
|
|
t.Run("MIN/MAX should work with multiple partitions", func(t *testing.T) {
|
|
computer := NewAggregationComputer(engine.SQLEngine)
|
|
|
|
aggregations := []AggregationSpec{
|
|
{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
|
|
{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
|
|
}
|
|
|
|
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
|
|
|
|
assert.NoError(t, err, "Fast path aggregation should not error")
|
|
assert.Len(t, results, 2, "Should return two results")
|
|
|
|
// MIN should be the lowest across all parquet files
|
|
assert.Equal(t, int64(1), results[0].Min, "MIN should be 1")
|
|
|
|
// MAX should be the highest across all parquet files
|
|
assert.Equal(t, int64(1600), results[1].Max, "MAX should be 1600")
|
|
})
|
|
}
|
|
|
|
// TestFastPathDataSourceDiscoveryLogging tests that our debug logging works correctly
|
|
func TestFastPathDataSourceDiscoveryLogging(t *testing.T) {
|
|
// This test verifies that our enhanced data source collection structure is correct
|
|
|
|
t.Run("DataSources structure validation", func(t *testing.T) {
|
|
// Test the TopicDataSources structure initialization
|
|
dataSources := &TopicDataSources{
|
|
ParquetFiles: make(map[string][]*ParquetFileStats),
|
|
ParquetRowCount: 0,
|
|
LiveLogRowCount: 0,
|
|
LiveLogFilesCount: 0,
|
|
PartitionsCount: 0,
|
|
}
|
|
|
|
assert.NotNil(t, dataSources, "Data sources should not be nil")
|
|
assert.NotNil(t, dataSources.ParquetFiles, "ParquetFiles map should be initialized")
|
|
assert.GreaterOrEqual(t, dataSources.PartitionsCount, 0, "PartitionsCount should be non-negative")
|
|
assert.GreaterOrEqual(t, dataSources.ParquetRowCount, int64(0), "ParquetRowCount should be non-negative")
|
|
assert.GreaterOrEqual(t, dataSources.LiveLogRowCount, int64(0), "LiveLogRowCount should be non-negative")
|
|
})
|
|
}
|
|
|
|
// TestFastPathValidationLogic tests the enhanced validation we added
|
|
func TestFastPathValidationLogic(t *testing.T) {
|
|
t.Run("Validation catches data source vs computation mismatch", func(t *testing.T) {
|
|
// Create a scenario where data sources and computation might be inconsistent
|
|
dataSources := &TopicDataSources{
|
|
ParquetFiles: make(map[string][]*ParquetFileStats),
|
|
ParquetRowCount: 1000, // Data sources say 1000 rows
|
|
LiveLogRowCount: 0,
|
|
PartitionsCount: 1,
|
|
}
|
|
|
|
// But aggregation result says different count (simulating the original bug)
|
|
aggResults := []AggregationResult{
|
|
{Count: 0}, // Bug: returns 0 when data sources show 1000
|
|
}
|
|
|
|
// This simulates the validation logic from tryFastParquetAggregation
|
|
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
|
|
countResult := aggResults[0].Count
|
|
|
|
// Our validation should catch this mismatch
|
|
assert.NotEqual(t, totalRows, countResult,
|
|
"This test simulates the bug: data sources show %d but COUNT returns %d",
|
|
totalRows, countResult)
|
|
|
|
// In the real code, this would trigger a fallback to slow path
|
|
validationPassed := (countResult == totalRows)
|
|
assert.False(t, validationPassed, "Validation should fail for inconsistent data")
|
|
})
|
|
|
|
t.Run("Validation passes for consistent data", func(t *testing.T) {
|
|
// Create a scenario where everything is consistent
|
|
dataSources := &TopicDataSources{
|
|
ParquetFiles: make(map[string][]*ParquetFileStats),
|
|
ParquetRowCount: 1000,
|
|
LiveLogRowCount: 803,
|
|
PartitionsCount: 1,
|
|
}
|
|
|
|
// Aggregation result matches data sources
|
|
aggResults := []AggregationResult{
|
|
{Count: 1803}, // Correct: matches 1000 + 803
|
|
}
|
|
|
|
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
|
|
countResult := aggResults[0].Count
|
|
|
|
// Our validation should pass this
|
|
assert.Equal(t, totalRows, countResult,
|
|
"Validation should pass when data sources (%d) match COUNT result (%d)",
|
|
totalRows, countResult)
|
|
|
|
validationPassed := (countResult == totalRows)
|
|
assert.True(t, validationPassed, "Validation should pass for consistent data")
|
|
})
|
|
}
|