package engine

import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
)

// ParquetScanner scans MQ topic Parquet files for SELECT queries
// Assumptions:
// 1. All MQ messages are stored in Parquet format in topic partitions
// 2. Each partition directory contains dated Parquet files
// 3. System columns (SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY) are added to the user schema
// 4. Predicate pushdown is used for efficient scanning
type ParquetScanner struct {
	filerClient   filer_pb.FilerClient
	chunkCache    chunk_cache.ChunkCache
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
}

// NewParquetScanner creates a scanner for a specific MQ topic
// Assumption: the topic exists and has Parquet files in its partition directories
func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName string) (*ParquetScanner, error) {
	// Check that a filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create the topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Read the topic configuration to get the schema
	var topicConf *mq_pb.ConfigureTopicResponse
	var err error
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		topicConf, err = t.ReadConfFile(client)
		return err
	}); err != nil {
		return nil, fmt.Errorf("failed to read topic config: %v", err)
	}

	// Build the complete schema with system columns
	recordType := topicConf.GetRecordType()
	if recordType == nil {
		return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
	}

	// Add the system columns that MQ adds to all records
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &ParquetScanner{
		filerClient:   filerClient,
		chunkCache:    chunk_cache.NewChunkCacheInMemory(256), // Same as MQ logstore
		topic:         t,
		recordSchema:  recordType,
		parquetLevels: parquetLevels,
	}, nil
}
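
// exampleNewScanner is an illustrative sketch, not part of the engine API: it shows
// how a caller might construct a ParquetScanner once it has a filer client. The
// "analytics" namespace and "user_events" topic name are hypothetical.
func exampleNewScanner(filerClient filer_pb.FilerClient) (*ParquetScanner, error) {
	scanner, err := NewParquetScanner(filerClient, "analytics", "user_events")
	if err != nil {
		return nil, fmt.Errorf("create scanner: %v", err)
	}
	return scanner, nil
}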

// ScanOptions configure how the scanner reads data
type ScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit - 0 means no limit
	Limit int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}
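
// exampleScanOptions is an illustrative sketch of how ScanOptions might be populated
// for a query such as SELECT user_id, event_type ... WHERE event_type = 'login' over
// the last hour. The column and field names are hypothetical, not a fixed schema.
func exampleScanOptions() ScanOptions {
	now := time.Now().UnixNano()
	return ScanOptions{
		StartTimeNs: now - int64(time.Hour), // only messages from the last hour
		StopTimeNs:  0,                      // no upper bound
		Columns:     []string{"user_id", "event_type"},
		Limit:       100,
		Predicate: func(record *schema_pb.RecordValue) bool {
			// WHERE event_type = 'login'
			v, ok := record.Fields["event_type"]
			return ok && v.GetStringValue() == "login"
		},
	}
}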

// ScanResult represents a single scanned record
type ScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (system column SW_COLUMN_NAME_TIMESTAMP)
	Key       []byte                      // Message key (system column SW_COLUMN_NAME_KEY)
}

// Scan reads records from the topic's Parquet files
// Assumptions:
// 1. Scans all partitions of the topic
// 2. Applies time filtering at the Parquet level for efficiency
// 3. Applies predicates and projections after reading
func (ps *ParquetScanner) Scan(ctx context.Context, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Get all partitions for this topic
	// TODO: Implement proper partition discovery
	// For now, assume partition 0 exists
	partitions := []topic.Partition{{RangeStart: 0, RangeStop: 1000}}

	for _, partition := range partitions {
		partitionResults, err := ps.scanPartition(ctx, partition, options)
		if err != nil {
			return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}
		results = append(results, partitionResults...)

		// Apply the global limit across all partitions
		if options.Limit > 0 && len(results) >= options.Limit {
			results = results[:options.Limit]
			break
		}
	}
	return results, nil
}
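
// exampleScan is an illustrative sketch, not called by the engine: it wires the pieces
// together by running Scan with the sample options above and printing each record.
func exampleScan(ctx context.Context, scanner *ParquetScanner) error {
	results, err := scanner.Scan(ctx, exampleScanOptions())
	if err != nil {
		return fmt.Errorf("scan failed: %v", err)
	}
	for _, r := range results {
		fmt.Printf("key=%s ts=%d values=%v\n", r.Key, r.Timestamp, r.Values)
	}
	return nil
}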

// scanPartition scans a specific topic partition
func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Partition, options ScanOptions) ([]ScanResult, error) {
	// partitionDir := topic.PartitionDir(ps.topic, partition) // TODO: Use for actual file listing

	var results []ScanResult

	// List Parquet files in the partition directory
	// TODO: Implement proper file listing with date range filtering
	// For now, this is a placeholder that would list actual Parquet files
	//
	// Simulate file processing - in a real implementation, this would:
	// 1. List files in partitionDir via filerClient
	// 2. Filter files by date range if time filtering is enabled
	// 3. Process each Parquet file in chronological order

	// Placeholder: create sample data for testing
	if len(results) == 0 {
		// Generate sample data for demonstration
		sampleData := ps.generateSampleData(options)
		results = append(results, sampleData...)
	}
	return results, nil
}

// scanParquetFile scans a single Parquet file (real implementation)
func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Create a reader for the Parquet file (same pattern as logstore)
	lookupFileIdFn := filer.LookupFn(ps.filerClient)
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))

	// Create the Parquet reader
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	rows := make([]parquet.Row, 128) // Read in batches like logstore
	for {
		rowCount, readErr := parquetReader.ReadRows(rows)

		// Process the rows that were read, even if readErr is EOF
		for i := 0; i < rowCount; i++ {
			// Convert the Parquet row to a schema value
			recordValue, err := schema.ToRecordValue(ps.recordSchema, ps.parquetLevels, rows[i])
			if err != nil {
				return nil, fmt.Errorf("failed to convert row: %v", err)
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply time filtering
			if options.StartTimeNs > 0 && timestamp < options.StartTimeNs {
				continue
			}
			if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs {
				break // Assume data is time-ordered
			}

			// Apply predicate filtering (WHERE clause)
			if options.Predicate != nil && !options.Predicate(recordValue) {
				continue
			}

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(options.Columns) == 0 {
				// Select all columns (excluding system columns from the user view)
				for name, value := range recordValue.Fields {
					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
						values[name] = value
					}
				}
			} else {
				// Select only the specified columns
				for _, columnName := range options.Columns {
					if value, exists := recordValue.Fields[columnName]; exists {
						values[columnName] = value
					}
				}
			}

			results = append(results, ScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
			})

			// Apply the row limit
			if options.Limit > 0 && len(results) >= options.Limit {
				return results, nil
			}
		}

		if readErr != nil {
			break // EOF or read error
		}
	}
	return results, nil
}

// generateSampleData creates sample data for testing when no real Parquet files exist
func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
	now := time.Now().UnixNano()

	sampleData := []ScanResult{
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
			},
			Timestamp: now - 3600000000000, // 1 hour ago
			Key:       []byte("user-1001"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "page_view"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
			},
			Timestamp: now - 1800000000000, // 30 minutes ago
			Key:       []byte("user-1002"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "logout"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
			},
			Timestamp: now - 900000000000, // 15 minutes ago
			Key:       []byte("user-1001"),
		},
	}

	// Apply predicate filtering if specified
	if options.Predicate != nil {
		var filtered []ScanResult
		for _, result := range sampleData {
			// Convert to a RecordValue for predicate testing
			recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
			for k, v := range result.Values {
				recordValue.Fields[k] = v
			}
			recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}

			if options.Predicate(recordValue) {
				filtered = append(filtered, result)
			}
		}
		sampleData = filtered
	}

	// Apply the limit
	if options.Limit > 0 && len(sampleData) > options.Limit {
		sampleData = sampleData[:options.Limit]
	}
	return sampleData
}

// ConvertToSQLResult converts ScanResults to SQL query results
func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []string) *QueryResult {
	if len(results) == 0 {
		return &QueryResult{
			Columns: columns,
			Rows:    [][]sqltypes.Value{},
		}
	}

	// Determine the columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
		for _, result := range results {
			for columnName := range result.Values {
				columnSet[columnName] = true
			}
		}
		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			if value, exists := result.Values[columnName]; exists {
				row[j] = convertSchemaValueToSQL(value)
			} else {
				row[j] = sqltypes.NULL
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns: columns,
		Rows:    rows,
	}
}
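
// exampleConvert is an illustrative sketch showing how scan results feed the SQL layer:
// ConvertToSQLResult keeps the row width equal to the column list and fills missing
// columns with NULL. The column names here are hypothetical.
func exampleConvert(ps *ParquetScanner, results []ScanResult) {
	queryResult := ps.ConvertToSQLResult(results, []string{"user_id", "event_type"})
	for _, row := range queryResult.Rows {
		for i, columnName := range queryResult.Columns {
			fmt.Printf("%s=%v ", columnName, row[i])
		}
		fmt.Println()
	}
}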

// convertSchemaValueToSQL converts a schema_pb.Value to a sqltypes.Value
func convertSchemaValueToSQL(value *schema_pb.Value) sqltypes.Value {
	if value == nil {
		return sqltypes.NULL
	}
	switch v := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		if v.BoolValue {
			return sqltypes.NewInt32(1)
		}
		return sqltypes.NewInt32(0)
	case *schema_pb.Value_Int32Value:
		return sqltypes.NewInt32(v.Int32Value)
	case *schema_pb.Value_Int64Value:
		return sqltypes.NewInt64(v.Int64Value)
	case *schema_pb.Value_FloatValue:
		return sqltypes.NewFloat32(v.FloatValue)
	case *schema_pb.Value_DoubleValue:
		return sqltypes.NewFloat64(v.DoubleValue)
	case *schema_pb.Value_BytesValue:
		return sqltypes.NewVarBinary(string(v.BytesValue))
	case *schema_pb.Value_StringValue:
		return sqltypes.NewVarChar(v.StringValue)
	// Parquet logical types
	case *schema_pb.Value_TimestampValue:
		timestampValue := value.GetTimestampValue()
		if timestampValue == nil {
			return sqltypes.NULL
		}
		// Convert microseconds to time.Time and format as a datetime string
		timestamp := time.UnixMicro(timestampValue.TimestampMicros)
		return sqltypes.MakeTrusted(sqltypes.Datetime, []byte(timestamp.Format("2006-01-02 15:04:05")))
	case *schema_pb.Value_DateValue:
		dateValue := value.GetDateValue()
		if dateValue == nil {
			return sqltypes.NULL
		}
		// Convert days since the Unix epoch to a date string
		date := time.Unix(int64(dateValue.DaysSinceEpoch)*86400, 0).UTC()
		return sqltypes.MakeTrusted(sqltypes.Date, []byte(date.Format("2006-01-02")))
	case *schema_pb.Value_DecimalValue:
		decimalValue := value.GetDecimalValue()
		if decimalValue == nil {
			return sqltypes.NULL
		}
		// Convert the decimal bytes to a string representation
		decimalStr := decimalToStringHelper(decimalValue)
		return sqltypes.MakeTrusted(sqltypes.Decimal, []byte(decimalStr))
	case *schema_pb.Value_TimeValue:
		timeValue := value.GetTimeValue()
		if timeValue == nil {
			return sqltypes.NULL
		}
		// Convert microseconds since midnight to a time string
		duration := time.Duration(timeValue.TimeMicros) * time.Microsecond
		timeOfDay := time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC).Add(duration)
		return sqltypes.MakeTrusted(sqltypes.Time, []byte(timeOfDay.Format("15:04:05")))
	default:
		return sqltypes.NewVarChar(fmt.Sprintf("%v", value))
	}
}

// decimalToStringHelper converts a DecimalValue to its string representation
// This is a standalone version of the engine's decimalToString method
func decimalToStringHelper(decimalValue *schema_pb.DecimalValue) string {
	if decimalValue == nil || decimalValue.Value == nil {
		return "0"
	}

	// Convert the bytes back to a big.Int (the unscaled value)
	intValue := new(big.Int).SetBytes(decimalValue.Value)

	// Convert to a string with decimal point placement based on the scale
	str := intValue.String()
	scale := int(decimalValue.Scale)
	if scale > 0 {
		if len(str) > scale {
			// Insert the decimal point inside the digits, e.g. 12345 with scale 2 -> "123.45"
			decimalPos := len(str) - scale
			return str[:decimalPos] + "." + str[decimalPos:]
		}
		// Fewer digits than the scale: pad with leading zeros, e.g. 45 with scale 4 -> "0.0045"
		for len(str) < scale {
			str = "0" + str
		}
		return "0." + str
	}
	return str
}
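
// exampleDecimalToString is an illustrative sketch of the scale handling above: an
// unscaled value of 12345 renders as "123.45" with scale 2, and as "0.012345" with
// scale 6 (more scale digits than value digits, so leading zeros are added).
func exampleDecimalToString() {
	unscaled := big.NewInt(12345).Bytes()
	fmt.Println(decimalToStringHelper(&schema_pb.DecimalValue{Value: unscaled, Scale: 2})) // 123.45
	fmt.Println(decimalToStringHelper(&schema_pb.DecimalValue{Value: unscaled, Scale: 6})) // 0.012345
}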