chrislu
1 year ago
4 changed files with 99 additions and 71 deletions
-
6weed/mq/client/cmd/weed_sub/subscriber.go
-
81weed/mq/client/sub_client/process.go
-
72weed/mq/client/sub_client/subscribe.go
-
11weed/mq/client/sub_client/subscriber.go
@ -0,0 +1,81 @@ |
|||
package sub_client |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"sync" |
|||
) |
|||
|
|||
func (sub *TopicSubscriber) doProcess() error { |
|||
var wg sync.WaitGroup |
|||
for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { |
|||
brokerAddress := brokerPartitionAssignment.LeaderBroker |
|||
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) |
|||
if err != nil { |
|||
return fmt.Errorf("dial broker %s: %v", brokerAddress, err) |
|||
} |
|||
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) |
|||
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ |
|||
Message: &mq_pb.SubscribeRequest_Init{ |
|||
Init: &mq_pb.SubscribeRequest_InitMessage{ |
|||
ConsumerGroup: sub.SubscriberConfig.GroupId, |
|||
ConsumerId: sub.SubscriberConfig.GroupInstanceId, |
|||
Topic: &mq_pb.Topic{ |
|||
Namespace: sub.ContentConfig.Namespace, |
|||
Name: sub.ContentConfig.Topic, |
|||
}, |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: brokerPartitionAssignment.Partition.RingSize, |
|||
RangeStart: brokerPartitionAssignment.Partition.RangeStart, |
|||
RangeStop: brokerPartitionAssignment.Partition.RangeStop, |
|||
}, |
|||
Filter: sub.ContentConfig.Filter, |
|||
}, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
return fmt.Errorf("create subscribe client: %v", err) |
|||
} |
|||
wg.Add(1) |
|||
go func() { |
|||
defer wg.Done() |
|||
if sub.OnCompletionFunc != nil { |
|||
defer sub.OnCompletionFunc() |
|||
} |
|||
defer func() { |
|||
subscribeClient.SendMsg(&mq_pb.SubscribeRequest{ |
|||
Message: &mq_pb.SubscribeRequest_Ack{ |
|||
Ack: &mq_pb.SubscribeRequest_AckMessage{ |
|||
Sequence: 0, |
|||
}, |
|||
}, |
|||
}) |
|||
subscribeClient.CloseSend() |
|||
}() |
|||
for { |
|||
resp, err := subscribeClient.Recv() |
|||
if err != nil { |
|||
fmt.Printf("subscribe error: %v\n", err) |
|||
return |
|||
} |
|||
if resp.Message == nil { |
|||
continue |
|||
} |
|||
switch m := resp.Message.(type) { |
|||
case *mq_pb.SubscribeResponse_Data: |
|||
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { |
|||
return |
|||
} |
|||
case *mq_pb.SubscribeResponse_Ctrl: |
|||
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { |
|||
return |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
} |
|||
wg.Wait() |
|||
return nil |
|||
} |
@ -1,72 +1,28 @@ |
|||
package sub_client |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|||
"sync" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
"io" |
|||
) |
|||
|
|||
// Subscribe subscribes to a topic's specified partitions.
|
|||
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
|
|||
|
|||
func (sub *TopicSubscriber) Subscribe() error { |
|||
var wg sync.WaitGroup |
|||
for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { |
|||
brokerAddress := brokerPartitionAssignment.LeaderBroker |
|||
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) |
|||
if err != nil { |
|||
return fmt.Errorf("dial broker %s: %v", brokerAddress, err) |
|||
util.RetryUntil("subscribe", func() error { |
|||
if err := sub.doLookup(sub.bootstrapBroker); err != nil { |
|||
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) |
|||
} |
|||
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) |
|||
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ |
|||
Message: &mq_pb.SubscribeRequest_Init{ |
|||
Init: &mq_pb.SubscribeRequest_InitMessage{ |
|||
ConsumerGroup: sub.SubscriberConfig.GroupId, |
|||
ConsumerId: sub.SubscriberConfig.GroupInstanceId, |
|||
Topic: &mq_pb.Topic{ |
|||
Namespace: sub.ContentConfig.Namespace, |
|||
Name: sub.ContentConfig.Topic, |
|||
}, |
|||
Partition: &mq_pb.Partition{ |
|||
RingSize: brokerPartitionAssignment.Partition.RingSize, |
|||
RangeStart: brokerPartitionAssignment.Partition.RangeStart, |
|||
RangeStop: brokerPartitionAssignment.Partition.RangeStop, |
|||
}, |
|||
Filter: sub.ContentConfig.Filter, |
|||
}, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
return fmt.Errorf("create subscribe client: %v", err) |
|||
if err := sub.doProcess(); err != nil { |
|||
return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) |
|||
} |
|||
wg.Add(1) |
|||
go func() { |
|||
defer wg.Done() |
|||
if sub.OnCompletionFunc != nil { |
|||
defer sub.OnCompletionFunc() |
|||
} |
|||
for { |
|||
resp, err := subscribeClient.Recv() |
|||
if err != nil { |
|||
fmt.Printf("subscribe error: %v\n", err) |
|||
return |
|||
} |
|||
if resp.Message == nil { |
|||
continue |
|||
} |
|||
switch m := resp.Message.(type) { |
|||
case *mq_pb.SubscribeResponse_Data: |
|||
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { |
|||
return |
|||
} |
|||
case *mq_pb.SubscribeResponse_Ctrl: |
|||
// ignore
|
|||
} |
|||
} |
|||
}() |
|||
} |
|||
wg.Wait() |
|||
return nil |
|||
}, func(err error) bool { |
|||
if err == io.EOF { |
|||
return false |
|||
} |
|||
return true |
|||
}) |
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue