Browse Source

chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1 (#6851)

* chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1

Bumps [github.com/parquet-go/parquet-go](https://github.com/parquet-go/parquet-go) from 0.24.0 to 0.25.1.
- [Release notes](https://github.com/parquet-go/parquet-go/releases)
- [Changelog](https://github.com/parquet-go/parquet-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/parquet-go/parquet-go/compare/v0.24.0...v0.25.1)

---
updated-dependencies:
- dependency-name: github.com/parquet-go/parquet-go
  dependency-version: 0.25.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* adjust to updated API

Fixed Reader Construction: Updated to use parquet.OpenFile() instead of passing io.Reader directly to NewReader()
Fixed EOF Handling: Changed the order of operations to process rows before checking for EOF
Added Zero Row Count Check: Added explicit check for rowCount == 0 as an additional termination condition

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
pull/6909/head
dependabot[bot] 3 months ago
committed by GitHub
parent
commit
5f1d2a9745
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      go.mod
  2. 7
      go.sum
  3. 18
      weed/mq/logstore/read_parquet_to_log.go
  4. 41
      weed/mq/schema/write_parquet_test.go

3
go.mod

@ -139,7 +139,7 @@ require (
github.com/hashicorp/raft-boltdb/v2 v2.3.1 github.com/hashicorp/raft-boltdb/v2 v2.3.1
github.com/minio/crc64nvme v1.0.1 github.com/minio/crc64nvme v1.0.1
github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/parquet-go/parquet-go v0.24.0
github.com/parquet-go/parquet-go v0.25.1
github.com/pkg/sftp v1.13.7 github.com/pkg/sftp v1.13.7
github.com/rabbitmq/amqp091-go v1.10.0 github.com/rabbitmq/amqp091-go v1.10.0
github.com/rclone/rclone v1.69.3 github.com/rclone/rclone v1.69.3
@ -289,7 +289,6 @@ require (
github.com/ncruces/go-strftime v0.1.9 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/ncw/swift/v2 v2.0.3 // indirect github.com/ncw/swift/v2 v2.0.3 // indirect
github.com/nxadm/tail v1.4.11 // indirect github.com/nxadm/tail v1.4.11 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo/v2 v2.19.0 // indirect github.com/onsi/ginkgo/v2 v2.19.0 // indirect
github.com/onsi/gomega v1.34.1 // indirect github.com/onsi/gomega v1.34.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect

7
go.sum

@ -1291,7 +1291,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
@ -1337,8 +1336,6 @@ github.com/ncw/swift/v2 v2.0.3 h1:8R9dmgFIWs+RiVlisCEfiQiik1hjuR0JnOkLxaP9ihg=
github.com/ncw/swift/v2 v2.0.3/go.mod h1:cbAO76/ZwcFrFlHdXPjaqWZ9R7Hdar7HpjRXBfbjigk= github.com/ncw/swift/v2 v2.0.3/go.mod h1:cbAO76/ZwcFrFlHdXPjaqWZ9R7Hdar7HpjRXBfbjigk=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k= github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@ -1359,8 +1356,8 @@ github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8Kw= github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8Kw=
github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/parquet-go/parquet-go v0.24.0 h1:VrsifmLPDnas8zpoHmYiWDZ1YHzLmc7NmNwPGkI2JM4=
github.com/parquet-go/parquet-go v0.24.0/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw=
github.com/parquet-go/parquet-go v0.25.1 h1:l7jJwNM0xrk0cnIIptWMtnSnuxRkwq53S+Po3KG8Xgo=
github.com/parquet-go/parquet-go v0.25.1/go.mod h1:AXBuotO1XiBtcqJb/FKFyjBG4aqa3aQAAWF3ZPzCanY=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

18
weed/mq/logstore/read_parquet_to_log.go

@ -4,6 +4,10 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io"
"math"
"strings"
"github.com/parquet-go/parquet-go" "github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/schema"
@ -13,9 +17,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"io"
"math"
"strings"
) )
var ( var (
@ -42,10 +43,6 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
RecordTypeEnd() RecordTypeEnd()
parquetSchema, err := schema.ToParquetSchema(t.Name, recordType)
if err != nil {
return nil
}
parquetLevels, err := schema.ToParquetLevels(recordType) parquetLevels, err := schema.ToParquetLevels(recordType)
if err != nil { if err != nil {
return nil return nil
@ -61,11 +58,12 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))
// create parquet reader // create parquet reader
parquetReader := parquet.NewReader(readerAt, parquetSchema)
parquetReader := parquet.NewReader(readerAt)
rows := make([]parquet.Row, 128) rows := make([]parquet.Row, 128)
for { for {
rowCount, readErr := parquetReader.ReadRows(rows) rowCount, readErr := parquetReader.ReadRows(rows)
// Process the rows first, even if EOF is returned
for i := 0; i < rowCount; i++ { for i := 0; i < rowCount; i++ {
row := rows[i] row := rows[i]
// convert parquet row to schema_pb.RecordValue // convert parquet row to schema_pb.RecordValue
@ -99,12 +97,16 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
} }
} }
// Check for end conditions after processing rows
if readErr != nil { if readErr != nil {
if readErr == io.EOF { if readErr == io.EOF {
return processedTsNs, nil return processedTsNs, nil
} }
return processedTsNs, readErr return processedTsNs, readErr
} }
if rowCount == 0 {
return processedTsNs, nil
}
} }
return return
} }

41
weed/mq/schema/write_parquet_test.go

@ -2,12 +2,13 @@ package schema
import ( import (
"fmt" "fmt"
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"io" "io"
"os" "os"
"testing" "testing"
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
) )
func TestWriteReadParquet(t *testing.T) { func TestWriteReadParquet(t *testing.T) {
@ -125,16 +126,25 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
t.Fatalf("os.Open failed: %v", err) t.Fatalf("os.Open failed: %v", err)
} }
defer file.Close() defer file.Close()
reader := parquet.NewReader(file, parquetSchema)
// Get file info to determine size
fileInfo, err := file.Stat()
if err != nil {
t.Fatalf("file.Stat failed: %v", err)
}
// Create a parquet file from the opened file
parquetFile, err := parquet.OpenFile(file, fileInfo.Size())
if err != nil {
t.Fatalf("parquet.OpenFile failed: %v", err)
}
reader := parquet.NewReader(parquetFile)
rows := make([]parquet.Row, 128) rows := make([]parquet.Row, 128)
for { for {
rowCount, err := reader.ReadRows(rows) rowCount, err := reader.ReadRows(rows)
if err != nil {
if err == io.EOF {
break
}
t.Fatalf("reader.Read failed: %v", err)
}
// Process the rows first, even if EOF is returned
for i := 0; i < rowCount; i++ { for i := 0; i < rowCount; i++ {
row := rows[i] row := rows[i]
// convert parquet row to schema_pb.RecordValue // convert parquet row to schema_pb.RecordValue
@ -147,6 +157,17 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
} }
} }
total += rowCount total += rowCount
// Check for end conditions after processing rows
if err != nil {
if err == io.EOF {
break
}
t.Fatalf("reader.Read failed: %v", err)
}
if rowCount == 0 {
break
}
} }
fmt.Printf("total: %v\n", total) fmt.Printf("total: %v\n", total)
return return

Loading…
Cancel
Save