|
|
@ -14,15 +14,16 @@ import ( |
|
|
|
func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { |
|
|
|
// the first message is the subscribe request
|
|
|
|
// it should only contain the session id
|
|
|
|
m, err := stream.Recv() |
|
|
|
initMessage, err := stream.Recv() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
sessionId := SessionId(initMessage.SessionId) |
|
|
|
a.subscribersLock.RLock() |
|
|
|
subscriberEntry, found := a.subscribers[SessionId(m.SessionId)] |
|
|
|
subscriberEntry, found := a.subscribers[sessionId] |
|
|
|
a.subscribersLock.RUnlock() |
|
|
|
if !found { |
|
|
|
return fmt.Errorf("subscribe session id %d not found", m.SessionId) |
|
|
|
return fmt.Errorf("subscribe session id %d not found", sessionId) |
|
|
|
} |
|
|
|
defer func() { |
|
|
|
subscriberEntry.lastActiveTsNs = time.Now().UnixNano() |
|
|
@ -34,6 +35,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA |
|
|
|
record := &schema_pb.RecordValue{} |
|
|
|
err := proto.Unmarshal(m.Data.Value, record) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("unmarshal record value: %v", err) |
|
|
|
if lastErr == nil { |
|
|
|
lastErr = err |
|
|
|
} |
|
|
@ -44,6 +46,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA |
|
|
|
Value: record, |
|
|
|
TsNs: m.Data.TsNs, |
|
|
|
}); sendErr != nil { |
|
|
|
glog.V(0).Infof("send record: %v", sendErr) |
|
|
|
if lastErr == nil { |
|
|
|
lastErr = sendErr |
|
|
|
} |
|
|
@ -53,7 +56,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA |
|
|
|
go func() { |
|
|
|
subErr := subscriberEntry.entry.Subscribe() |
|
|
|
if subErr != nil { |
|
|
|
glog.V(0).Infof("subscriber %d subscribe: %v", m.SessionId, subErr) |
|
|
|
glog.V(0).Infof("subscriber %d subscribe: %v", sessionId, subErr) |
|
|
|
if lastErr == nil { |
|
|
|
lastErr = subErr |
|
|
|
} |
|
|
@ -63,6 +66,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA |
|
|
|
for { |
|
|
|
m, err := stream.Recv() |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("subscriber %d receive: %v", sessionId, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if m != nil { |
|
|
|