diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index d8e8faa31..1065309d2 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "github.com/golang/protobuf/proto" @@ -50,23 +49,13 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs defer broker.topicManager.ReleaseLock(tp, false) isConnected := true - var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948 go func() { - lastActiveTime := time.Now().UnixNano() - sleepTime := 1737 * time.Millisecond for isConnected { - time.Sleep(sleepTime) - if lastActiveTime != processedTsNs { - lastActiveTime = processedTsNs - continue - } - streamLock.Lock() - // println("checking connection health to", subscriberId, tp.String()) - if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil { + if _, err := stream.Recv(); err != nil { + println("disconnecting connection to", subscriberId, tp.String()) isConnected = false lock.cond.Signal() } - streamLock.Unlock() } }() @@ -82,11 +71,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // how to process each message // an error returned will end the subscription eachMessageFn := func(m *messaging_pb.Message) error { - streamLock.Lock() err := stream.Send(&messaging_pb.BrokerMessage{ Data: m, }) - streamLock.Unlock() if err != nil { glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) }