|
|
@ -151,7 +151,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s |
|
|
|
continue |
|
|
|
} |
|
|
|
switch m := resp.Message.(type) { |
|
|
|
case *mq_pb.SubscribeResponse_Data: |
|
|
|
case *mq_pb.SubscribeMessageResponse_Data: |
|
|
|
shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) |
|
|
|
if processErr != nil { |
|
|
|
return fmt.Errorf("process error: %v", processErr) |
|
|
@ -160,7 +160,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s |
|
|
|
if !shouldContinue { |
|
|
|
return nil |
|
|
|
} |
|
|
|
case *mq_pb.SubscribeResponse_Ctrl: |
|
|
|
case *mq_pb.SubscribeMessageResponse_Ctrl: |
|
|
|
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { |
|
|
|
return io.EOF |
|
|
|
} |
|
|
|