|
@ -5,6 +5,7 @@ import ( |
|
|
"io" |
|
|
"io" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/messaging/broker" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
|
"google.golang.org/grpc" |
|
|
"google.golang.org/grpc" |
|
|
) |
|
|
) |
|
@ -14,7 +15,6 @@ type Subscriber struct { |
|
|
subscriberId string |
|
|
subscriberId string |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/* |
|
|
|
|
|
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { |
|
|
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { |
|
|
// read topic configuration
|
|
|
// read topic configuration
|
|
|
topicConfiguration := &messaging_pb.TopicConfiguration{ |
|
|
topicConfiguration := &messaging_pb.TopicConfiguration{ |
|
@ -23,7 +23,16 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, |
|
|
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) |
|
|
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) |
|
|
|
|
|
|
|
|
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { |
|
|
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { |
|
|
client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) |
|
|
|
|
|
|
|
|
tp := broker.TopicPartition{ |
|
|
|
|
|
Namespace: namespace, |
|
|
|
|
|
Topic: topic, |
|
|
|
|
|
Partition: int32(i), |
|
|
|
|
|
} |
|
|
|
|
|
grpcClientConn, err := mc.findBroker(tp) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, err |
|
|
|
|
|
} |
|
|
|
|
|
client, err := setupSubscriberClient(grpcClientConn, tp, subscriberId, startTime) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
@ -36,22 +45,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, |
|
|
}, nil |
|
|
}, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { |
|
|
|
|
|
|
|
|
|
|
|
stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return stream, err |
|
|
|
|
|
} |
|
|
|
|
|
if newBroker != nil { |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return stream, nil |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
|
|
func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { |
|
|
|
|
|
|
|
|
func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { |
|
|
stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) |
|
|
stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return |
|
|
return |
|
@ -60,9 +54,9 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, |
|
|
// send init message
|
|
|
// send init message
|
|
|
err = stream.Send(&messaging_pb.SubscriberMessage{ |
|
|
err = stream.Send(&messaging_pb.SubscriberMessage{ |
|
|
Init: &messaging_pb.SubscriberMessage_InitMessage{ |
|
|
Init: &messaging_pb.SubscriberMessage_InitMessage{ |
|
|
Namespace: namespace, |
|
|
|
|
|
Topic: topic, |
|
|
|
|
|
Partition: partition, |
|
|
|
|
|
|
|
|
Namespace: tp.Namespace, |
|
|
|
|
|
Topic: tp.Topic, |
|
|
|
|
|
Partition: tp.Partition, |
|
|
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, |
|
|
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, |
|
|
TimestampNs: startTime.UnixNano(), |
|
|
TimestampNs: startTime.UnixNano(), |
|
|
SubscriberId: subscriberId, |
|
|
SubscriberId: subscriberId, |
|
@ -90,6 +84,10 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M |
|
|
println(listenErr.Error()) |
|
|
println(listenErr.Error()) |
|
|
return listenErr |
|
|
return listenErr |
|
|
} |
|
|
} |
|
|
|
|
|
if resp.Data == nil { |
|
|
|
|
|
// this could be heartbeat from broker
|
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
processFn(resp.Data) |
|
|
processFn(resp.Data) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|