|
@ -8,7 +8,6 @@ import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
"io" |
|
|
"io" |
|
|
"reflect" |
|
|
|
|
|
"time" |
|
|
"time" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
@ -86,7 +85,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
|
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
|
|
|
|
|
|
|
|
|
// glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
|
|
|
resp, err := subscribeClient.Recv() |
|
|
resp, err := subscribeClient.Recv() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return fmt.Errorf("subscribe recv: %v", err) |
|
|
return fmt.Errorf("subscribe recv: %v", err) |
|
@ -102,7 +101,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
if len(m.Data.Key) == 0 { |
|
|
if len(m.Data.Key) == 0 { |
|
|
fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m)) |
|
|
|
|
|
|
|
|
// fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
|
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
onDataMessageFn(m) |
|
|
onDataMessageFn(m) |
|
|