diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 1ec24f406..6cb18c574 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -22,11 +22,7 @@ func main() { Filter: "", } - subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig) - if err := subscriber.Connect("localhost:17777"); err != nil { - fmt.Println(err) - return - } + subscriber := sub_client.NewTopicSubscriber("localhost:17777", subscriberConfig, contentConfig) subscriber.SetEachMessageFunc(func(key, value []byte) bool { println(string(key), "=>", string(value)) diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go new file mode 100644 index 000000000..7717a101f --- /dev/null +++ b/weed/mq/client/sub_client/process.go @@ -0,0 +1,81 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" +) + +func (sub *TopicSubscriber) doProcess() error { + var wg sync.WaitGroup + for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { + brokerAddress := brokerPartitionAssignment.LeaderBroker + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ + Message: &mq_pb.SubscribeRequest_Init{ + Init: &mq_pb.SubscribeRequest_InitMessage{ + ConsumerGroup: sub.SubscriberConfig.GroupId, + ConsumerId: sub.SubscriberConfig.GroupInstanceId, + Topic: &mq_pb.Topic{ + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, + }, + Partition: &mq_pb.Partition{ + RingSize: brokerPartitionAssignment.Partition.RingSize, + RangeStart: brokerPartitionAssignment.Partition.RangeStart, + RangeStop: brokerPartitionAssignment.Partition.RangeStop, + }, + Filter: sub.ContentConfig.Filter, + }, + }, + }) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + wg.Add(1) + go func() { + defer wg.Done() + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() + } + defer func() { + subscribeClient.SendMsg(&mq_pb.SubscribeRequest{ + Message: &mq_pb.SubscribeRequest_Ack{ + Ack: &mq_pb.SubscribeRequest_AckMessage{ + Sequence: 0, + }, + }, + }) + subscribeClient.CloseSend() + }() + for { + resp, err := subscribeClient.Recv() + if err != nil { + fmt.Printf("subscribe error: %v\n", err) + return + } + if resp.Message == nil { + continue + } + switch m := resp.Message.(type) { + case *mq_pb.SubscribeResponse_Data: + if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { + return + } + case *mq_pb.SubscribeResponse_Ctrl: + if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { + return + } + } + } + }() + } + wg.Wait() + return nil +} diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 7830ac29f..bfde6a512 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,72 +1,28 @@ package sub_client import ( - "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "sync" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" ) // Subscribe subscribes to a topic's specified partitions. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. func (sub *TopicSubscriber) Subscribe() error { - var wg sync.WaitGroup - for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { - brokerAddress := brokerPartitionAssignment.LeaderBroker - grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) - if err != nil { - return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + util.RetryUntil("subscribe", func() error { + if err := sub.doLookup(sub.bootstrapBroker); err != nil { + return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } - brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ - Message: &mq_pb.SubscribeRequest_Init{ - Init: &mq_pb.SubscribeRequest_InitMessage{ - ConsumerGroup: sub.SubscriberConfig.GroupId, - ConsumerId: sub.SubscriberConfig.GroupInstanceId, - Topic: &mq_pb.Topic{ - Namespace: sub.ContentConfig.Namespace, - Name: sub.ContentConfig.Topic, - }, - Partition: &mq_pb.Partition{ - RingSize: brokerPartitionAssignment.Partition.RingSize, - RangeStart: brokerPartitionAssignment.Partition.RangeStart, - RangeStop: brokerPartitionAssignment.Partition.RangeStop, - }, - Filter: sub.ContentConfig.Filter, - }, - }, - }) - if err != nil { - return fmt.Errorf("create subscribe client: %v", err) + if err := sub.doProcess(); err != nil { + return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } - wg.Add(1) - go func() { - defer wg.Done() - if sub.OnCompletionFunc != nil { - defer sub.OnCompletionFunc() - } - for { - resp, err := subscribeClient.Recv() - if err != nil { - fmt.Printf("subscribe error: %v\n", err) - return - } - if resp.Message == nil { - continue - } - switch m := resp.Message.(type) { - case *mq_pb.SubscribeResponse_Data: - if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { - return - } - case *mq_pb.SubscribeResponse_Ctrl: - // ignore - } - } - }() - } - wg.Wait() + return nil + }, func(err error) bool { + if err == io.EOF { + return false + } + return true + }) return nil } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 404d05222..f744c6fa2 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -28,22 +28,17 @@ type TopicSubscriber struct { brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc + bootstrapBroker string } -func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ SubscriberConfig: subscriber, ContentConfig: content, + bootstrapBroker: bootstrapBroker, } } -func (sub *TopicSubscriber) Connect(bootstrapBroker string) error { - if err := sub.doLookup(bootstrapBroker); err != nil { - return err - } - return nil -} - func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { sub.OnEachMessageFunc = onEachMessageFn }