@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"io"
"time"
"time"
)
)
@ -119,11 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
}
partitionOffsetChan := make ( chan int64 , 1024 )
partitionOffsetChan := make ( chan int64 , 1024 )
defer func ( ) {
defer func ( ) {
close ( partitionOffsetChan )
close ( partitionOffsetChan )
} ( )
} ( )
concurrentPartitionLimit := int ( sub . ProcessorConfig . ConcurrentPartitionLimit )
if concurrentPartitionLimit <= 0 {
concurrentPartitionLimit = 1
}
executors := util . NewLimitedConcurrentExecutor ( concurrentPartitionLimit )
go func ( ) {
go func ( ) {
for ack := range partitionOffsetChan {
for ack := range partitionOffsetChan {
subscribeClient . SendMsg ( & mq_pb . SubscribeMessageRequest {
subscribeClient . SendMsg ( & mq_pb . SubscribeMessageRequest {
@ -137,7 +143,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient . CloseSend ( )
subscribeClient . CloseSend ( )
} ( )
} ( )
for {
var lastErr error
for lastErr != nil {
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp , err := subscribeClient . Recv ( )
resp , err := subscribeClient . Recv ( )
if err != nil {
if err != nil {
@ -149,14 +157,14 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
}
switch m := resp . Message . ( type ) {
switch m := resp . Message . ( type ) {
case * mq_pb . SubscribeMessageResponse_Data :
case * mq_pb . SubscribeMessageResponse_Data :
shouldContinue , processErr := sub . OnEachMessageFunc ( m . Data . Key , m . Data . Value )
if processErr != nil {
return fmt . Errorf ( "process error: %v" , processErr )
}
partitionOffsetChan <- m . Data . TsNs
if ! shouldContinue {
return nil
}
executors . Execute ( func ( ) {
processErr := sub . OnEachMessageFunc ( m . Data . Key , m . Data . Value )
if processErr == nil {
partitionOffsetChan <- m . Data . TsNs
} else {
lastErr = processErr
}
} )
case * mq_pb . SubscribeMessageResponse_Ctrl :
case * mq_pb . SubscribeMessageResponse_Ctrl :
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m . Ctrl . IsEndOfStream || m . Ctrl . IsEndOfTopic {
if m . Ctrl . IsEndOfStream || m . Ctrl . IsEndOfTopic {
@ -165,6 +173,6 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
}
}
}
return nil
return lastErr
} )
} )
}
}