|
@ -177,6 +177,19 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs |
|
|
|
|
|
|
|
|
for imt.IsInflight(logEntry.Key) { |
|
|
for imt.IsInflight(logEntry.Key) { |
|
|
time.Sleep(137 * time.Millisecond) |
|
|
time.Sleep(137 * time.Millisecond) |
|
|
|
|
|
// Check if the client has disconnected by monitoring the context
|
|
|
|
|
|
select { |
|
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
|
err := ctx.Err() |
|
|
|
|
|
if err == context.Canceled { |
|
|
|
|
|
// Client disconnected
|
|
|
|
|
|
return false, nil |
|
|
|
|
|
} |
|
|
|
|
|
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) |
|
|
|
|
|
return false, nil |
|
|
|
|
|
default: |
|
|
|
|
|
// Continue processing the request
|
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
if logEntry.Key != nil { |
|
|
if logEntry.Key != nil { |
|
|
imt.EnflightMessage(logEntry.Key, logEntry.TsNs) |
|
|
imt.EnflightMessage(logEntry.Key, logEntry.TsNs) |
|
|