Browse Source

Merge branch 'master' into list_recursive_prefixed_entries

pull/5580/head
Konstantin Lebedev 8 months ago
committed by GitHub
parent
commit
5b642a34fe
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 24
      go.mod
  2. 52
      go.sum
  3. 2
      weed/mount/page_writer/page_chunk_mem.go
  4. 2
      weed/mount/page_writer/page_chunk_swapfile.go
  5. 1
      weed/mq/broker/broker_grpc_configure.go
  6. 5
      weed/mq/broker/broker_grpc_pub.go
  7. 9
      weed/mq/broker/broker_grpc_pub_follow.go
  8. 2
      weed/mq/broker/broker_topic_partition_read_write.go
  9. 2
      weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
  10. 2
      weed/mq/client/cmd/weed_pub_record/publisher_record.go
  11. 2
      weed/mq/client/pub_client/publish.go
  12. 2
      weed/mq/client/sub_client/connect_to_sub_coordinator.go
  13. 6
      weed/mq/pub_balancer/broker_stats.go
  14. 6
      weed/mq/schema/schema.go
  15. 6
      weed/mq/schema/schema_builder.go
  16. 40
      weed/mq/schema/schema_test.go
  17. 4
      weed/mq/schema/struct_to_schema_test.go
  18. 16
      weed/mq/schema/to_parquet_levels.go
  19. 10
      weed/mq/schema/to_parquet_levels_test.go
  20. 1
      weed/mq/schema/to_parquet_schema.go
  21. 2
      weed/mq/schema/to_parquet_value.go
  22. 16
      weed/mq/schema/to_schema_value.go
  23. 8
      weed/mq/schema/write_parquet_test.go
  24. 16
      weed/mq/topic/local_partition.go
  25. 3
      weed/shell/command_fs_meta_load.go
  26. 2
      weed/shell/command_volume_balance_test.go
  27. 19
      weed/topology/volume_growth.go
  28. 2
      weed/topology/volume_layout.go
  29. 4
      weed/util/log_buffer/log_buffer_test.go

24
go.mod

