You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1668 lines
52 KiB
1668 lines
52 KiB
package engine
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/parquet-go/parquet-go"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// HybridMessageScanner scans from ALL data sources:
|
|
// Architecture:
|
|
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
|
|
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
|
|
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
|
|
// 4. Seamlessly merges data from all sources chronologically
|
|
// 5. Provides complete real-time view of all messages in a topic
|
|
type HybridMessageScanner struct {
|
|
filerClient filer_pb.FilerClient
|
|
brokerClient BrokerClientInterface // For querying unflushed data
|
|
topic topic.Topic
|
|
recordSchema *schema_pb.RecordType
|
|
parquetLevels *schema.ParquetLevels
|
|
engine *SQLEngine // Reference for system column formatting
|
|
}
|
|
|
|
// NewHybridMessageScanner creates a scanner that reads from all data sources
|
|
// This provides complete real-time message coverage including unflushed data
|
|
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
|
|
// Check if filerClient is available
|
|
if filerClient == nil {
|
|
return nil, fmt.Errorf("filerClient is required but not available")
|
|
}
|
|
|
|
// Create topic reference
|
|
t := topic.Topic{
|
|
Namespace: namespace,
|
|
Name: topicName,
|
|
}
|
|
|
|
// Get topic schema from broker client (works with both real and mock clients)
|
|
recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get topic schema: %v", err)
|
|
}
|
|
if recordType == nil {
|
|
return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
|
|
}
|
|
|
|
// Create a copy of the recordType to avoid modifying the original
|
|
recordTypeCopy := &schema_pb.RecordType{
|
|
Fields: make([]*schema_pb.Field, len(recordType.Fields)),
|
|
}
|
|
copy(recordTypeCopy.Fields, recordType.Fields)
|
|
|
|
// Add system columns that MQ adds to all records
|
|
recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
|
|
WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
|
|
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
|
|
RecordTypeEnd()
|
|
|
|
// Convert to Parquet levels for efficient reading
|
|
parquetLevels, err := schema.ToParquetLevels(recordType)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
|
|
}
|
|
|
|
return &HybridMessageScanner{
|
|
filerClient: filerClient,
|
|
brokerClient: brokerClient,
|
|
topic: t,
|
|
recordSchema: recordType,
|
|
parquetLevels: parquetLevels,
|
|
engine: engine,
|
|
}, nil
|
|
}
|
|
|
|
// HybridScanOptions configure how the scanner reads from both live and archived data
|
|
type HybridScanOptions struct {
|
|
// Time range filtering (Unix nanoseconds)
|
|
StartTimeNs int64
|
|
StopTimeNs int64
|
|
|
|
// Column projection - if empty, select all columns
|
|
Columns []string
|
|
|
|
// Row limit - 0 means no limit
|
|
Limit int
|
|
|
|
// Row offset - 0 means no offset
|
|
Offset int
|
|
|
|
// Predicate for WHERE clause filtering
|
|
Predicate func(*schema_pb.RecordValue) bool
|
|
}
|
|
|
|
// HybridScanResult represents a message from either live logs or Parquet files
|
|
type HybridScanResult struct {
|
|
Values map[string]*schema_pb.Value // Column name -> value
|
|
Timestamp int64 // Message timestamp (_ts_ns)
|
|
Key []byte // Message key (_key)
|
|
Source string // "live_log" or "parquet_archive" or "in_memory_broker"
|
|
}
|
|
|
|
// HybridScanStats records which data sources a scan touched and how much data
// came from each; it backs the execution-plan (EXPLAIN) output.
type HybridScanStats struct {
	// BrokerBufferQueried is set when the broker's in-memory buffer was queried.
	BrokerBufferQueried bool
	// BrokerBufferMessages counts the entries the broker buffer returned.
	BrokerBufferMessages int
	// BufferStartIndex is the buffer start index reported for the scan.
	BufferStartIndex int64
	// PartitionsScanned is the number of partitions discovered for the topic.
	PartitionsScanned int
	// LiveLogFilesScanned is the number of live log files processed.
	LiveLogFilesScanned int
}
|
|
|
|
// ParquetColumnStats holds statistics for a single column from parquet metadata
|
|
type ParquetColumnStats struct {
|
|
ColumnName string
|
|
MinValue *schema_pb.Value
|
|
MaxValue *schema_pb.Value
|
|
NullCount int64
|
|
RowCount int64
|
|
}
|
|
|
|
// ParquetFileStats holds aggregated statistics for a parquet file
|
|
type ParquetFileStats struct {
|
|
FileName string
|
|
RowCount int64
|
|
ColumnStats map[string]*ParquetColumnStats
|
|
}
|
|
|
|
// StreamingDataSource provides a streaming interface for reading scan results
|
|
type StreamingDataSource interface {
|
|
Next() (*HybridScanResult, error) // Returns next result or nil when done
|
|
HasMore() bool // Returns true if more data available
|
|
Close() error // Clean up resources
|
|
}
|
|
|
|
// StreamingMergeItem represents an item in the priority queue for streaming merge
|
|
type StreamingMergeItem struct {
|
|
Result *HybridScanResult
|
|
SourceID int
|
|
DataSource StreamingDataSource
|
|
}
|
|
|
|
// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
|
|
type StreamingMergeHeap []*StreamingMergeItem
|
|
|
|
func (h StreamingMergeHeap) Len() int { return len(h) }
|
|
|
|
func (h StreamingMergeHeap) Less(i, j int) bool {
|
|
// Sort by timestamp (ascending order)
|
|
return h[i].Result.Timestamp < h[j].Result.Timestamp
|
|
}
|
|
|
|
func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h *StreamingMergeHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(*StreamingMergeItem))
|
|
}
|
|
|
|
func (h *StreamingMergeHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// Scan reads messages from both live logs and archived Parquet files
|
|
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
|
|
// Assumptions:
|
|
// 1. Chronologically merges live and archived data
|
|
// 2. Applies filtering at the lowest level for efficiency
|
|
// 3. Handles schema evolution transparently
|
|
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.ScanWithStats(ctx, options)
|
|
return results, err
|
|
}
|
|
|
|
// ScanWithStats reads messages and returns scan statistics for execution plans
|
|
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
var results []HybridScanResult
|
|
stats := &HybridScanStats{}
|
|
|
|
// Get all partitions for this topic via MQ broker discovery
|
|
partitions, err := hms.discoverTopicPartitions(ctx)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
|
|
}
|
|
|
|
stats.PartitionsScanned = len(partitions)
|
|
|
|
for _, partition := range partitions {
|
|
partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
|
|
}
|
|
|
|
results = append(results, partitionResults...)
|
|
|
|
// Aggregate broker buffer stats
|
|
if partitionStats != nil {
|
|
if partitionStats.BrokerBufferQueried {
|
|
stats.BrokerBufferQueried = true
|
|
}
|
|
stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
|
|
if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
|
|
stats.BufferStartIndex = partitionStats.BufferStartIndex
|
|
}
|
|
}
|
|
|
|
// Apply global limit (without offset) across all partitions
|
|
// When OFFSET is used, collect more data to ensure we have enough after skipping
|
|
// Note: OFFSET will be applied at the end to avoid double-application
|
|
if options.Limit > 0 {
|
|
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
|
|
minRequired := options.Limit + options.Offset
|
|
// Small buffer only when needed to handle edge cases in distributed scanning
|
|
if options.Offset > 0 && minRequired < 10 {
|
|
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
|
|
}
|
|
if len(results) >= minRequired {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply final OFFSET and LIMIT processing (done once at the end)
|
|
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
|
|
if options.Offset > 0 || options.Limit >= 0 {
|
|
// Handle LIMIT 0 special case first
|
|
if options.Limit == 0 {
|
|
return []HybridScanResult{}, stats, nil
|
|
}
|
|
|
|
// Apply OFFSET first
|
|
if options.Offset > 0 {
|
|
if options.Offset >= len(results) {
|
|
results = []HybridScanResult{}
|
|
} else {
|
|
results = results[options.Offset:]
|
|
}
|
|
}
|
|
|
|
// Apply LIMIT after OFFSET (only if limit > 0)
|
|
if options.Limit > 0 && len(results) > options.Limit {
|
|
results = results[:options.Limit]
|
|
}
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
|
|
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
|
|
return results, err
|
|
}
|
|
|
|
// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
|
|
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
var results []HybridScanResult
|
|
stats := &HybridScanStats{}
|
|
|
|
// Skip if no broker client available
|
|
if hms.brokerClient == nil {
|
|
return results, stats, nil
|
|
}
|
|
|
|
// Mark that we attempted to query broker buffer
|
|
stats.BrokerBufferQueried = true
|
|
|
|
// Step 1: Get unflushed data from broker using buffer_start-based method
|
|
// This method uses buffer_start metadata to avoid double-counting with exact precision
|
|
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
|
|
if err != nil {
|
|
// Log error but don't fail the query - continue with disk data only
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
|
|
}
|
|
// Reset queried flag on error
|
|
stats.BrokerBufferQueried = false
|
|
return results, stats, nil
|
|
}
|
|
|
|
// Capture stats for EXPLAIN
|
|
stats.BrokerBufferMessages = len(unflushedEntries)
|
|
|
|
// Debug logging for EXPLAIN mode
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
|
|
if len(unflushedEntries) > 0 {
|
|
fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
|
|
}
|
|
}
|
|
|
|
// Step 2: Process unflushed entries (already deduplicated by broker)
|
|
for _, logEntry := range unflushedEntries {
|
|
// Skip control entries without actual data
|
|
if hms.isControlEntry(logEntry) {
|
|
continue // Skip this entry
|
|
}
|
|
|
|
// Skip messages outside time range
|
|
if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
|
|
continue
|
|
}
|
|
if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
|
|
continue
|
|
}
|
|
|
|
// Convert LogEntry to RecordValue format (same as disk data)
|
|
recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
|
|
if err != nil {
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
|
|
}
|
|
continue // Skip malformed messages
|
|
}
|
|
|
|
// Apply predicate filter if provided
|
|
if options.Predicate != nil && !options.Predicate(recordValue) {
|
|
continue
|
|
}
|
|
|
|
// Extract system columns for result
|
|
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
|
|
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
|
|
|
|
// Apply column projection
|
|
values := make(map[string]*schema_pb.Value)
|
|
if len(options.Columns) == 0 {
|
|
// Select all columns (excluding system columns from user view)
|
|
for name, value := range recordValue.Fields {
|
|
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
|
|
values[name] = value
|
|
}
|
|
}
|
|
} else {
|
|
// Select specified columns only
|
|
for _, columnName := range options.Columns {
|
|
if value, exists := recordValue.Fields[columnName]; exists {
|
|
values[columnName] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create result with proper source tagging
|
|
result := HybridScanResult{
|
|
Values: values,
|
|
Timestamp: timestamp,
|
|
Key: key,
|
|
Source: "live_log", // Data from broker's unflushed messages
|
|
}
|
|
|
|
results = append(results, result)
|
|
|
|
// Apply limit (accounting for offset) - collect exact amount needed
|
|
if options.Limit > 0 {
|
|
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
|
|
minRequired := options.Limit + options.Offset
|
|
// Small buffer only when needed to handle edge cases in message streaming
|
|
if options.Offset > 0 && minRequired < 10 {
|
|
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
|
|
}
|
|
if len(results) >= minRequired {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
|
|
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
|
|
// Parse the message data as RecordValue
|
|
recordValue := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
|
|
return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
|
|
}
|
|
|
|
// Add system columns
|
|
if recordValue.Fields == nil {
|
|
recordValue.Fields = make(map[string]*schema_pb.Value)
|
|
}
|
|
|
|
// Add timestamp
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
|
|
}
|
|
|
|
return recordValue, string(msg.Key), nil
|
|
}
|
|
|
|
// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
|
|
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
|
|
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
|
|
if hms.filerClient == nil {
|
|
return nil, fmt.Errorf("filerClient not available for partition discovery")
|
|
}
|
|
|
|
var allPartitions []topic.Partition
|
|
var err error
|
|
|
|
// Scan the topic directory for actual partition versions (timestamped directories)
|
|
// List all version directories in the topic directory
|
|
err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
|
|
if !versionEntry.IsDirectory {
|
|
return nil // Skip non-directories
|
|
}
|
|
|
|
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
|
|
versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
|
|
if parseErr != nil {
|
|
// Skip directories that don't match the version format
|
|
return nil
|
|
}
|
|
|
|
// Scan partition directories within this version
|
|
versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
|
|
return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
|
|
if !partitionEntry.IsDirectory {
|
|
return nil // Skip non-directories
|
|
}
|
|
|
|
// Parse partition boundary from directory name (e.g., "0000-0630")
|
|
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
|
|
if rangeStart == rangeStop {
|
|
return nil // Skip invalid partition names
|
|
}
|
|
|
|
// Create partition object
|
|
partition := topic.Partition{
|
|
RangeStart: rangeStart,
|
|
RangeStop: rangeStop,
|
|
RingSize: topic.PartitionCount,
|
|
UnixTimeNs: versionTime.UnixNano(),
|
|
}
|
|
|
|
allPartitions = append(allPartitions, partition)
|
|
return nil
|
|
})
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
|
|
}
|
|
|
|
// If no partitions found, return empty slice (valid for newly created or empty topics)
|
|
if len(allPartitions) == 0 {
|
|
fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
|
|
return []topic.Partition{}, nil
|
|
}
|
|
|
|
fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
|
|
return allPartitions, nil
|
|
}
|
|
|
|
// scanPartitionHybrid scans a specific partition using the hybrid approach
|
|
// This is where the magic happens - seamlessly reading ALL data sources:
|
|
// 1. Unflushed in-memory data from brokers (REAL-TIME)
|
|
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
|
|
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
|
|
results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
|
|
return results, err
|
|
}
|
|
|
|
// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
|
|
// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
|
|
// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
|
|
// - Scalable: Can handle large topics without LIMIT clauses efficiently
|
|
// - Streaming: Processes data as it arrives rather than buffering everything
|
|
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
|
|
stats := &HybridScanStats{}
|
|
|
|
// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
|
|
unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
|
|
if err != nil {
|
|
// Don't fail the query if broker scanning fails, but provide clear warning to user
|
|
// This ensures users are aware that results may not include the most recent data
|
|
if isDebugMode(ctx) {
|
|
fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
|
|
} else {
|
|
fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
|
|
fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
|
|
}
|
|
} else if unflushedStats != nil {
|
|
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
|
|
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
|
|
stats.BufferStartIndex = unflushedStats.BufferStartIndex
|
|
}
|
|
|
|
// Count live log files for statistics
|
|
liveLogCount, err := hms.countLiveLogFiles(partition)
|
|
if err != nil {
|
|
// Don't fail the query, just log warning
|
|
fmt.Printf("Warning: Failed to count live log files: %v\n", err)
|
|
liveLogCount = 0
|
|
}
|
|
stats.LiveLogFilesScanned = liveLogCount
|
|
|
|
// STEP 2: Create streaming data sources for memory-efficient merge
|
|
var dataSources []StreamingDataSource
|
|
|
|
// Add unflushed data source (if we have unflushed results)
|
|
if len(unflushedResults) > 0 {
|
|
// Sort unflushed results by timestamp before creating stream
|
|
if len(unflushedResults) > 1 {
|
|
hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
|
|
}
|
|
dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
|
|
}
|
|
|
|
// Add streaming flushed data source (live logs + Parquet files)
|
|
flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
|
|
dataSources = append(dataSources, flushedDataSource)
|
|
|
|
// STEP 3: Use streaming merge for memory-efficient chronological ordering
|
|
var results []HybridScanResult
|
|
if len(dataSources) > 0 {
|
|
// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
|
|
// For LIMIT N OFFSET M, we need to collect at least N+M rows
|
|
scanLimit := options.Limit
|
|
if options.Limit > 0 && options.Offset > 0 {
|
|
scanLimit = options.Limit + options.Offset
|
|
}
|
|
|
|
mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
|
|
if err != nil {
|
|
return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
|
|
}
|
|
results = mergedResults
|
|
}
|
|
|
|
return results, stats, nil
|
|
}
|
|
|
|
// countLiveLogFiles counts the number of live log files in a partition for statistics
|
|
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
|
|
partitionDir := topic.PartitionDir(hms.topic, partition)
|
|
|
|
var fileCount int
|
|
err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// List all files in partition directory
|
|
request := &filer_pb.ListEntriesRequest{
|
|
Directory: partitionDir,
|
|
Prefix: "",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: true,
|
|
Limit: 10000, // reasonable limit for counting
|
|
}
|
|
|
|
stream, err := client.ListEntries(context.Background(), request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Count files that are not .parquet files (live log files)
|
|
// Live log files typically have timestamps or are named like log files
|
|
fileName := resp.Entry.Name
|
|
if !strings.HasSuffix(fileName, ".parquet") &&
|
|
!strings.HasSuffix(fileName, ".offset") &&
|
|
len(resp.Entry.Chunks) > 0 { // Has actual content
|
|
fileCount++
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return fileCount, nil
|
|
}
|
|
|
|
// isControlEntry checks if a log entry is a control entry without actual data
|
|
// Based on MQ system analysis, control entries are:
|
|
// 1. DataMessages with populated Ctrl field (publisher close signals)
|
|
// 2. Entries with empty keys (as filtered by subscriber)
|
|
// 3. Entries with no data
|
|
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
|
|
// Skip entries with no data
|
|
if len(logEntry.Data) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Skip entries with empty keys (same logic as subscriber)
|
|
if len(logEntry.Key) == 0 {
|
|
return true
|
|
}
|
|
|
|
// Check if this is a DataMessage with control field populated
|
|
dataMessage := &mq_pb.DataMessage{}
|
|
if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
|
|
// If it has a control field, it's a control message
|
|
if dataMessage.Ctrl != nil {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
|
|
// This handles both:
|
|
// 1. Live log entries (raw message format)
|
|
// 2. Parquet entries (already in schema_pb.RecordValue format)
|
|
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
|
|
// Try to unmarshal as RecordValue first (Parquet format)
|
|
recordValue := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
|
|
// This is an archived message from Parquet files
|
|
// FIX: Add system columns from LogEntry to RecordValue
|
|
if recordValue.Fields == nil {
|
|
recordValue.Fields = make(map[string]*schema_pb.Value)
|
|
}
|
|
|
|
// Add system columns from LogEntry
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
|
|
}
|
|
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
|
|
}
|
|
|
|
return recordValue, "parquet_archive", nil
|
|
}
|
|
|
|
// If not a RecordValue, this is raw live message data - parse with schema
|
|
return hms.parseRawMessageWithSchema(logEntry)
|
|
}
|
|
|
|
// parseRawMessageWithSchema parses raw live message data using the topic's schema
|
|
// This provides proper type conversion and field mapping instead of treating everything as strings
|
|
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
|
|
recordValue := &schema_pb.RecordValue{
|
|
Fields: make(map[string]*schema_pb.Value),
|
|
}
|
|
|
|
// Add system columns (always present)
|
|
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
|
|
}
|
|
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
|
|
}
|
|
|
|
// Parse message data based on schema
|
|
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
|
|
// Fallback: No schema available, treat as single "data" field
|
|
recordValue.Fields["data"] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Attempt schema-aware parsing
|
|
// Strategy 1: Try JSON parsing first (most common for live messages)
|
|
if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
|
|
// Successfully parsed as JSON, merge with system columns
|
|
for fieldName, fieldValue := range parsedRecord.Fields {
|
|
recordValue.Fields[fieldName] = fieldValue
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Strategy 2: Try protobuf parsing (binary messages)
|
|
if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
|
|
// Successfully parsed as protobuf, merge with system columns
|
|
for fieldName, fieldValue := range parsedRecord.Fields {
|
|
recordValue.Fields[fieldName] = fieldValue
|
|
}
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// Strategy 3: Fallback to single field with raw data
|
|
// If schema has a single field, map the raw data to it with type conversion
|
|
if len(hms.recordSchema.Fields) == 1 {
|
|
field := hms.recordSchema.Fields[0]
|
|
convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
|
|
if err == nil {
|
|
recordValue.Fields[field.Name] = convertedValue
|
|
return recordValue, "live_log", nil
|
|
}
|
|
}
|
|
|
|
// Final fallback: treat as string data field
|
|
recordValue.Fields["data"] = &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
|
|
}
|
|
|
|
return recordValue, "live_log", nil
|
|
}
|
|
|
|
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
|
|
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
|
|
// Try to parse as JSON
|
|
var jsonData map[string]interface{}
|
|
if err := json.Unmarshal(data, &jsonData); err != nil {
|
|
return nil, fmt.Errorf("not valid JSON: %v", err)
|
|
}
|
|
|
|
recordValue := &schema_pb.RecordValue{
|
|
Fields: make(map[string]*schema_pb.Value),
|
|
}
|
|
|
|
// Map JSON fields to schema fields
|
|
for _, schemaField := range hms.recordSchema.Fields {
|
|
fieldName := schemaField.Name
|
|
if jsonValue, exists := jsonData[fieldName]; exists {
|
|
schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
|
|
if err != nil {
|
|
// Log conversion error but continue with other fields
|
|
continue
|
|
}
|
|
recordValue.Fields[fieldName] = schemaValue
|
|
}
|
|
}
|
|
|
|
return recordValue, nil
|
|
}
|
|
|
|
// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
|
|
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
|
|
// This might be a raw protobuf message that didn't parse correctly the first time
|
|
// Try alternative protobuf unmarshaling approaches
|
|
recordValue := &schema_pb.RecordValue{}
|
|
|
|
// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
|
|
if err := proto.Unmarshal(data, recordValue); err == nil {
|
|
return recordValue, nil
|
|
}
|
|
|
|
// Strategy 2: Check if it's a different protobuf message type
|
|
// For now, return error as we need more specific knowledge of MQ message formats
|
|
return nil, fmt.Errorf("could not parse as protobuf RecordValue")
|
|
}
|
|
|
|
// convertRawDataToSchemaValue converts raw bytes to a specific schema type
|
|
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
|
|
dataStr := string(data)
|
|
|
|
switch fieldType.Kind.(type) {
|
|
case *schema_pb.Type_ScalarType:
|
|
scalarType := fieldType.GetScalarType()
|
|
switch scalarType {
|
|
case schema_pb.ScalarType_STRING:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
|
|
}, nil
|
|
case schema_pb.ScalarType_INT32:
|
|
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_INT64:
|
|
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: val},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_FLOAT:
|
|
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_DOUBLE:
|
|
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BOOL:
|
|
lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
|
|
if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: true},
|
|
}, nil
|
|
} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: false},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BYTES:
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: data},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
|
|
}
|
|
|
|
// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
|
|
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
|
|
switch fieldType.Kind.(type) {
|
|
case *schema_pb.Type_ScalarType:
|
|
scalarType := fieldType.GetScalarType()
|
|
switch scalarType {
|
|
case schema_pb.ScalarType_STRING:
|
|
if str, ok := jsonValue.(string); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: str},
|
|
}, nil
|
|
}
|
|
// Convert other types to string
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
|
|
}, nil
|
|
case schema_pb.ScalarType_INT32:
|
|
if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_INT64:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_FLOAT:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_DOUBLE:
|
|
if num, ok := jsonValue.(float64); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BOOL:
|
|
if boolVal, ok := jsonValue.(bool); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
|
|
}, nil
|
|
}
|
|
case schema_pb.ScalarType_BYTES:
|
|
if str, ok := jsonValue.(string); ok {
|
|
return &schema_pb.Value{
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
|
|
}, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
|
|
}
|
|
|
|
// ConvertToSQLResult converts HybridScanResults to SQL query results
|
|
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
|
|
if len(results) == 0 {
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: [][]sqltypes.Value{},
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// Determine columns if not specified
|
|
if len(columns) == 0 {
|
|
columnSet := make(map[string]bool)
|
|
for _, result := range results {
|
|
for columnName := range result.Values {
|
|
columnSet[columnName] = true
|
|
}
|
|
}
|
|
|
|
columns = make([]string, 0, len(columnSet))
|
|
for columnName := range columnSet {
|
|
columns = append(columns, columnName)
|
|
}
|
|
}
|
|
|
|
// Convert to SQL rows
|
|
rows := make([][]sqltypes.Value, len(results))
|
|
for i, result := range results {
|
|
row := make([]sqltypes.Value, len(columns))
|
|
for j, columnName := range columns {
|
|
switch columnName {
|
|
case SW_COLUMN_NAME_SOURCE:
|
|
row[j] = sqltypes.NewVarChar(result.Source)
|
|
case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
|
|
// Format timestamp as proper timestamp type instead of raw nanoseconds
|
|
row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
|
|
case SW_COLUMN_NAME_KEY:
|
|
row[j] = sqltypes.NewVarBinary(string(result.Key))
|
|
default:
|
|
if value, exists := result.Values[columnName]; exists {
|
|
row[j] = convertSchemaValueToSQL(value)
|
|
} else {
|
|
row[j] = sqltypes.NULL
|
|
}
|
|
}
|
|
}
|
|
rows[i] = row
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: rows,
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
|
|
// Combines auto-discovered columns (from *) with explicitly requested columns
|
|
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
|
|
if len(results) == 0 {
|
|
// For empty results, combine auto-discovered columns with explicit ones
|
|
columnSet := make(map[string]bool)
|
|
|
|
// Add explicit columns first
|
|
for _, col := range explicitColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Build final column list
|
|
columns := make([]string, 0, len(columnSet))
|
|
for col := range columnSet {
|
|
columns = append(columns, col)
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: [][]sqltypes.Value{},
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// Auto-discover columns from data (like SELECT *)
|
|
autoColumns := make(map[string]bool)
|
|
for _, result := range results {
|
|
for columnName := range result.Values {
|
|
autoColumns[columnName] = true
|
|
}
|
|
}
|
|
|
|
// Combine auto-discovered and explicit columns
|
|
columnSet := make(map[string]bool)
|
|
|
|
// Add auto-discovered columns first (regular data columns)
|
|
for col := range autoColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Add explicit columns (may include system columns like _source)
|
|
for _, col := range explicitColumns {
|
|
columnSet[col] = true
|
|
}
|
|
|
|
// Build final column list
|
|
columns := make([]string, 0, len(columnSet))
|
|
for col := range columnSet {
|
|
columns = append(columns, col)
|
|
}
|
|
|
|
// Convert to SQL rows
|
|
rows := make([][]sqltypes.Value, len(results))
|
|
for i, result := range results {
|
|
row := make([]sqltypes.Value, len(columns))
|
|
for j, columnName := range columns {
|
|
switch columnName {
|
|
case SW_COLUMN_NAME_TIMESTAMP:
|
|
row[j] = sqltypes.NewInt64(result.Timestamp)
|
|
case SW_COLUMN_NAME_KEY:
|
|
row[j] = sqltypes.NewVarBinary(string(result.Key))
|
|
case SW_COLUMN_NAME_SOURCE:
|
|
row[j] = sqltypes.NewVarChar(result.Source)
|
|
default:
|
|
// Regular data column
|
|
if value, exists := result.Values[columnName]; exists {
|
|
row[j] = convertSchemaValueToSQL(value)
|
|
} else {
|
|
row[j] = sqltypes.NULL
|
|
}
|
|
}
|
|
}
|
|
rows[i] = row
|
|
}
|
|
|
|
return &QueryResult{
|
|
Columns: columns,
|
|
Rows: rows,
|
|
Database: hms.topic.Namespace,
|
|
Table: hms.topic.Name,
|
|
}
|
|
}
|
|
|
|
// ReadParquetStatistics efficiently reads column statistics from parquet files
|
|
// without scanning the full file content - uses parquet's built-in metadata
|
|
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
|
|
var fileStats []*ParquetFileStats
|
|
|
|
// Use the same chunk cache as the logstore package
|
|
chunkCache := chunk_cache.NewChunkCacheInMemory(256)
|
|
lookupFileIdFn := filer.LookupFn(h.filerClient)
|
|
|
|
err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
// Only process parquet files
|
|
if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
|
|
return nil
|
|
}
|
|
|
|
// Extract statistics from this parquet file
|
|
stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
|
|
if err != nil {
|
|
// Log error but continue processing other files
|
|
fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
|
|
return nil
|
|
}
|
|
|
|
if stats != nil {
|
|
fileStats = append(fileStats, stats)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return fileStats, err
|
|
}
|
|
|
|
// extractParquetFileStats extracts column statistics from a single parquet file
|
|
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
|
|
// Create reader for the parquet file
|
|
fileSize := filer.FileSize(entry)
|
|
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
|
|
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
|
|
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
|
|
readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
|
|
|
|
// Create parquet reader - this only reads metadata, not data
|
|
parquetReader := parquet.NewReader(readerAt)
|
|
defer parquetReader.Close()
|
|
|
|
fileView := parquetReader.File()
|
|
|
|
fileStats := &ParquetFileStats{
|
|
FileName: entry.Name,
|
|
RowCount: fileView.NumRows(),
|
|
ColumnStats: make(map[string]*ParquetColumnStats),
|
|
}
|
|
|
|
// Get schema information
|
|
schema := fileView.Schema()
|
|
|
|
// Process each row group
|
|
rowGroups := fileView.RowGroups()
|
|
for _, rowGroup := range rowGroups {
|
|
columnChunks := rowGroup.ColumnChunks()
|
|
|
|
// Process each column chunk
|
|
for i, chunk := range columnChunks {
|
|
// Get column name from schema
|
|
columnName := h.getColumnNameFromSchema(schema, i)
|
|
if columnName == "" {
|
|
continue
|
|
}
|
|
|
|
// Try to get column statistics
|
|
columnIndex, err := chunk.ColumnIndex()
|
|
if err != nil {
|
|
// No column index available - skip this column
|
|
continue
|
|
}
|
|
|
|
// Extract min/max values from the first page (for simplicity)
|
|
// In a more sophisticated implementation, we could aggregate across all pages
|
|
numPages := columnIndex.NumPages()
|
|
if numPages == 0 {
|
|
continue
|
|
}
|
|
|
|
minParquetValue := columnIndex.MinValue(0)
|
|
maxParquetValue := columnIndex.MaxValue(numPages - 1)
|
|
nullCount := int64(0)
|
|
|
|
// Aggregate null counts across all pages
|
|
for pageIdx := 0; pageIdx < numPages; pageIdx++ {
|
|
nullCount += columnIndex.NullCount(pageIdx)
|
|
}
|
|
|
|
// Convert parquet values to schema_pb.Value
|
|
minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Store column statistics (aggregate across row groups if column already exists)
|
|
if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
|
|
// Update existing statistics
|
|
if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
|
|
existingStats.MinValue = minValue
|
|
}
|
|
if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
|
|
existingStats.MaxValue = maxValue
|
|
}
|
|
existingStats.NullCount += nullCount
|
|
} else {
|
|
// Create new column statistics
|
|
fileStats.ColumnStats[columnName] = &ParquetColumnStats{
|
|
ColumnName: columnName,
|
|
MinValue: minValue,
|
|
MaxValue: maxValue,
|
|
NullCount: nullCount,
|
|
RowCount: rowGroup.NumRows(),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return fileStats, nil
|
|
}
|
|
|
|
// getColumnNameFromSchema extracts column name from parquet schema by index
|
|
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
|
|
// Get the leaf columns in order
|
|
var columnNames []string
|
|
h.collectColumnNames(schema.Fields(), &columnNames)
|
|
|
|
if columnIndex >= 0 && columnIndex < len(columnNames) {
|
|
return columnNames[columnIndex]
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// collectColumnNames recursively collects leaf column names from schema
|
|
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
|
|
for _, field := range fields {
|
|
if len(field.Fields()) == 0 {
|
|
// This is a leaf field (no sub-fields)
|
|
*names = append(*names, field.Name())
|
|
} else {
|
|
// This is a group - recurse
|
|
h.collectColumnNames(field.Fields(), names)
|
|
}
|
|
}
|
|
}
|
|
|
|
// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
|
|
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
|
|
switch pv.Kind() {
|
|
case parquet.Boolean:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
|
|
case parquet.Int32:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
|
|
case parquet.Int64:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
|
|
case parquet.Float:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
|
|
case parquet.Double:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
|
|
case parquet.ByteArray:
|
|
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
|
|
}
|
|
}
|
|
|
|
// compareSchemaValues compares two schema_pb.Value objects
|
|
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
|
|
if v1 == nil && v2 == nil {
|
|
return 0
|
|
}
|
|
if v1 == nil {
|
|
return -1
|
|
}
|
|
if v2 == nil {
|
|
return 1
|
|
}
|
|
|
|
// Extract raw values and compare
|
|
raw1 := h.extractRawValueFromSchema(v1)
|
|
raw2 := h.extractRawValueFromSchema(v2)
|
|
|
|
return h.compareRawValues(raw1, raw2)
|
|
}
|
|
|
|
// extractRawValueFromSchema extracts the raw value from schema_pb.Value
|
|
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
|
|
switch v := value.Kind.(type) {
|
|
case *schema_pb.Value_BoolValue:
|
|
return v.BoolValue
|
|
case *schema_pb.Value_Int32Value:
|
|
return v.Int32Value
|
|
case *schema_pb.Value_Int64Value:
|
|
return v.Int64Value
|
|
case *schema_pb.Value_FloatValue:
|
|
return v.FloatValue
|
|
case *schema_pb.Value_DoubleValue:
|
|
return v.DoubleValue
|
|
case *schema_pb.Value_BytesValue:
|
|
return string(v.BytesValue) // Convert to string for comparison
|
|
case *schema_pb.Value_StringValue:
|
|
return v.StringValue
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// compareRawValues compares two raw values
|
|
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
|
|
// Handle nil cases
|
|
if v1 == nil && v2 == nil {
|
|
return 0
|
|
}
|
|
if v1 == nil {
|
|
return -1
|
|
}
|
|
if v2 == nil {
|
|
return 1
|
|
}
|
|
|
|
// Compare based on type
|
|
switch val1 := v1.(type) {
|
|
case bool:
|
|
if val2, ok := v2.(bool); ok {
|
|
if val1 == val2 {
|
|
return 0
|
|
}
|
|
if val1 {
|
|
return 1
|
|
}
|
|
return -1
|
|
}
|
|
case int32:
|
|
if val2, ok := v2.(int32); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case int64:
|
|
if val2, ok := v2.(int64); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case float32:
|
|
if val2, ok := v2.(float32); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case float64:
|
|
if val2, ok := v2.(float64); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
case string:
|
|
if val2, ok := v2.(string); ok {
|
|
if val1 < val2 {
|
|
return -1
|
|
} else if val1 > val2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
}
|
|
|
|
// Default: try string comparison
|
|
str1 := fmt.Sprintf("%v", v1)
|
|
str2 := fmt.Sprintf("%v", v2)
|
|
if str1 < str2 {
|
|
return -1
|
|
} else if str1 > str2 {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// streamingMerge merges multiple sorted data sources using a heap-based approach
|
|
// This provides memory-efficient merging without loading all data into memory
|
|
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
|
|
if len(dataSources) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var results []HybridScanResult
|
|
mergeHeap := &StreamingMergeHeap{}
|
|
heap.Init(mergeHeap)
|
|
|
|
// Initialize heap with first item from each data source
|
|
for i, source := range dataSources {
|
|
if source.HasMore() {
|
|
result, err := source.Next()
|
|
if err != nil {
|
|
// Close all sources and return error
|
|
for _, s := range dataSources {
|
|
s.Close()
|
|
}
|
|
return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
|
|
}
|
|
if result != nil {
|
|
heap.Push(mergeHeap, &StreamingMergeItem{
|
|
Result: result,
|
|
SourceID: i,
|
|
DataSource: source,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process results in chronological order
|
|
for mergeHeap.Len() > 0 {
|
|
// Get next chronologically ordered result
|
|
item := heap.Pop(mergeHeap).(*StreamingMergeItem)
|
|
results = append(results, *item.Result)
|
|
|
|
// Check limit
|
|
if limit > 0 && len(results) >= limit {
|
|
break
|
|
}
|
|
|
|
// Try to get next item from the same data source
|
|
if item.DataSource.HasMore() {
|
|
nextResult, err := item.DataSource.Next()
|
|
if err != nil {
|
|
// Log error but continue with other sources
|
|
fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
|
|
} else if nextResult != nil {
|
|
heap.Push(mergeHeap, &StreamingMergeItem{
|
|
Result: nextResult,
|
|
SourceID: item.SourceID,
|
|
DataSource: item.DataSource,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close all data sources
|
|
for _, source := range dataSources {
|
|
source.Close()
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
|
|
// This is used for unflushed data that is already loaded into memory
|
|
type SliceDataSource struct {
|
|
results []HybridScanResult
|
|
index int
|
|
}
|
|
|
|
func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
|
|
return &SliceDataSource{
|
|
results: results,
|
|
index: 0,
|
|
}
|
|
}
|
|
|
|
func (s *SliceDataSource) Next() (*HybridScanResult, error) {
|
|
if s.index >= len(s.results) {
|
|
return nil, nil
|
|
}
|
|
result := &s.results[s.index]
|
|
s.index++
|
|
return result, nil
|
|
}
|
|
|
|
func (s *SliceDataSource) HasMore() bool {
|
|
return s.index < len(s.results)
|
|
}
|
|
|
|
func (s *SliceDataSource) Close() error {
|
|
return nil // Nothing to clean up for slice-based source
|
|
}
|
|
|
|
// StreamingFlushedDataSource provides streaming access to flushed data
|
|
type StreamingFlushedDataSource struct {
|
|
hms *HybridMessageScanner
|
|
partition topic.Partition
|
|
options HybridScanOptions
|
|
mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
|
|
resultChan chan *HybridScanResult
|
|
errorChan chan error
|
|
doneChan chan struct{}
|
|
started bool
|
|
finished bool
|
|
closed int32 // atomic flag to prevent double close
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
|
|
mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
|
|
|
|
return &StreamingFlushedDataSource{
|
|
hms: hms,
|
|
partition: partition,
|
|
options: options,
|
|
mergedReadFn: mergedReadFn,
|
|
resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
|
|
errorChan: make(chan error, 1),
|
|
doneChan: make(chan struct{}),
|
|
started: false,
|
|
finished: false,
|
|
}
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) startStreaming() {
|
|
if s.started {
|
|
return
|
|
}
|
|
s.started = true
|
|
|
|
go func() {
|
|
defer func() {
|
|
// Use atomic flag to ensure channels are only closed once
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
|
|
close(s.resultChan)
|
|
close(s.errorChan)
|
|
close(s.doneChan)
|
|
}
|
|
}()
|
|
|
|
// Set up time range for scanning
|
|
startTime := time.Unix(0, s.options.StartTimeNs)
|
|
if s.options.StartTimeNs == 0 {
|
|
startTime = time.Unix(0, 0)
|
|
}
|
|
|
|
stopTsNs := s.options.StopTimeNs
|
|
// For SQL queries, stopTsNs = 0 means "no stop time restriction"
|
|
// This is different from message queue consumers which want to stop at "now"
|
|
// We detect SQL context by checking if we have a predicate function
|
|
if stopTsNs == 0 && s.options.Predicate == nil {
|
|
// Only set to current time for non-SQL queries (message queue consumers)
|
|
stopTsNs = time.Now().UnixNano()
|
|
}
|
|
// If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
|
|
|
|
// Message processing function
|
|
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
|
|
// Skip control entries without actual data
|
|
if s.hms.isControlEntry(logEntry) {
|
|
return false, nil // Skip this entry
|
|
}
|
|
|
|
// Convert log entry to schema_pb.RecordValue for consistent processing
|
|
recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
|
|
if convertErr != nil {
|
|
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
|
|
}
|
|
|
|
// Apply predicate filtering (WHERE clause)
|
|
if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
|
|
return false, nil // Skip this message
|
|
}
|
|
|
|
// Extract system columns
|
|
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
|
|
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
|
|
|
|
// Apply column projection
|
|
values := make(map[string]*schema_pb.Value)
|
|
if len(s.options.Columns) == 0 {
|
|
// Select all columns (excluding system columns from user view)
|
|
for name, value := range recordValue.Fields {
|
|
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
|
|
values[name] = value
|
|
}
|
|
}
|
|
} else {
|
|
// Select specified columns only
|
|
for _, columnName := range s.options.Columns {
|
|
if value, exists := recordValue.Fields[columnName]; exists {
|
|
values[columnName] = value
|
|
}
|
|
}
|
|
}
|
|
|
|
result := &HybridScanResult{
|
|
Values: values,
|
|
Timestamp: timestamp,
|
|
Key: key,
|
|
Source: source,
|
|
}
|
|
|
|
// Check if already closed before trying to send
|
|
if atomic.LoadInt32(&s.closed) != 0 {
|
|
return true, nil // Stop processing if closed
|
|
}
|
|
|
|
// Send result to channel with proper handling of closed channels
|
|
select {
|
|
case s.resultChan <- result:
|
|
return false, nil
|
|
case <-s.doneChan:
|
|
return true, nil // Stop processing if closed
|
|
default:
|
|
// Check again if closed (in case it was closed between the atomic check and select)
|
|
if atomic.LoadInt32(&s.closed) != 0 {
|
|
return true, nil
|
|
}
|
|
// If not closed, try sending again with blocking select
|
|
select {
|
|
case s.resultChan <- result:
|
|
return false, nil
|
|
case <-s.doneChan:
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start scanning from the specified position
|
|
startPosition := log_buffer.MessagePosition{Time: startTime}
|
|
_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
|
|
|
|
if err != nil {
|
|
// Only try to send error if not already closed
|
|
if atomic.LoadInt32(&s.closed) == 0 {
|
|
select {
|
|
case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
|
|
case <-s.doneChan:
|
|
default:
|
|
// Channel might be full or closed, ignore
|
|
}
|
|
}
|
|
}
|
|
|
|
s.finished = true
|
|
}()
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
|
|
if !s.started {
|
|
s.startStreaming()
|
|
}
|
|
|
|
select {
|
|
case result, ok := <-s.resultChan:
|
|
if !ok {
|
|
return nil, nil // No more results
|
|
}
|
|
return result, nil
|
|
case err := <-s.errorChan:
|
|
return nil, err
|
|
case <-s.doneChan:
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) HasMore() bool {
|
|
if !s.started {
|
|
return true // Haven't started yet, so potentially has data
|
|
}
|
|
return !s.finished || len(s.resultChan) > 0
|
|
}
|
|
|
|
func (s *StreamingFlushedDataSource) Close() error {
|
|
// Use atomic flag to ensure channels are only closed once
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
|
|
close(s.doneChan)
|
|
close(s.resultChan)
|
|
close(s.errorChan)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
|
|
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
|
|
if left < right {
|
|
mid := left + (right-left)/2
|
|
|
|
// Recursively sort both halves
|
|
hms.mergeSort(results, left, mid)
|
|
hms.mergeSort(results, mid+1, right)
|
|
|
|
// Merge the sorted halves
|
|
hms.merge(results, left, mid, right)
|
|
}
|
|
}
|
|
|
|
// merge combines two sorted subarrays into a single sorted array
|
|
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
|
|
// Create temporary arrays for the two subarrays
|
|
leftArray := make([]HybridScanResult, mid-left+1)
|
|
rightArray := make([]HybridScanResult, right-mid)
|
|
|
|
// Copy data to temporary arrays
|
|
copy(leftArray, results[left:mid+1])
|
|
copy(rightArray, results[mid+1:right+1])
|
|
|
|
// Merge the temporary arrays back into results[left..right]
|
|
i, j, k := 0, 0, left
|
|
|
|
for i < len(leftArray) && j < len(rightArray) {
|
|
if leftArray[i].Timestamp <= rightArray[j].Timestamp {
|
|
results[k] = leftArray[i]
|
|
i++
|
|
} else {
|
|
results[k] = rightArray[j]
|
|
j++
|
|
}
|
|
k++
|
|
}
|
|
|
|
// Copy remaining elements of leftArray, if any
|
|
for i < len(leftArray) {
|
|
results[k] = leftArray[i]
|
|
i++
|
|
k++
|
|
}
|
|
|
|
// Copy remaining elements of rightArray, if any
|
|
for j < len(rightArray) {
|
|
results[k] = rightArray[j]
|
|
j++
|
|
k++
|
|
}
|
|
}
|