Browse Source

detect disconnected subscribers

pull/1318/head
Chris Lu 5 years ago
parent
commit
85b53ac510
  1. 34
      weed/messaging/broker/broker_grpc_server_subscribe.go
  2. 18
      weed/messaging/msgclient/subscriber.go

34
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -25,39 +26,47 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err return err
} }
var processedTsNs int64
var messageCount int64 var messageCount int64
subscriberId := in.Init.SubscriberId subscriberId := in.Init.SubscriberId
fmt.Printf("+ subscriber %s\n", subscriberId)
defer func() {
fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
}()
// TODO look it up // TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{ topicConfig := &messaging_pb.TopicConfiguration{
// IsTransient: true, // IsTransient: true,
} }
if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
return err
}
// get lock // get lock
tp := TopicPartition{ tp := TopicPartition{
Namespace: in.Init.Namespace, Namespace: in.Init.Namespace,
Topic: in.Init.Topic, Topic: in.Init.Topic,
Partition: in.Init.Partition, Partition: in.Init.Partition,
} }
fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
defer func() {
fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
}()
lock := broker.topicManager.RequestLock(tp, topicConfig, false) lock := broker.topicManager.RequestLock(tp, topicConfig, false)
defer broker.topicManager.ReleaseLock(tp, false) defer broker.topicManager.ReleaseLock(tp, false)
isConnected := true isConnected := true
var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948
go func() { go func() {
lastActiveTime := time.Now().UnixNano()
sleepTime := 1737 * time.Millisecond
for isConnected { for isConnected {
time.Sleep(1737 * time.Millisecond)
time.Sleep(sleepTime)
if lastActiveTime != processedTsNs {
lastActiveTime = processedTsNs
continue
}
streamLock.Lock()
// println("checking connection health to", subscriberId, tp.String())
if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil { if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
isConnected = false isConnected = false
lock.cond.Signal() lock.cond.Signal()
} }
streamLock.Unlock()
} }
}() }()
@ -69,14 +78,15 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
lastReadTime = time.Unix(0, 0) lastReadTime = time.Unix(0, 0)
} }
var processedTsNs int64
// how to process each message // how to process each message
// an error returned will end the subscription // an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error { eachMessageFn := func(m *messaging_pb.Message) error {
streamLock.Lock()
err := stream.Send(&messaging_pb.BrokerMessage{ err := stream.Send(&messaging_pb.BrokerMessage{
Data: m, Data: m,
}) })
streamLock.Unlock()
if err != nil { if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
} }
@ -105,7 +115,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
if err != io.EOF { if err != io.EOF {
println("stopping from persisted logs", err.Error())
// println("stopping from persisted logs", err.Error())
return err return err
} }
} }
@ -114,6 +124,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, processedTsNs) lastReadTime = time.Unix(0, processedTsNs)
} }
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock() lock.Mutex.Lock()
lock.cond.Wait() lock.cond.Wait()

18
weed/messaging/msgclient/subscriber.go

@ -15,7 +15,7 @@ type Subscriber struct {
subscriberId string subscriberId string
} }
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
// read topic configuration // read topic configuration
topicConfiguration := &messaging_pb.TopicConfiguration{ topicConfiguration := &messaging_pb.TopicConfiguration{
PartitionCount: 4, PartitionCount: 4,
@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
if partitionId>=0 && i != partitionId {
continue
}
tp := broker.TopicPartition{ tp := broker.TopicPartition{
Namespace: namespace, Namespace: namespace,
Topic: topic, Topic: topic,
@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti
return return
} }
// process init response
_, err = stream.Recv()
if err != nil {
return
}
return stream, nil return stream, nil
} }
func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
for { for {
resp, listenErr := s.subscriberClients[partition].Recv()
resp, listenErr := subscriberClient.Recv()
if listenErr == io.EOF { if listenErr == io.EOF {
return nil return nil
} }
@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M
// Subscribe starts goroutines to process the messages // Subscribe starts goroutines to process the messages
func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
for i := 0; i < len(s.subscriberClients); i++ { for i := 0; i < len(s.subscriberClients); i++ {
go s.doSubscribe(i, processFn)
if s.subscriberClients[i] != nil {
go doSubscribe(s.subscriberClients[i], processFn)
}
} }
} }
Loading…
Cancel
Save