@ -3,13 +3,13 @@ module github.com/seaweedfs/seaweedfs
go 1.22.0 go 1.22.0
require ( require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go v0.113.0 // indirect
cloud.google.com/go/pubsub v1.38.0 cloud.google.com/go/pubsub v1.38.0
cloud.google.com/go/storage v1.40.0
cloud.google.com/go/storage v1.41.0
github.com/Azure/azure-pipeline-go v0.2.3 github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.15.0 github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Shopify/sarama v1.38.1 github.com/Shopify/sarama v1.38.1
github.com/aws/aws-sdk-go v1.51.30
github.com/aws/aws-sdk-go v1.53.5
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bwmarrin/snowflake v0.3.0 github.com/bwmarrin/snowflake v0.3.0
github.com/cenkalti/backoff/v4 v4.3.0 github.com/cenkalti/backoff/v4 v4.3.0
@ -41,7 +41,7 @@ require (
github.com/google/btree v1.1.2 github.com/google/btree v1.1.2
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/google/wire v0.6.0 // indirect github.com/google/wire v0.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/googleapis/gax-go/v2 v2.12.4 // indirect
github.com/gorilla/mux v1.8.1 github.com/gorilla/mux v1.8.1
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
@ -108,15 +108,15 @@ require (
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3
golang.org/x/image v0.16.0 golang.org/x/image v0.16.0
golang.org/x/net v0.25.0 golang.org/x/net v0.25.0
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.20.0 golang.org/x/sys v0.20.0
golang.org/x/text v0.15.0 // indirect golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 golang.org/x/tools v0.21.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.177.0
google.golang.org/api v0.181.0
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.63.2 google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.34.0
google.golang.org/protobuf v1.34.1
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect
modernc.org/b v1.0.0 // indirect modernc.org/b v1.0.0 // indirect
modernc.org/libc v1.49.3 // indirect modernc.org/libc v1.49.3 // indirect
@ -131,7 +131,7 @@ require (
github.com/Jille/raft-grpc-transport v1.5.0 github.com/Jille/raft-grpc-transport v1.5.0
github.com/arangodb/go-driver v1.6.2 github.com/arangodb/go-driver v1.6.2
github.com/armon/go-metrics v0.4.1 github.com/armon/go-metrics v0.4.1
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2 v1.27.0
github.com/aws/aws-sdk-go-v2/config v1.27.11 github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 github.com/aws/aws-sdk-go-v2/credentials v1.17.11
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.2 github.com/aws/aws-sdk-go-v2/service/s3 v1.53.2
@ -159,10 +159,10 @@ require (
) )
require ( require (
cloud.google.com/go/auth v0.3.0 // indirect
cloud.google.com/go/auth v0.4.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
filippo.io/edwards25519 v1.1.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect
@ -329,8 +329,8 @@ require (
golang.org/x/sync v0.7.0 // indirect golang.org/x/sync v0.7.0 // indirect
golang.org/x/term v0.20.0 // indirect golang.org/x/term v0.20.0 // indirect
golang.org/x/time v0.5.0 // indirect golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/validator.v2 v2.0.1 // indirect gopkg.in/validator.v2 v2.0.1 // indirect

52
go.sum

@ -14,10 +14,10 @@ cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZ
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.63.0/go.mod h1:GmezbQc7T2snqkEXWfZ0sy0VfkB/ivI2DdtJL2DEmlg= cloud.google.com/go v0.63.0/go.mod h1:GmezbQc7T2snqkEXWfZ0sy0VfkB/ivI2DdtJL2DEmlg=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.112.2 h1:ZaGT6LiG7dBzi6zNOvVZwacaXlmf3lRqnC4DQzqyRQw=
cloud.google.com/go v0.112.2/go.mod h1:iEqjp//KquGIJV/m+Pk3xecgKNhV+ry+vVTsy4TbDms=
cloud.google.com/go/auth v0.3.0 h1:PRyzEpGfx/Z9e8+lHsbkoUVXD0gnu4MNmm7Gp8TQNIs=
cloud.google.com/go/auth v0.3.0/go.mod h1:lBv6NKTWp8E3LPzmO1TbiiRKc4drLOfHsgmlH9ogv5w=
cloud.google.com/go v0.113.0 h1:g3C70mn3lWfckKBiCVsAshabrDg01pQ0pnX1MNtnMkA=
cloud.google.com/go v0.113.0/go.mod h1:glEqlogERKYeePz6ZdkcLJ28Q2I6aERgDDErBg9GzO8=
cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg=
cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro=
cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4=
cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
@ -30,8 +30,8 @@ cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2Qx
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA=
cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0=
cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE=
cloud.google.com/go/kms v1.15.8 h1:szIeDCowID8th2i8XE4uRev5PMxQFqW+JjwYxL9h6xs= cloud.google.com/go/kms v1.15.8 h1:szIeDCowID8th2i8XE4uRev5PMxQFqW+JjwYxL9h6xs=
cloud.google.com/go/kms v1.15.8/go.mod h1:WoUHcDjD9pluCg7pNds131awnH429QGvRM3N/4MyoVs= cloud.google.com/go/kms v1.15.8/go.mod h1:WoUHcDjD9pluCg7pNds131awnH429QGvRM3N/4MyoVs=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
@ -45,8 +45,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw=
cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g=
cloud.google.com/go/storage v1.41.0 h1:RusiwatSu6lHeEXe3kglxakAmAbfV+rhtPqA6i8RBx0=
cloud.google.com/go/storage v1.41.0/go.mod h1:J1WCa/Z2FcgdEDuPUY8DxT5I+d9mFKsCepp5vR6Sq80=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
@ -140,10 +140,10 @@ github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQh
github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/aws/aws-sdk-go v1.51.30 h1:RVFkjn9P0JMwnuZCVH0TlV5k9zepHzlbc4943eZMhGw=
github.com/aws/aws-sdk-go v1.51.30/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go v1.53.5 h1:1OcVWMjGlwt7EU5OWmmEEXqaYfmX581EK317QJZXItM=
github.com/aws/aws-sdk-go v1.53.5/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo=
github.com/aws/aws-sdk-go-v2 v1.27.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg=
github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA=
@ -464,8 +464,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc=
github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
@ -490,8 +490,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA=
github.com/googleapis/gax-go/v2 v2.12.3/go.mod h1:AKloxT6GtNbaLm8QTNSidHUVsHYcBHwWRvkNFJUQcS4=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
@ -1176,8 +1176,8 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.19.0 h1:9+E/EZBCbTLNrbN35fHv/a/d/mOBatymz1zbtQrXpIg=
golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc8=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -1385,8 +1385,8 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.177.0 h1:8a0p/BbPa65GlqGWtUKxot4p0TV8OGOfyTjtmkXNXmk=
google.golang.org/api v0.177.0/go.mod h1:srbhue4MLjkjbkux5p3dw/ocYOSZTaIEvf7bCOnFQDw=
google.golang.org/api v0.181.0 h1:rPdjwnWgiPPOJx3IcSAQ2III5aX5tCer6wMpa/xmZi4=
google.golang.org/api v0.181.0/go.mod h1:MnQ+M0CFsfUwA5beZ+g/vCBCPXvtmZwRz2qzZk8ih1k=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -1429,10 +1429,10 @@ google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEc
google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda h1:wu/KJm9KJwpfHWhkkZGohVC6KRrc1oJNr4jwtQMOQXw= google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda h1:wu/KJm9KJwpfHWhkkZGohVC6KRrc1oJNr4jwtQMOQXw=
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda/go.mod h1:g2LLCvCeCSir/JJSWosk19BR4NVxGqHUC6rxIRsd7Aw= google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda/go.mod h1:g2LLCvCeCSir/JJSWosk19BR4NVxGqHUC6rxIRsd7Aw=
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 h1:DTJM0R8LECCgFeUwApvcEJHz85HLagW8uRENYxHh1ww=
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6/go.mod h1:10yRODfgim2/T8csjQsMPgZOMvtytXKTDRzH6HRGzRw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 h1:DujSIu+2tC9Ht0aPNA7jgj23Iq8Ewi5sgkQ++wdvonE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae h1:AH34z6WAGVNkllnKs5raNq3yRq93VnjBG6rpfub/jYk=
google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae/go.mod h1:FfiGhwUm6CJviekPrc0oJ+7h29e+DmWU6UtjX0ZvI7Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 h1:mxSlqyb8ZAHsYDCfiXN1EDdNTdvjUJSLY+OnAUtYNYA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -1475,8 +1475,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.34.0 h1:Qo/qEd2RZPCf2nKuorzksSknv0d3ERwp1vFG38gSmH4=
google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

2
weed/mount/page_writer/page_chunk_mem.go

@ -67,7 +67,7 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
maxStop = max(maxStop, logicStop) maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs { if t.TsNs >= tsNs {
println("read new data1", t.TsNs - tsNs, "ns")
println("read new data1", t.TsNs-tsNs, "ns")
} }
} }
} }

2
weed/mount/page_writer/page_chunk_swapfile.go

@ -137,7 +137,7 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
maxStop = max(maxStop, logicStop) maxStop = max(maxStop, logicStop)
if t.TsNs >= tsNs { if t.TsNs >= tsNs {
println("read new data2", t.TsNs - tsNs, "ns")
println("read new data2", t.TsNs-tsNs, "ns")
} }
} }
} }

