From 85b53ac510aca494f8a3d18bb15b829971795b15 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 16 May 2020 18:53:54 -0700 Subject: [PATCH] detect disconnected subscribers --- .../broker/broker_grpc_server_subscribe.go | 34 +++++++++++++------ weed/messaging/msgclient/subscriber.go | 18 +++++----- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 9e951ce65..d8e8faa31 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "strings" + "sync" "time" "github.com/golang/protobuf/proto" @@ -25,39 +26,47 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + var processedTsNs int64 var messageCount int64 subscriberId := in.Init.SubscriberId - fmt.Printf("+ subscriber %s\n", subscriberId) - defer func() { - fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount) - }() // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ // IsTransient: true, } - if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil { - return err - } - // get lock tp := TopicPartition{ Namespace: in.Init.Namespace, Topic: in.Init.Topic, Partition: in.Init.Partition, } + fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) + defer func() { + fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) + }() + lock := broker.topicManager.RequestLock(tp, topicConfig, false) defer broker.topicManager.ReleaseLock(tp, false) isConnected := true + var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948 go func() { + lastActiveTime := time.Now().UnixNano() + sleepTime := 1737 * time.Millisecond for isConnected { - time.Sleep(1737 * time.Millisecond) + time.Sleep(sleepTime) + if lastActiveTime != processedTsNs { + lastActiveTime = processedTsNs + continue + } + streamLock.Lock() + // println("checking connection health to", subscriberId, tp.String()) if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil { isConnected = false lock.cond.Signal() } + streamLock.Unlock() } }() @@ -69,14 +78,15 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: lastReadTime = time.Unix(0, 0) } - var processedTsNs int64 // how to process each message // an error returned will end the subscription eachMessageFn := func(m *messaging_pb.Message) error { + streamLock.Lock() err := stream.Send(&messaging_pb.BrokerMessage{ Data: m, }) + streamLock.Unlock() if err != nil { glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) } @@ -105,7 +115,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { if err != io.EOF { - println("stopping from persisted logs", err.Error()) + // println("stopping from persisted logs", err.Error()) return err } } @@ -114,6 +124,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs lastReadTime = time.Unix(0, processedTsNs) } + // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime) + err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 2e66923e2..f96bba2ec 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -15,7 +15,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + if partitionId>=0 && i != partitionId { + continue + } tp := broker.TopicPartition{ Namespace: namespace, Topic: topic, @@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti return } - // process init response - _, err = stream.Recv() - if err != nil { - return - } return stream, nil } -func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClients[partition].Recv() + resp, listenErr := subscriberClient.Recv() if listenErr == io.EOF { return nil } @@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { for i := 0; i < len(s.subscriberClients); i++ { - go s.doSubscribe(i, processFn) + if s.subscriberClients[i] != nil { + go doSubscribe(s.subscriberClients[i], processFn) + } } }