diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index fb0e3a11f..a217489de 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -79,7 +79,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } go func() { defer func() { - println("stop sending ack to publisher", initMessage.PublisherName) + // println("stop sending ack to publisher", initMessage.PublisherName) }() lastAckTime := time.Now() @@ -93,7 +93,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err := stream.Send(response); err != nil { glog.Errorf("Error sending response %v: %v", response, err) } - println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) + // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() } else { time.Sleep(1 * time.Second) diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 358b310bf..57cbbd2d2 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -58,10 +58,9 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi if err := stream.Send(&mq_pb.PublishFollowMeResponse{ AckTsNs: dataMessage.TsNs, }); err != nil { - // TODO save un-acked messages to disk glog.Errorf("Error sending response %v: %v", dataMessage, err) } - println("ack", string(dataMessage.Key), dataMessage.TsNs) + // println("ack", string(dataMessage.Key), dataMessage.TsNs) } else if closeMessage := req.GetClose(); closeMessage != nil { glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) break @@ -74,7 +73,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() { if mem.stopTime.UnixNano() <= flushMessage.TsNs { inMemoryBuffers.Dequeue() - println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf)) + // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf)) } else { break } @@ -117,8 +116,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT)) - // TODO append block with more metadata - for { if err := b.appendToFile(targetFile, mem.buf); err != nil { glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 6e429e5df..157fa2792 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -56,7 +56,7 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { // maybe send to the follower if p.followerStream != nil { - println("recv", string(message.Key), message.TsNs) + // println("recv", string(message.Key), message.TsNs) if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Data{ Data: message, @@ -166,7 +166,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa // start receiving ack from follower go func() { defer func() { - println("stop receiving ack from follower") + // println("stop receiving ack from follower") }() for { @@ -181,7 +181,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa return } atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) - println("recv ack", ack.AckTsNs) + // println("recv ack", ack.AckTsNs) } }() return nil @@ -234,6 +234,6 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { }); followErr != nil { glog.Errorf("send follower %s flush message: %v", p.follower, followErr) } - println("notifying", p.follower, "flushed at", flushTsNs) + // println("notifying", p.follower, "flushed at", flushTsNs) } }