1
weed/mq/broker/broker_grpc_configure.go

@ -35,7 +35,6 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
} }
} }
t := topic.FromPbTopic(request.Topic) t := topic.FromPbTopic(request.Topic)
var readErr, assignErr error var readErr, assignErr error
resp, readErr = b.readTopicConfFromFiler(t) resp, readErr = b.readTopicConfFromFiler(t)

5
weed/mq/broker/broker_grpc_pub.go

@ -69,7 +69,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response) return stream.Send(response)
} }
var receivedSequence, acknowledgedSequence int64
var receivedSequence, acknowledgedSequence int64
var isClosed bool var isClosed bool
// start sending ack to publisher // start sending ack to publisher
@ -85,7 +85,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
lastAckTime := time.Now() lastAckTime := time.Now()
for !isClosed { for !isClosed {
receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs) receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
acknowledgedSequence = receivedSequence acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{ response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence, AckSequence: acknowledgedSequence,
@ -101,7 +101,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
} }
}() }()
// process each published messages // process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())

9
weed/mq/broker/broker_grpc_pub_follow.go

@ -13,10 +13,11 @@ import (
) )
type memBuffer struct { type memBuffer struct {
buf []byte
startTime time.Time
stopTime time.Time
buf []byte
startTime time.Time
stopTime time.Time
} }
func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) { func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
var req *mq_pb.PublishFollowMeRequest var req *mq_pb.PublishFollowMeRequest
req, err = stream.Recv() req, err = stream.Recv()
@ -84,7 +85,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
} }
} }
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
logBuffer.ShutdownLogBuffer() logBuffer.ShutdownLogBuffer()
@ -97,7 +97,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop) partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
// flush the remaining messages // flush the remaining messages
inMemoryBuffers.CloseInput() inMemoryBuffers.CloseInput()
for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() { for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {

2
weed/mq/broker/broker_topic_partition_read_write.go

@ -45,7 +45,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
b.accessLock.Lock() b.accessLock.Lock()
defer b.accessLock.Unlock() defer b.accessLock.Unlock()
p := topic.FromPbPartition(partition) p := topic.FromPbPartition(partition)
if localPartition:=b.localTopicManager.GetLocalPartition(t, p); localPartition!=nil {
if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
} }

2
weed/mq/client/cmd/weed_pub_kv/publisher_kv.go

@ -16,7 +16,7 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers") concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count") partitionCount = flag.Int("p", 6, "partition count")
clientName = flag.String("client", "c1", "client name")
clientName = flag.String("client", "c1", "client name")
namespace = flag.String("ns", "test", "namespace") namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t") t = flag.String("t", "test", "t")

