Browse Source

simplify disconnected stream detection

pull/1318/head
Chris Lu 5 years ago
parent
commit
f11233cd49
  1. 17
      weed/messaging/broker/broker_grpc_server_subscribe.go

17
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -50,23 +49,13 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
defer broker.topicManager.ReleaseLock(tp, false) defer broker.topicManager.ReleaseLock(tp, false)
isConnected := true isConnected := true
var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948
go func() { go func() {
lastActiveTime := time.Now().UnixNano()
sleepTime := 1737 * time.Millisecond
for isConnected { for isConnected {
time.Sleep(sleepTime)
if lastActiveTime != processedTsNs {
lastActiveTime = processedTsNs
continue
}
streamLock.Lock()
// println("checking connection health to", subscriberId, tp.String())
if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
if _, err := stream.Recv(); err != nil {
println("disconnecting connection to", subscriberId, tp.String())
isConnected = false isConnected = false
lock.cond.Signal() lock.cond.Signal()
} }
streamLock.Unlock()
} }
}() }()
@ -82,11 +71,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// how to process each message // how to process each message
// an error returned will end the subscription // an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error { eachMessageFn := func(m *messaging_pb.Message) error {
streamLock.Lock()
err := stream.Send(&messaging_pb.BrokerMessage{ err := stream.Send(&messaging_pb.BrokerMessage{
Data: m, Data: m,
}) })
streamLock.Unlock()
if err != nil { if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
} }

Loading…
Cancel
Save