diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index ee00be9f8..d1f2e7c90 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -36,6 +36,7 @@ func main() { flag.Parse() config := &pub_client.PublisherConfiguration{ CreateTopic: true, + CreateTopicPartitionCount: 1, } publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) brokers := strings.Split(*seedBrokers, ",") diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index afafb15ea..51b8b1f8c 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -99,9 +99,9 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { // connect to the partition broker return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ - Message: &mq_pb.SubscribeRequest_Init{ - Init: &mq_pb.SubscribeRequest_InitMessage{ + subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Init{ + Init: &mq_pb.SubscribeMessageRequest_InitMessage{ ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: &mq_pb.Topic{ @@ -114,7 +114,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s RangeStop: partition.RangeStop, }, Filter: sub.ContentConfig.Filter, - Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ + Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ StartTimestampNs: sub.alreadyProcessedTsNs, }, },