2
weed/mq/client/cmd/weed_pub_record/publisher_record.go

@ -19,7 +19,7 @@ var (
concurrency = flag.Int("c", 4, "concurrent publishers") concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count") partitionCount = flag.Int("p", 6, "partition count")
clientName = flag.String("client", "c1", "client name")
clientName = flag.String("client", "c1", "client name")
namespace = flag.String("ns", "test", "namespace") namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t") t = flag.String("t", "test", "t")

2
weed/mq/client/pub_client/publish.go

@ -48,7 +48,7 @@ func (p *TopicPublisher) FinishPublish() error {
if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, inputBuffer := range inputBuffers { for _, inputBuffer := range inputBuffers {
inputBuffer.Enqueue(&mq_pb.DataMessage{ inputBuffer.Enqueue(&mq_pb.DataMessage{
TsNs: time.Now().UnixNano(),
TsNs: time.Now().UnixNano(),
Ctrl: &mq_pb.ControlMessage{ Ctrl: &mq_pb.ControlMessage{
IsClose: true, IsClose: true,
}, },

2
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -122,7 +122,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
StartTsNs: sub.alreadyProcessedTsNs, StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
}, },
Filter: sub.ContentConfig.Filter,
Filter: sub.ContentConfig.Filter,
FollowerBrokers: assigned.FollowerBrokers, FollowerBrokers: assigned.FollowerBrokers,
}, },
}, },

6
weed/mq/pub_balancer/broker_stats.go

@ -17,7 +17,7 @@ type BrokerStats struct {
} }
type TopicPartitionStats struct { type TopicPartitionStats struct {
topic.TopicPartition topic.TopicPartition
PublisherCount int32
PublisherCount int32
SubscriberCount int32 SubscriberCount int32
} }
@ -48,7 +48,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs, UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
}, },
}, },
PublisherCount: topicPartitionStats.PublisherCount,
PublisherCount: topicPartitionStats.PublisherCount,
SubscriberCount: topicPartitionStats.SubscriberCount, SubscriberCount: topicPartitionStats.SubscriberCount,
} }
publisherCount += topicPartitionStats.PublisherCount publisherCount += topicPartitionStats.PublisherCount
@ -76,7 +76,7 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
UnixTimeNs: partition.UnixTimeNs, UnixTimeNs: partition.UnixTimeNs,
}, },
}, },
PublisherCount: 0,
PublisherCount: 0,
SubscriberCount: 0, SubscriberCount: 0,
} }
key := tps.TopicPartition.String() key := tps.TopicPartition.String()

6
weed/mq/schema/schema.go

@ -6,17 +6,17 @@ import (
type Schema struct { type Schema struct {
RecordType *schema_pb.RecordType RecordType *schema_pb.RecordType
fieldMap map[string]*schema_pb.Field
fieldMap map[string]*schema_pb.Field
} }
func NewSchema(recordType *schema_pb.RecordType) (*Schema, error) { func NewSchema(recordType *schema_pb.RecordType) (*Schema, error) {
fieldMap := make( map[string]*schema_pb.Field)
fieldMap := make(map[string]*schema_pb.Field)
for _, field := range recordType.Fields { for _, field := range recordType.Fields {
fieldMap[field.Name] = field fieldMap[field.Name] = field
} }
return &Schema{ return &Schema{
RecordType: recordType, RecordType: recordType,
fieldMap: fieldMap,
fieldMap: fieldMap,
}, nil }, nil
} }

6
weed/mq/schema/schema_builder.go

@ -8,9 +8,9 @@ import (
var ( var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}} TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}} TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}} TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}} TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
) )

