|
@ -13,7 +13,7 @@ import ( |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type MessageBrokerOption struct { |
|
|
|
|
|
|
|
|
type MessageQueueBrokerOption struct { |
|
|
Filers []pb.ServerAddress |
|
|
Filers []pb.ServerAddress |
|
|
DefaultReplication string |
|
|
DefaultReplication string |
|
|
MaxMB int |
|
|
MaxMB int |
|
@ -22,16 +22,16 @@ type MessageBrokerOption struct { |
|
|
Cipher bool |
|
|
Cipher bool |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type MessageBroker struct { |
|
|
|
|
|
|
|
|
type MessageQueueBroker struct { |
|
|
mq_pb.UnimplementedSeaweedMessagingServer |
|
|
mq_pb.UnimplementedSeaweedMessagingServer |
|
|
option *MessageBrokerOption |
|
|
|
|
|
|
|
|
option *MessageQueueBrokerOption |
|
|
grpcDialOption grpc.DialOption |
|
|
grpcDialOption grpc.DialOption |
|
|
topicManager *TopicManager |
|
|
topicManager *TopicManager |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) { |
|
|
|
|
|
|
|
|
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { |
|
|
|
|
|
|
|
|
messageBroker = &MessageBroker{ |
|
|
|
|
|
|
|
|
messageBroker = &MessageQueueBroker{ |
|
|
option: option, |
|
|
option: option, |
|
|
grpcDialOption: grpcDialOption, |
|
|
grpcDialOption: grpcDialOption, |
|
|
} |
|
|
} |
|
@ -45,7 +45,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio |
|
|
return messageBroker, nil |
|
|
return messageBroker, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (broker *MessageBroker) keepConnectedToOneFiler() { |
|
|
|
|
|
|
|
|
func (broker *MessageQueueBroker) keepConnectedToOneFiler() { |
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
|
for _, filer := range broker.option.Filers { |
|
|
for _, filer := range broker.option.Filers { |
|
@ -101,13 +101,13 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { |
|
|
|
|
|
|
|
|
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { |
|
|
|
|
|
|
|
|
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) |
|
|
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { |
|
|
|
|
|
|
|
|
func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { |
|
|
|
|
|
|
|
|
return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { |
|
|
return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { |
|
|
return fn(client) |
|
|
return fn(client) |
|
|