diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index d2762ff24..8ce7aa2a2 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -4,6 +4,11 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"io"
+	"os"
+	"strings"
+	"time"
+
 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/compress/zstd"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -16,10 +21,6 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/protobuf/proto"
-	"io"
-	"os"
-	"strings"
-	"time"
 )
 
 const (
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index a55297f33..62a085870 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -29,10 +29,10 @@ const (
 // 3. System columns (_ts_ns, _key) are added to user schema
 // 4. Predicate pushdown is used for efficient scanning
 type ParquetScanner struct {
-	filerClient  filer_pb.FilerClient
-	chunkCache   chunk_cache.ChunkCache
-	topic        topic.Topic
-	recordSchema *schema_pb.RecordType
+	filerClient   filer_pb.FilerClient
+	chunkCache    chunk_cache.ChunkCache
+	topic         topic.Topic
+	recordSchema  *schema_pb.RecordType
 	parquetLevels *schema.ParquetLevels
 }
 
@@ -47,7 +47,7 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st
 	// Create topic reference
 	t := topic.Topic{
 		Namespace: namespace,
-		Name:       topicName,
+		Name:      topicName,
 	}
 
 	// Read topic configuration to get schema
@@ -92,13 +92,13 @@ type ScanOptions struct {
 	// Time range filtering (Unix nanoseconds)
 	StartTimeNs int64
 	StopTimeNs  int64
-	
+
 	// Column projection - if empty, select all columns
 	Columns []string
-	
+
 	// Row limit - 0 means no limit
 	Limit int
-	
+
 	// Predicate for WHERE clause filtering
 	Predicate func(*schema_pb.RecordValue) bool
 }
@@ -117,59 +117,59 @@ type ScanResult struct {
 // 3. Applies predicates and projections after reading
 func (ps *ParquetScanner) Scan(ctx context.Context, options ScanOptions) ([]ScanResult, error) {
 	var results []ScanResult
-	
+
 	// Get all partitions for this topic
 	// TODO: Implement proper partition discovery
 	// For now, assume partition 0 exists
 	partitions := []topic.Partition{{RangeStart: 0, RangeStop: 1000}}
-	
+
 	for _, partition := range partitions {
 		partitionResults, err := ps.scanPartition(ctx, partition, options)
 		if err != nil {
 			return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
 		}
-		
+
 		results = append(results, partitionResults...)
-		
+
 		// Apply global limit across all partitions
 		if options.Limit > 0 && len(results) >= options.Limit {
 			results = results[:options.Limit]
 			break
 		}
 	}
-	
+
 	return results, nil
 }
 
 // scanPartition scans a specific topic partition
 func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Partition, options ScanOptions) ([]ScanResult, error) {
 	// partitionDir := topic.PartitionDir(ps.topic, partition) // TODO: Use for actual file listing
-	
+
 	var results []ScanResult
-	
+
 	// List Parquet files in partition directory
 	// TODO: Implement proper file listing with date range filtering
 	// For now, this is a placeholder that would list actual Parquet files
-	
+
 	// Simulate file processing - in real implementation, this would:
 	// 1. List files in partitionDir via filerClient
 	// 2. Filter files by date range if time filtering is enabled
 	// 3. Process each Parquet file in chronological order
-	
+
 	// Placeholder: Create sample data for testing
 	if len(results) == 0 {
 		// Generate sample data for demonstration
 		sampleData := ps.generateSampleData(options)
 		results = append(results, sampleData...)
 	}
-	
+
 	return results, nil
 }
 
 // scanParquetFile scans a single Parquet file (real implementation)
 func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
 	var results []ScanResult
-	
+
 	// Create reader for the Parquet file (same pattern as logstore)
 	lookupFileIdFn := filer.LookupFn(ps.filerClient)
 	fileSize := filer.FileSize(entry)
@@ -177,16 +177,16 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
 	readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
 	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))
-	
+
 	// Create Parquet reader
 	parquetReader := parquet.NewReader(readerAt)
 	defer parquetReader.Close()
-	
+
 	rows := make([]parquet.Row, 128) // Read in batches like logstore
-	
+
 	for {
 		rowCount, readErr := parquetReader.ReadRows(rows)
-		
+
 		// Process rows even if EOF
 		for i := 0; i < rowCount; i++ {
 			// Convert Parquet row to schema value
@@ -194,11 +194,11 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 			if err != nil {
 				return nil, fmt.Errorf("failed to convert row: %v", err)
 			}
-			
+
 			// Extract system columns
 			timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
 			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
-			
+
 			// Apply time filtering
 			if options.StartTimeNs > 0 && timestamp < options.StartTimeNs {
 				continue
@@ -206,12 +206,12 @@
 			if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs {
 				break // Assume data is time-ordered
 			}
-			
+
 			// Apply predicate filtering (WHERE clause)
 			if options.Predicate != nil && !options.Predicate(recordValue) {
 				continue
 			}
-			
+
 			// Apply column projection
 			values := make(map[string]*schema_pb.Value)
 			if len(options.Columns) == 0 {
@@ -229,61 +229,61 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 					}
 				}
 			}
-			
+
 			results = append(results, ScanResult{
 				Values:    values,
 				Timestamp: timestamp,
 				Key:       key,
 			})
-			
+
 			// Apply row limit
 			if options.Limit > 0 && len(results) >= options.Limit {
 				return results, nil
 			}
 		}
-		
+
 		if readErr != nil {
 			break // EOF or error
 		}
 	}
-	
+
 	return results, nil
 }
 
 // generateSampleData creates sample data for testing when no real Parquet files exist
 func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
 	now := time.Now().UnixNano()
-	
+
 	sampleData := []ScanResult{
 		{
 			Values: map[string]*schema_pb.Value{
-				"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
+				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
 				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
-				"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
+				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
 			},
 			Timestamp: now - 3600000000000, // 1 hour ago
 			Key:       []byte("user-1001"),
 		},
 		{
 			Values: map[string]*schema_pb.Value{
-				"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
+				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
 				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "page_view"}},
-				"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
+				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
 			},
 			Timestamp: now - 1800000000000, // 30 minutes ago
 			Key:       []byte("user-1002"),
 		},
 		{
 			Values: map[string]*schema_pb.Value{
-				"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
+				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
 				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "logout"}},
-				"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
+				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
 			},
 			Timestamp: now - 900000000000, // 15 minutes ago
 			Key:       []byte("user-1001"),
 		},
 	}
-	
+
 	// Apply predicate filtering if specified
 	if options.Predicate != nil {
 		var filtered []ScanResult
@@ -295,19 +295,19 @@ func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
 			}
 			recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
 			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
-			
+
 			if options.Predicate(recordValue) {
 				filtered = append(filtered, result)
 			}
 		}
 		sampleData = filtered
 	}
-	
+
 	// Apply limit
 	if options.Limit > 0 && len(sampleData) > options.Limit {
 		sampleData = sampleData[:options.Limit]
 	}
-	
+
 	return sampleData
 }
 
@@ -319,7 +319,7 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
 			Rows:    [][]sqltypes.Value{},
 		}
 	}
-	
+
 	// Determine columns if not specified
 	if len(columns) == 0 {
 		columnSet := make(map[string]bool)
@@ -328,13 +328,13 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
 				columnSet[columnName] = true
 			}
 		}
-		
+
 		columns = make([]string, 0, len(columnSet))
 		for columnName := range columnSet {
 			columns = append(columns, columnName)
 		}
 	}
-	
+
 	// Convert to SQL rows
 	rows := make([][]sqltypes.Value, len(results))
 	for i, result := range results {
@@ -348,7 +348,7 @@ func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []str
 		}
 		rows[i] = row
 	}
-	
+
 	return &QueryResult{
 		Columns: columns,
 		Rows:    rows,
@@ -360,7 +360,7 @@ func convertSchemaValueToSQL(value *schema_pb.Value) sqltypes.Value {
 	if value == nil {
 		return sqltypes.NULL
 	}
-	
+
 	switch v := value.Kind.(type) {
 	case *schema_pb.Value_BoolValue:
 		if v.BoolValue {
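For reviewers, a minimal usage sketch of the scanner API touched by this diff, not part of the change itself. It assumes the snippet sits next to parquet_scanner.go in the weed/query/engine package, borrows the "user_id"/"event_type" field names from the sample data above, and uses only the signatures visible in the hunks (ScanOptions, ParquetScanner.Scan); the helper name scanRecentLogins is hypothetical.

// scanRecentLogins sketches how ScanOptions drives time filtering, column
// projection, row limits, and predicate filtering in ParquetScanner.Scan.
package engine

import (
	"context"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func scanRecentLogins(ctx context.Context, ps *ParquetScanner) ([]ScanResult, error) {
	opts := ScanOptions{
		// Only rows from the last hour (Unix nanoseconds); a zero StopTimeNs means no upper bound.
		StartTimeNs: time.Now().Add(-time.Hour).UnixNano(),
		// Project two user columns; Timestamp and Key still come back on each ScanResult.
		Columns: []string{"user_id", "event_type"},
		// Stop after 100 matching rows across all partitions.
		Limit: 100,
		// WHERE event_type = 'login', evaluated per decoded record.
		Predicate: func(rv *schema_pb.RecordValue) bool {
			field, ok := rv.Fields["event_type"]
			return ok && field.GetStringValue() == "login"
		},
	}
	return ps.Scan(ctx, opts)
}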