40
weed/mq/schema/schema_test.go

@ -32,10 +32,10 @@ func TestEnumScalarType(t *testing.T) {
func TestField(t *testing.T) { func TestField(t *testing.T) {
field := &Field{ field := &Field{
Name: "field_name",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_name",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
} }
assert.NotNil(t, field) assert.NotNil(t, field)
} }
@ -44,32 +44,32 @@ func TestRecordType(t *testing.T) {
subRecord := &RecordType{ subRecord := &RecordType{
Fields: []*Field{ Fields: []*Field{
{ {
Name: "field_1",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_1",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
}, },
{ {
Name: "field_2",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
FieldIndex: 2,
IsRepeated: false,
Name: "field_2",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
FieldIndex: 2,
IsRepeated: false,
}, },
}, },
} }
record := &RecordType{ record := &RecordType{
Fields: []*Field{ Fields: []*Field{
{ {
Name: "field_key",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_key",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
}, },
{ {
Name: "field_record",
Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
FieldIndex: 2,
IsRepeated: false,
Name: "field_record",
Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
FieldIndex: 2,
IsRepeated: false,
}, },
}, },
} }

4
weed/mq/schema/struct_to_schema_test.go

@ -76,7 +76,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin(). RecordTypeBegin().
WithField("Field3", TypeString). WithField("Field3", TypeString).
WithField("Field4", TypeInt32). WithField("Field4", TypeInt32).
RecordTypeEnd(),
RecordTypeEnd(),
). ).
RecordTypeEnd(), RecordTypeEnd(),
}, },
@ -104,7 +104,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin(). RecordTypeBegin().
WithField("Field6", TypeString). WithField("Field6", TypeString).
WithField("Field7", TypeBytes). WithField("Field7", TypeBytes).
RecordTypeEnd(),
RecordTypeEnd(),
).RecordTypeEnd(), ).RecordTypeEnd(),
). ).
RecordTypeEnd(), RecordTypeEnd(),

16
weed/mq/schema/to_parquet_levels.go

@ -7,9 +7,9 @@ import (
type ParquetLevels struct { type ParquetLevels struct {
startColumnIndex int startColumnIndex int
endColumnIndex int
definitionDepth int
levels map[string]*ParquetLevels
endColumnIndex int
definitionDepth int
levels map[string]*ParquetLevels
} }
func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) { func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
@ -19,7 +19,7 @@ func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) { func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
switch fieldType.Kind.(type) { switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType: case *schema_pb.Type_ScalarType:
return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_RecordType: case *schema_pb.Type_RecordType:
return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth) return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_ListType: case *schema_pb.Type_ListType:
@ -35,15 +35,15 @@ func toFieldTypeListLevels(listType *schema_pb.ListType, startColumnIndex, defin
func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) { func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
return &ParquetLevels{ return &ParquetLevels{
startColumnIndex: startColumnIndex, startColumnIndex: startColumnIndex,
endColumnIndex: startColumnIndex + 1,
definitionDepth: definitionDepth,
endColumnIndex: startColumnIndex + 1,
definitionDepth: definitionDepth,
}, nil }, nil
} }
func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) { func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
recordTypeLevels := &ParquetLevels{ recordTypeLevels := &ParquetLevels{
startColumnIndex: startColumnIndex, startColumnIndex: startColumnIndex,
definitionDepth: definitionDepth,
levels: make(map[string]*ParquetLevels),
definitionDepth: definitionDepth,
levels: make(map[string]*ParquetLevels),
} }
for _, field := range recordType.Fields { for _, field := range recordType.Fields {
fieldTypeLevels, err := toFieldTypeLevels(field.Type, startColumnIndex, definitionDepth+1) fieldTypeLevels, err := toFieldTypeLevels(field.Type, startColumnIndex, definitionDepth+1)

10
weed/mq/schema/to_parquet_levels_test.go

@ -11,9 +11,9 @@ func TestToParquetLevels(t *testing.T) {
recordType *schema_pb.RecordType recordType *schema_pb.RecordType
} }
tests := []struct { tests := []struct {
name string
args args
want *ParquetLevels
name string
args args
want *ParquetLevels
}{ }{
{ {
name: "nested type", name: "nested type",
@ -25,13 +25,13 @@ func TestToParquetLevels(t *testing.T) {
RecordTypeBegin(). RecordTypeBegin().
WithField("zName", TypeString). WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)). WithField("emails", ListOf(TypeString)).
RecordTypeEnd()).
RecordTypeEnd()).
WithField("Company", TypeString). WithField("Company", TypeString).
WithRecordField("Address", WithRecordField("Address",
RecordTypeBegin(). RecordTypeBegin().
WithField("Street", TypeString). WithField("Street", TypeString).
WithField("City", TypeString). WithField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd()).
RecordTypeEnd(), RecordTypeEnd(),
}, },
want: &ParquetLevels{ want: &ParquetLevels{

1
weed/mq/schema/to_parquet_schema.go

@ -31,7 +31,6 @@ func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err e
return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind) return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
} }
return dataType, err return dataType, err
} }

