Browse Source

clean up dead code

pull/5637/head
chrislu 1 year ago
parent
commit
458ddbf919
  1. 4
      weed/mq/client/cmd/weed_sub/subscriber.go
  2. 74
      weed/mq/client/sub_client/connect_to_sub_coordinator.go
  3. 34
      weed/mq/client/sub_client/lookup.go
  4. 85
      weed/mq/client/sub_client/process.go
  5. 42
      weed/mq/client/sub_client/subscribe.go
  6. 2
      weed/mq/client/sub_client/subscriber.go

4
weed/mq/client/cmd/weed_sub/subscriber.go

@ -40,9 +40,9 @@ func main() {
brokers := strings.Split(*seedBrokers, ",") brokers := strings.Split(*seedBrokers, ",")
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig) subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
subscriber.SetEachMessageFunc(func(key, value []byte) bool {
subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
println(string(key), "=>", string(value)) println(string(key), "=>", string(value))
return true
return true, nil
}) })
subscriber.SetCompletionFunc(func() { subscriber.SetCompletionFunc(func() {

74
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -2,9 +2,11 @@ package sub_client
import ( import (
"context" "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"sync" "sync"
"time" "time"
) )
@ -87,4 +89,76 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
} }
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) { func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) {
// connect to the partition broker
pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Message: &mq_pb.SubscribeRequest_Init{
Init: &mq_pb.SubscribeRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
Partition: &mq_pb.Partition{
RingSize: partition.RingSize,
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
StartTimestampNs: sub.alreadyProcessedTsNs,
},
},
},
})
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
}
fmt.Printf("subscriber %s/%s/%s connected to partition %+v at %v\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
}
defer func() {
subscribeClient.SendMsg(&mq_pb.SubscribeRequest{
Message: &mq_pb.SubscribeRequest_Ack{
Ack: &mq_pb.SubscribeRequest_AckMessage{
Sequence: 0,
},
},
})
subscribeClient.CloseSend()
}()
for {
fmt.Printf("subscriber %s/%s/%s waiting for message\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("subscribe error: %v", err)
}
if resp.Message == nil {
continue
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeResponse_Data:
shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr != nil {
return fmt.Errorf("process error: %v", processErr)
}
if !shouldContinue {
return nil
}
sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}
}
}
return nil
})
} }

34
weed/mq/client/sub_client/lookup.go

@ -1,34 +0,0 @@
package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
func (sub *TopicSubscriber) doLookup(brokerAddress string) error {
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
sub.SubscriberConfig.GrpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
IsForPublish: false,
})
if err != nil {
return err
}
sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments
return nil
})
if err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
return nil
}

85
weed/mq/client/sub_client/process.go

@ -1,85 +0,0 @@
package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
)
func (sub *TopicSubscriber) doProcess() error {
var wg sync.WaitGroup
for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
brokerAddress := brokerPartitionAssignment.LeaderBroker
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption)
if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Message: &mq_pb.SubscribeRequest_Init{
Init: &mq_pb.SubscribeRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
StartTimestampNs: sub.alreadyProcessedTsNs,
},
},
},
})
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
}
defer func() {
subscribeClient.SendMsg(&mq_pb.SubscribeRequest{
Message: &mq_pb.SubscribeRequest_Ack{
Ack: &mq_pb.SubscribeRequest_AckMessage{
Sequence: 0,
},
},
})
subscribeClient.CloseSend()
}()
for {
resp, err := subscribeClient.Recv()
if err != nil {
fmt.Printf("subscribe error: %v\n", err)
return
}
if resp.Message == nil {
continue
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeResponse_Data:
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
return
}
sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return
}
}
}
}()
}
wg.Wait()
return nil
}

42
weed/mq/client/sub_client/subscribe.go

@ -1,13 +1,5 @@
package sub_client package sub_client
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"log"
"time"
)
// Subscribe subscribes to a topic's specified partitions. // Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
@ -15,39 +7,5 @@ func (sub *TopicSubscriber) Subscribe() error {
// loop forever // loop forever
sub.doKeepConnectedToSubCoordinator() sub.doKeepConnectedToSubCoordinator()
index := -1
util.RetryUntil("subscribe", func() error {
index++
index = index % len(sub.bootstrapBrokers)
// ask balancer for brokers of the topic
if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
if len(sub.brokerPartitionAssignments) == 0 {
if sub.waitForMoreMessage {
time.Sleep(1 * time.Second)
return fmt.Errorf("no broker partition assignments")
} else {
return nil
}
}
// treat the first broker as the topic leader
// connect to the leader broker
// subscribe to the topic
if err := sub.doProcess(); err != nil {
return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
return nil
}, func(err error) bool {
if err == io.EOF {
log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
sub.waitForMoreMessage = false
return false
}
return true
})
return nil return nil
} }

2
weed/mq/client/sub_client/subscriber.go

@ -27,7 +27,7 @@ type ProcessorConfiguration struct {
ConcurrentPartitionLimit int // how many partitions to process concurrently ConcurrentPartitionLimit int // how many partitions to process concurrently
} }
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)
type OnCompletionFunc func() type OnCompletionFunc func()
type TopicSubscriber struct { type TopicSubscriber struct {

Loading…
Cancel
Save