Browse Source

add publisher shutdown

pull/4855/head
chrislu 1 year ago
parent
commit
39941edc0b
  1. 6
      weed/mq/broker/broker_grpc_pub.go
  2. 1
      weed/mq/client/cmd/weed_pub/publisher.go
  3. 6
      weed/mq/client/pub_client/lookup.go
  4. 14
      weed/mq/client/pub_client/publisher.go

6
weed/mq/broker/broker_grpc_pub.go

@ -104,10 +104,6 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
respChan := make(chan *mq_pb.PublishResponse, 128) respChan := make(chan *mq_pb.PublishResponse, 128)
defer func() { defer func() {
atomic.StoreInt32(&isStopping, 1) atomic.StoreInt32(&isStopping, 1)
response := &mq_pb.PublishResponse{
Error: "end of stream",
}
respChan <- response
close(respChan) close(respChan)
}() }()
go func() { go func() {
@ -117,7 +113,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
case resp := <-respChan: case resp := <-respChan:
if resp != nil { if resp != nil {
if err := stream.Send(resp); err != nil { if err := stream.Send(resp); err != nil {
glog.Errorf("Error sending setup response: %v", err)
glog.Errorf("Error sending response %v: %v", resp, err)
} }
} else { } else {
return return

1
weed/mq/client/cmd/weed_pub/publisher.go

@ -51,6 +51,7 @@ func main() {
// Wait for all publishers to finish // Wait for all publishers to finish
wg.Wait() wg.Wait()
elapsed := time.Since(startTime) elapsed := time.Since(startTime)
publisher.Shutdown()
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())

6
weed/mq/client/pub_client/lookup.go

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
func (p *TopicPublisher) doLookup(brokerAddress string) error { func (p *TopicPublisher) doLookup(brokerAddress string) error {
@ -100,6 +102,10 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
for { for {
_, err := publishClient.Recv() _, err := publishClient.Recv()
if err != nil { if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
return
}
publishClient.Err = err publishClient.Err = err
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return return

14
weed/mq/client/pub_client/publisher.go

@ -2,10 +2,12 @@ package pub_client
import ( import (
"github.com/rdleal/intervalst/interval" "github.com/rdleal/intervalst/interval"
"github.com/seaweedfs/seaweedfs/weed/mq/broker"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"sync" "sync"
"time"
) )
type PublisherConfiguration struct { type PublisherConfiguration struct {
@ -41,3 +43,15 @@ func (p *TopicPublisher) Connect(bootstrapBroker string) error {
} }
return nil return nil
} }
func (p *TopicPublisher) Shutdown() error {
if clients, found := p.partition2Broker.AllIntersections(0, broker.MaxPartitionCount); found {
for _, client := range clients {
client.CloseSend()
}
}
time.Sleep(1100 * time.Millisecond)
return nil
}
Loading…
Cancel
Save