diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index e920e73c7..5286c229d 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -44,10 +44,10 @@ func main() { subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) { + subscriber.SetEachMessageFunc(func(key, value []byte) (error) { counter++ println(string(key), "=>", string(value), counter) - return true, nil + return nil }) subscriber.SetCompletionFunc(func() { diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 988e40419..a5f87a3bb 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -70,12 +70,12 @@ func main() { subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) counter := 0 - subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) { + subscriber.SetEachMessageFunc(func(key, value []byte) (error) { counter++ record := &schema_pb.RecordValue{} proto.Unmarshal(value, record) fmt.Printf("record: %v\n", record) - return true, nil + return nil }) subscriber.SetCompletionFunc(func() { diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 8ee34d65a..e26997b08 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "io" "time" ) @@ -119,11 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } partitionOffsetChan:= make(chan int64, 1024) - defer func() { close(partitionOffsetChan) }() + concurrentPartitionLimit := int(sub.ProcessorConfig.ConcurrentPartitionLimit) + if concurrentPartitionLimit <= 0 { + concurrentPartitionLimit = 1 + } + executors := util.NewLimitedConcurrentExecutor(concurrentPartitionLimit) + go func() { for ack := range partitionOffsetChan { subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ @@ -137,7 +143,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.CloseSend() }() - for { + var lastErr error + + for lastErr != nil { // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { @@ -149,14 +157,14 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: - shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) - if processErr != nil { - return fmt.Errorf("process error: %v", processErr) - } - partitionOffsetChan <- m.Data.TsNs - if !shouldContinue { - return nil - } + executors.Execute(func() { + processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) + if processErr == nil { + partitionOffsetChan <- m.Data.TsNs + }else{ + lastErr = processErr + } + }) case *mq_pb.SubscribeMessageResponse_Ctrl: // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { @@ -165,6 +173,6 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } } - return nil + return lastErr }) }