|
@ -2,7 +2,9 @@ package broker |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
"context" |
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/cluster" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/wdclient" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"google.golang.org/grpc" |
|
|
"google.golang.org/grpc" |
|
@ -14,6 +16,8 @@ import ( |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type MessageQueueBrokerOption struct { |
|
|
type MessageQueueBrokerOption struct { |
|
|
|
|
|
Masters map[string]pb.ServerAddress |
|
|
|
|
|
FilerGroup string |
|
|
Filers []pb.ServerAddress |
|
|
Filers []pb.ServerAddress |
|
|
DefaultReplication string |
|
|
DefaultReplication string |
|
|
MaxMB int |
|
|
MaxMB int |
|
@ -26,23 +30,26 @@ type MessageQueueBroker struct { |
|
|
mq_pb.UnimplementedSeaweedMessagingServer |
|
|
mq_pb.UnimplementedSeaweedMessagingServer |
|
|
option *MessageQueueBrokerOption |
|
|
option *MessageQueueBrokerOption |
|
|
grpcDialOption grpc.DialOption |
|
|
grpcDialOption grpc.DialOption |
|
|
|
|
|
MasterClient *wdclient.MasterClient |
|
|
topicManager *TopicManager |
|
|
topicManager *TopicManager |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { |
|
|
|
|
|
|
|
|
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { |
|
|
|
|
|
|
|
|
messageBroker = &MessageQueueBroker{ |
|
|
|
|
|
|
|
|
mqBroker = &MessageQueueBroker{ |
|
|
option: option, |
|
|
option: option, |
|
|
grpcDialOption: grpcDialOption, |
|
|
grpcDialOption: grpcDialOption, |
|
|
|
|
|
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), "", option.Masters), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
messageBroker.topicManager = NewTopicManager(messageBroker) |
|
|
|
|
|
|
|
|
mqBroker.topicManager = NewTopicManager(mqBroker) |
|
|
|
|
|
|
|
|
messageBroker.checkFilers() |
|
|
|
|
|
|
|
|
mqBroker.checkFilers() |
|
|
|
|
|
|
|
|
go messageBroker.keepConnectedToOneFiler() |
|
|
|
|
|
|
|
|
go mqBroker.keepConnectedToOneFiler() |
|
|
|
|
|
go mqBroker.MasterClient.KeepConnectedToMaster() |
|
|
|
|
|
|
|
|
return messageBroker, nil |
|
|
|
|
|
|
|
|
return mqBroker, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (broker *MessageQueueBroker) keepConnectedToOneFiler() { |
|
|
func (broker *MessageQueueBroker) keepConnectedToOneFiler() { |
|
|