|
@ -4,6 +4,7 @@ import ( |
|
|
"context" |
|
|
"context" |
|
|
"io" |
|
|
"io" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
"sync" |
|
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/messaging/broker" |
|
|
"github.com/chrislusf/seaweedfs/weed/messaging/broker" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
@ -97,11 +98,17 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, |
|
|
|
|
|
|
|
|
// Subscribe starts goroutines to process the messages
|
|
|
// Subscribe starts goroutines to process the messages
|
|
|
func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { |
|
|
func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { |
|
|
|
|
|
var wg sync.WaitGroup |
|
|
for i := 0; i < len(s.subscriberClients); i++ { |
|
|
for i := 0; i < len(s.subscriberClients); i++ { |
|
|
if s.subscriberClients[i] != nil { |
|
|
if s.subscriberClients[i] != nil { |
|
|
go doSubscribe(s.subscriberClients[i], processFn) |
|
|
|
|
|
|
|
|
wg.Add(1) |
|
|
|
|
|
go func() { |
|
|
|
|
|
defer wg.Done() |
|
|
|
|
|
doSubscribe(s.subscriberClients[i], processFn) |
|
|
|
|
|
}() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
wg.Wait() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s *Subscriber) Shutdown() { |
|
|
func (s *Subscriber) Shutdown() { |
|
|