diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index cdf652294..44d9deed6 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -44,9 +44,9 @@ type MessageQueueBroker struct { currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager PubBalancer *pub_balancer.PubBalancer - lockAsBalancer *cluster.LiveLock - SubCoordinator *sub_coordinator.SubCoordinator - accessLock sync.Mutex + lockAsBalancer *cluster.LiveLock + SubCoordinator *sub_coordinator.SubCoordinator + accessLock sync.Mutex fca *sub_coordinator.FilerClientAccessor } diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 792794a4a..902e7ed1b 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -14,10 +14,10 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 262da6c4d..674c881ba 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -16,10 +16,10 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 8d5bf2044..6ee447085 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -65,7 +65,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig select { case <-stopCh: break - case ack := <- partitionOffsetChan: + case ack := <-partitionOffsetChan: subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 129392e44..922593b77 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -27,16 +27,16 @@ type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { - SubscriberConfig *SubscriberConfiguration - ContentConfig *ContentConfiguration + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest OnEachMessageFunc OnEachMessageFunc - OnCompletionFunc OnCompletionFunc - bootstrapBrokers []string - waitForMoreMessage bool - activeProcessors map[topic.Partition]*ProcessorState - activeProcessorsLock sync.Mutex + OnCompletionFunc OnCompletionFunc + bootstrapBrokers []string + waitForMoreMessage bool + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {