diff --git a/go.mod b/go.mod index c61d5e18c..813734f04 100644 --- a/go.mod +++ b/go.mod @@ -139,7 +139,7 @@ require ( github.com/hashicorp/raft-boltdb/v2 v2.3.1 github.com/minio/crc64nvme v1.0.1 github.com/orcaman/concurrent-map/v2 v2.0.1 - github.com/parquet-go/parquet-go v0.24.0 + github.com/parquet-go/parquet-go v0.25.1 github.com/pkg/sftp v1.13.7 github.com/rabbitmq/amqp091-go v1.10.0 github.com/rclone/rclone v1.69.3 @@ -289,7 +289,6 @@ require ( github.com/ncruces/go-strftime v0.1.9 // indirect github.com/ncw/swift/v2 v2.0.3 // indirect github.com/nxadm/tail v1.4.11 // indirect - github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onsi/ginkgo/v2 v2.19.0 // indirect github.com/onsi/gomega v1.34.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect diff --git a/go.sum b/go.sum index 1611700e9..ddd9b7ba6 100644 --- a/go.sum +++ b/go.sum @@ -1291,7 +1291,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= @@ -1337,8 +1336,6 @@ github.com/ncw/swift/v2 v2.0.3 h1:8R9dmgFIWs+RiVlisCEfiQiik1hjuR0JnOkLxaP9ihg= github.com/ncw/swift/v2 v2.0.3/go.mod h1:cbAO76/ZwcFrFlHdXPjaqWZ9R7Hdar7HpjRXBfbjigk= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= -github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= -github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -1359,8 +1356,8 @@ github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/ github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8Kw= github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= -github.com/parquet-go/parquet-go v0.24.0 h1:VrsifmLPDnas8zpoHmYiWDZ1YHzLmc7NmNwPGkI2JM4= -github.com/parquet-go/parquet-go v0.24.0/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw= +github.com/parquet-go/parquet-go v0.25.1 h1:l7jJwNM0xrk0cnIIptWMtnSnuxRkwq53S+Po3KG8Xgo= +github.com/parquet-go/parquet-go v0.25.1/go.mod h1:AXBuotO1XiBtcqJb/FKFyjBG4aqa3aQAAWF3ZPzCanY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 3438af61a..1c53129f4 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -4,6 +4,10 @@ import ( "context" "encoding/binary" "fmt" + "io" + "math" + "strings" + "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/mq/schema" @@ -13,9 +17,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" - "io" - "math" - "strings" ) var ( @@ -42,10 +43,6 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). RecordTypeEnd() - parquetSchema, err := schema.ToParquetSchema(t.Name, recordType) - if err != nil { - return nil - } parquetLevels, err := schema.ToParquetLevels(recordType) if err != nil { return nil @@ -61,11 +58,12 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) // create parquet reader - parquetReader := parquet.NewReader(readerAt, parquetSchema) + parquetReader := parquet.NewReader(readerAt) rows := make([]parquet.Row, 128) for { rowCount, readErr := parquetReader.ReadRows(rows) + // Process the rows first, even if EOF is returned for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue @@ -99,12 +97,16 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic } } + // Check for end conditions after processing rows if readErr != nil { if readErr == io.EOF { return processedTsNs, nil } return processedTsNs, readErr } + if rowCount == 0 { + return processedTsNs, nil + } } return } diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go index f7ab26860..b7ecdcfc7 100644 --- a/weed/mq/schema/write_parquet_test.go +++ b/weed/mq/schema/write_parquet_test.go @@ -2,12 +2,13 @@ package schema import ( "fmt" - "github.com/parquet-go/parquet-go" - "github.com/parquet-go/parquet-go/compress/zstd" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "io" "os" "testing" + + "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) func TestWriteReadParquet(t *testing.T) { @@ -125,16 +126,25 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque t.Fatalf("os.Open failed: %v", err) } defer file.Close() - reader := parquet.NewReader(file, parquetSchema) + + // Get file info to determine size + fileInfo, err := file.Stat() + if err != nil { + t.Fatalf("file.Stat failed: %v", err) + } + + // Create a parquet file from the opened file + parquetFile, err := parquet.OpenFile(file, fileInfo.Size()) + if err != nil { + t.Fatalf("parquet.OpenFile failed: %v", err) + } + + reader := parquet.NewReader(parquetFile) rows := make([]parquet.Row, 128) for { rowCount, err := reader.ReadRows(rows) - if err != nil { - if err == io.EOF { - break - } - t.Fatalf("reader.Read failed: %v", err) - } + + // Process the rows first, even if EOF is returned for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue @@ -147,6 +157,17 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque } } total += rowCount + + // Check for end conditions after processing rows + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("reader.Read failed: %v", err) + } + if rowCount == 0 { + break + } } fmt.Printf("total: %v\n", total) return