|
@ -101,14 +101,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis |
|
|
for { |
|
|
for { |
|
|
ack, err := localTopicPartition.FollowerStream.Recv() |
|
|
ack, err := localTopicPartition.FollowerStream.Recv() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("Error receiving response: %v", err) |
|
|
|
|
|
|
|
|
glog.Errorf("Error receiving follower ack: %v", err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
println("recv ack", ack.AckTsNs) |
|
|
println("recv ack", ack.AckTsNs) |
|
|
if err := stream.Send(&mq_pb.PublishMessageResponse{ |
|
|
if err := stream.Send(&mq_pb.PublishMessageResponse{ |
|
|
AckSequence: ack.AckTsNs, |
|
|
AckSequence: ack.AckTsNs, |
|
|
}); err != nil { |
|
|
}); err != nil { |
|
|
glog.Errorf("Error sending response %v: %v", ack, err) |
|
|
|
|
|
|
|
|
glog.Errorf("Error sending publisher ack %v: %v", ack, err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -139,6 +139,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis |
|
|
localTopicPartition.GrpcConnection.Close() |
|
|
localTopicPartition.GrpcConnection.Close() |
|
|
} |
|
|
} |
|
|
b.localTopicManager.RemoveTopicPartition(t, p) |
|
|
b.localTopicManager.RemoveTopicPartition(t, p) |
|
|
|
|
|
glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|