|
@ -58,10 +58,9 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi |
|
|
if err := stream.Send(&mq_pb.PublishFollowMeResponse{ |
|
|
if err := stream.Send(&mq_pb.PublishFollowMeResponse{ |
|
|
AckTsNs: dataMessage.TsNs, |
|
|
AckTsNs: dataMessage.TsNs, |
|
|
}); err != nil { |
|
|
}); err != nil { |
|
|
// TODO save un-acked messages to disk
|
|
|
|
|
|
glog.Errorf("Error sending response %v: %v", dataMessage, err) |
|
|
glog.Errorf("Error sending response %v: %v", dataMessage, err) |
|
|
} |
|
|
} |
|
|
println("ack", string(dataMessage.Key), dataMessage.TsNs) |
|
|
|
|
|
|
|
|
// println("ack", string(dataMessage.Key), dataMessage.TsNs)
|
|
|
} else if closeMessage := req.GetClose(); closeMessage != nil { |
|
|
} else if closeMessage := req.GetClose(); closeMessage != nil { |
|
|
glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) |
|
|
glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) |
|
|
break |
|
|
break |
|
@ -74,7 +73,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi |
|
|
for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() { |
|
|
for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() { |
|
|
if mem.stopTime.UnixNano() <= flushMessage.TsNs { |
|
|
if mem.stopTime.UnixNano() <= flushMessage.TsNs { |
|
|
inMemoryBuffers.Dequeue() |
|
|
inMemoryBuffers.Dequeue() |
|
|
println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf)) |
|
|
|
|
|
|
|
|
// println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
|
|
|
} else { |
|
|
} else { |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
@ -117,8 +116,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi |
|
|
|
|
|
|
|
|
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT)) |
|
|
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT)) |
|
|
|
|
|
|
|
|
// TODO append block with more metadata
|
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
|
if err := b.appendToFile(targetFile, mem.buf); err != nil { |
|
|
if err := b.appendToFile(targetFile, mem.buf); err != nil { |
|
|
glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) |
|
|
glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) |
|
|