2
weed/mq/schema/to_parquet_value.go

@ -70,7 +70,7 @@ func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *
return return
} }
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
switch value.Kind.(type) { switch value.Kind.(type) {
case *schema_pb.Value_BoolValue: case *schema_pb.Value_BoolValue:
return parquet.BooleanValue(value.GetBoolValue()), nil return parquet.BooleanValue(value.GetBoolValue()), nil

16
weed/mq/schema/to_schema_value.go

@ -47,7 +47,7 @@ func toRecordValue(recordType *schema_pb.RecordType, levels *ParquetLevels, valu
func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) { func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) {
listValues := make([]*schema_pb.Value, 0) listValues := make([]*schema_pb.Value, 0)
var value *schema_pb.Value var value *schema_pb.Value
for ;valueIndex < len(values); {
for valueIndex < len(values) {
if values[valueIndex].Column() != levels.startColumnIndex { if values[valueIndex].Column() != levels.startColumnIndex {
break break
} }
@ -67,19 +67,19 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
} }
switch scalarType { switch scalarType {
case schema_pb.ScalarType_BOOL: case schema_pb.ScalarType_BOOL:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT32: case schema_pb.ScalarType_INT32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT64: case schema_pb.ScalarType_INT64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex + 1, nil
case schema_pb.ScalarType_FLOAT: case schema_pb.ScalarType_FLOAT:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex + 1, nil
case schema_pb.ScalarType_DOUBLE: case schema_pb.ScalarType_DOUBLE:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
case schema_pb.ScalarType_BYTES: case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex + 1, nil
case schema_pb.ScalarType_STRING: case schema_pb.ScalarType_STRING:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex + 1, nil
} }
return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType) return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
} }

8
weed/mq/schema/write_parquet_test.go

