|
@ -7,6 +7,7 @@ import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
|
|
|
"sync/atomic" |
|
|
"time" |
|
|
"time" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
@ -99,20 +100,37 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS |
|
|
|
|
|
|
|
|
ackCounter := 0 |
|
|
ackCounter := 0 |
|
|
var ackSequence int64 |
|
|
var ackSequence int64 |
|
|
|
|
|
var isStopping int32 |
|
|
respChan := make(chan *mq_pb.PublishResponse, 128) |
|
|
respChan := make(chan *mq_pb.PublishResponse, 128) |
|
|
defer close(respChan) |
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
|
atomic.StoreInt32(&isStopping, 1) |
|
|
|
|
|
response := &mq_pb.PublishResponse{ |
|
|
|
|
|
Error: "end of stream", |
|
|
|
|
|
} |
|
|
|
|
|
respChan <- response |
|
|
|
|
|
close(respChan) |
|
|
|
|
|
}() |
|
|
go func() { |
|
|
go func() { |
|
|
|
|
|
ticker := time.NewTicker(1 * time.Second) |
|
|
for { |
|
|
for { |
|
|
select { |
|
|
select { |
|
|
case resp := <-respChan: |
|
|
case resp := <-respChan: |
|
|
|
|
|
if resp != nil { |
|
|
if err := stream.Send(resp); err != nil { |
|
|
if err := stream.Send(resp); err != nil { |
|
|
glog.Errorf("Error sending setup response: %v", err) |
|
|
glog.Errorf("Error sending setup response: %v", err) |
|
|
} |
|
|
} |
|
|
case <-time.After(1 * time.Second): |
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
case <-ticker.C: |
|
|
|
|
|
if atomic.LoadInt32(&isStopping) == 0 { |
|
|
response := &mq_pb.PublishResponse{ |
|
|
response := &mq_pb.PublishResponse{ |
|
|
AckSequence: ackSequence, |
|
|
AckSequence: ackSequence, |
|
|
} |
|
|
} |
|
|
respChan <- response |
|
|
respChan <- response |
|
|
|
|
|
} else { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|