@ -19,13 +19,13 @@ func TestWriteReadParquet(t *testing.T) {
RecordTypeBegin(). RecordTypeBegin().
WithField("zName", TypeString). WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)). WithField("emails", ListOf(TypeString)).
RecordTypeEnd()).
RecordTypeEnd()).
WithField("Company", TypeString). WithField("Company", TypeString).
WithRecordField("Address", WithRecordField("Address",
RecordTypeBegin(). RecordTypeBegin().
WithField("Street", TypeString). WithField("Street", TypeString).
WithField("City", TypeString). WithField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd()).
RecordTypeEnd() RecordTypeEnd()
fmt.Printf("RecordType: %v\n", recordType) fmt.Printf("RecordType: %v\n", recordType)
@ -85,9 +85,9 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
fmt.Sprintf("john_%d@c.com", i), fmt.Sprintf("john_%d@c.com", i),
fmt.Sprintf("john_%d@d.com", i), fmt.Sprintf("john_%d@d.com", i),
fmt.Sprintf("john_%d@e.com", i)). fmt.Sprintf("john_%d@e.com", i)).
RecordEnd()).
RecordEnd()).
SetString("Company", fmt.Sprintf("company_%d", i)). SetString("Company", fmt.Sprintf("company_%d", i)).
RecordEnd()
RecordEnd()
AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue) AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
if count < 10 { if count < 10 {

16
weed/mq/topic/local_partition.go

@ -16,17 +16,17 @@ import (
) )
type LocalPartition struct { type LocalPartition struct {
ListenersWaits int64
AckTsNs int64
ListenersWaits int64
AckTsNs int64
// notifying clients // notifying clients
ListenersLock sync.Mutex ListenersLock sync.Mutex
ListenersCond *sync.Cond ListenersCond *sync.Cond
Partition Partition
LogBuffer *log_buffer.LogBuffer
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
LogBuffer *log_buffer.LogBuffer
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn followerGrpcConnection *grpc.ClientConn
@ -37,7 +37,7 @@ var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
lp := &LocalPartition{ lp := &LocalPartition{
Partition: partition,
Partition: partition,
Publishers: NewLocalPartitionPublishers(), Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(), Subscribers: NewLocalPartitionSubscribers(),
} }
@ -155,8 +155,8 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{ Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
Partition: initMessage.Partition,
Topic: initMessage.Topic,
Partition: initMessage.Partition,
}, },
}, },
}); err != nil { }); err != nil {

3
weed/shell/command_fs_meta_load.go

@ -32,7 +32,8 @@ func (c *commandFsMetaLoad) Help() string {
fs.meta.load <filer_host>-<port>-<time>.meta fs.meta.load <filer_host>-<port>-<time>.meta
fs.meta.load -v=false <filer_host>-<port>-<time>.meta // skip printing out the verbose output fs.meta.load -v=false <filer_host>-<port>-<time>.meta // skip printing out the verbose output
fs.meta.load -dirPrefix=/buckets/important* <filer_host>.meta // load any dirs with prefix "important"
fs.meta.load -concurrency=1 <filer_host>-<port>-<time>.meta // number of parallel meta load to filer
fs.meta.load -dirPrefix=/buckets/important <filer_host>.meta // load any dirs with prefix "important"
` `
} }

2
weed/shell/command_volume_balance_test.go

@ -278,7 +278,7 @@ func TestDeleteEmptySelection(t *testing.T) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos { for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos { for _, v := range diskInfo.VolumeInfos {
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
if v.Size <= super_block.SuperBlockSize && v.ModifiedAtSecond > 0 {
fmt.Printf("empty volume %d from %s\n", v.Id, dn.Id) fmt.Printf("empty volume %d from %s\n", v.Id, dn.Id)
} }
} }

19
weed/topology/volume_growth.go

@ -31,20 +31,20 @@ type VolumeGrowRequest struct {
} }
type volumeGrowthStrategy struct { type volumeGrowthStrategy struct {
Copy1Count int
Copy2Count int
Copy3Count int
Copy1Count int
Copy2Count int
Copy3Count int
CopyOtherCount int CopyOtherCount int
Threshold float64
Threshold float64
} }
var ( var (
VolumeGrowStrategy = volumeGrowthStrategy{ VolumeGrowStrategy = volumeGrowthStrategy{
Copy1Count: 7,
Copy2Count: 6,
Copy3Count: 3,
Copy1Count: 7,
Copy2Count: 6,
Copy3Count: 3,
CopyOtherCount: 1, CopyOtherCount: 1,
Threshold: 0.9,
Threshold: 0.9,
} }
) )
@ -77,7 +77,8 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
// given copyCount, how many logical volumes to create // given copyCount, how many logical volumes to create
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
switch copyCount { switch copyCount {
case 1: count = VolumeGrowStrategy.Copy1Count
case 1:
count = VolumeGrowStrategy.Copy1Count
case 2: case 2:
count = VolumeGrowStrategy.Copy2Count count = VolumeGrowStrategy.Copy2Count
case 3: case 3:

2
weed/topology/volume_layout.go

@ -381,7 +381,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, a
} }
active++ active++
info, _ := dn.GetVolumesById(v) info, _ := dn.GetVolumesById(v)
if float64(info.Size) > float64(vl.volumeSizeLimit)* VolumeGrowStrategy.Threshold{
if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
crowded++ crowded++
} }
} }

4
weed/util/log_buffer/log_buffer_test.go

@ -19,7 +19,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
}, nil, func() { }, nil, func() {
}) })
startTime := MessagePosition{Time:time.Now()}
startTime := MessagePosition{Time: time.Now()}
messageSize := 1024 messageSize := 1024
messageCount := 5000 messageCount := 5000
@ -38,7 +38,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
println("processed all messages") println("processed all messages")
return true, io.EOF return true, io.EOF
} }
return false,nil
return false, nil
}) })
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount) fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)

Loading…
Cancel
Save