chrislu
1 year ago
12 changed files with 297 additions and 58 deletions
-
1go.mod
-
2go.sum
-
2weed/mq/broker/broker_grpc_lookup.go
-
15weed/mq/broker/broker_grpc_pub.go
-
29weed/mq/client/cmd/weed_pub/publisher.go
-
0weed/mq/client/cmd/weed_sub/subscriber.go
-
74weed/mq/client/pub_client/lookup.go
-
34weed/mq/client/pub_client/publish.go
-
69weed/mq/client/pub_client/publisher.go
-
74weed/mq/client/sub_client/lookup.go
-
28weed/mq/client/sub_client/subscribe.go
-
19weed/mq/topic/local_partition.go
@ -0,0 +1,29 @@ |
|||||
|
package main |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" |
||||
|
) |
||||
|
|
||||
|
func main() { |
||||
|
|
||||
|
publisher := pub_client.NewTopicPublisher( |
||||
|
"test", "test") |
||||
|
if err := publisher.Connect("localhost:17777"); err != nil { |
||||
|
fmt.Println(err) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
for i := 0; i < 10; i++ { |
||||
|
if dataErr := publisher.Publish( |
||||
|
[]byte(fmt.Sprintf("key-%d", i)), |
||||
|
[]byte(fmt.Sprintf("value-%d", i)), |
||||
|
); dataErr != nil { |
||||
|
fmt.Println(dataErr) |
||||
|
return |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
fmt.Println("done publishing") |
||||
|
|
||||
|
} |
@ -0,0 +1,74 @@ |
|||||
|
package pub_client |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
|
"google.golang.org/grpc" |
||||
|
) |
||||
|
|
||||
|
func (p *TopicPublisher) doLookup( |
||||
|
brokerAddress string, grpcDialOption grpc.DialOption) error { |
||||
|
err := pb.WithBrokerGrpcClient(true, |
||||
|
brokerAddress, |
||||
|
grpcDialOption, |
||||
|
func(client mq_pb.SeaweedMessagingClient) error { |
||||
|
lookupResp, err := client.LookupTopicBrokers(context.Background(), |
||||
|
&mq_pb.LookupTopicBrokersRequest{ |
||||
|
Topic: &mq_pb.Topic{ |
||||
|
Namespace: p.namespace, |
||||
|
Name: p.topic, |
||||
|
}, |
||||
|
IsForPublish: true, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { |
||||
|
// partition => broker
|
||||
|
p.partition2Broker.Insert( |
||||
|
brokerPartitionAssignment.Partition.RangeStart, |
||||
|
brokerPartitionAssignment.Partition.RangeStop, |
||||
|
brokerPartitionAssignment.LeaderBroker) |
||||
|
|
||||
|
// broker => publish client
|
||||
|
// send init message
|
||||
|
// save the publishing client
|
||||
|
brokerAddress := brokerPartitionAssignment.LeaderBroker |
||||
|
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("dial broker %s: %v", brokerAddress, err) |
||||
|
} |
||||
|
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) |
||||
|
publishClient, err := brokerClient.Publish(context.Background()) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("create publish client: %v", err) |
||||
|
} |
||||
|
p.broker2PublishClient.Set(brokerAddress, publishClient) |
||||
|
if err = publishClient.Send(&mq_pb.PublishRequest{ |
||||
|
Message: &mq_pb.PublishRequest_Init{ |
||||
|
Init: &mq_pb.PublishRequest_InitMessage{ |
||||
|
Topic: &mq_pb.Topic{ |
||||
|
Namespace: p.namespace, |
||||
|
Name: p.topic, |
||||
|
}, |
||||
|
Partition: &mq_pb.Partition{ |
||||
|
RingSize: brokerPartitionAssignment.Partition.RingSize, |
||||
|
RangeStart: brokerPartitionAssignment.Partition.RangeStart, |
||||
|
RangeStop: brokerPartitionAssignment.Partition.RangeStop, |
||||
|
}, |
||||
|
}, |
||||
|
}, |
||||
|
}); err != nil { |
||||
|
return fmt.Errorf("send init message: %v", err) |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
}) |
||||
|
|
||||
|
if err != nil { |
||||
|
return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) |
||||
|
} |
||||
|
return nil |
||||
|
} |
@ -0,0 +1,34 @@ |
|||||
|
package pub_client |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/mq/broker" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/util" |
||||
|
) |
||||
|
|
||||
|
func (p *TopicPublisher) Publish(key, value []byte) error { |
||||
|
hashKey := util.HashToInt32(key) % broker.MaxPartitionCount |
||||
|
if hashKey < 0 { |
||||
|
hashKey = -hashKey |
||||
|
} |
||||
|
brokerAddress, found := p.partition2Broker.Floor(hashKey, hashKey) |
||||
|
if !found { |
||||
|
return fmt.Errorf("no broker found for key %d", hashKey) |
||||
|
} |
||||
|
publishClient, found := p.broker2PublishClient.Get(brokerAddress) |
||||
|
if !found { |
||||
|
return fmt.Errorf("no publish client found for broker %s", brokerAddress) |
||||
|
} |
||||
|
if err := publishClient.Send(&mq_pb.PublishRequest{ |
||||
|
Message: &mq_pb.PublishRequest_Data{ |
||||
|
Data: &mq_pb.DataMessage{ |
||||
|
Key: key, |
||||
|
Value: value, |
||||
|
}, |
||||
|
}, |
||||
|
}); err != nil { |
||||
|
return fmt.Errorf("send publish request: %v", err) |
||||
|
} |
||||
|
return nil |
||||
|
} |
@ -1,59 +1,36 @@ |
|||||
package main |
|
||||
|
package pub_client |
||||
|
|
||||
import ( |
import ( |
||||
"context" |
|
||||
"fmt" |
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
||||
|
cmap "github.com/orcaman/concurrent-map/v2" |
||||
|
"github.com/rdleal/intervalst/interval" |
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
"google.golang.org/grpc" |
"google.golang.org/grpc" |
||||
"google.golang.org/grpc/credentials/insecure" |
"google.golang.org/grpc/credentials/insecure" |
||||
) |
) |
||||
|
|
||||
func main() { |
|
||||
|
type PublisherConfiguration struct { |
||||
|
} |
||||
|
type TopicPublisher struct { |
||||
|
namespace string |
||||
|
topic string |
||||
|
partition2Broker *interval.SearchTree[string, int32] |
||||
|
broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] |
||||
|
} |
||||
|
|
||||
err := pb.WithBrokerGrpcClient(true, |
|
||||
"localhost:17777", |
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()), |
|
||||
func(client mq_pb.SeaweedMessagingClient) error { |
|
||||
pubClient, err := client.Publish(context.Background()) |
|
||||
if err != nil { |
|
||||
return err |
|
||||
} |
|
||||
if initErr := pubClient.Send(&mq_pb.PublishRequest{ |
|
||||
Message: &mq_pb.PublishRequest_Init{ |
|
||||
Init: &mq_pb.PublishRequest_InitMessage{ |
|
||||
Topic: &mq_pb.Topic{ |
|
||||
Namespace: "test", |
|
||||
Name: "test", |
|
||||
}, |
|
||||
Partition: &mq_pb.Partition{ |
|
||||
RangeStart: 0, |
|
||||
RangeStop: 1, |
|
||||
RingSize: 1, |
|
||||
}, |
|
||||
}, |
|
||||
}, |
|
||||
}); initErr != nil { |
|
||||
return initErr |
|
||||
|
func NewTopicPublisher(namespace, topic string) *TopicPublisher { |
||||
|
return &TopicPublisher{ |
||||
|
namespace: namespace, |
||||
|
topic: topic, |
||||
|
partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { |
||||
|
return int(a - b) |
||||
|
}), |
||||
|
broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), |
||||
} |
} |
||||
|
} |
||||
|
|
||||
for i := 0; i < 10; i++ { |
|
||||
if dataErr := pubClient.Send(&mq_pb.PublishRequest{ |
|
||||
Message: &mq_pb.PublishRequest_Data{ |
|
||||
Data: &mq_pb.DataMessage{ |
|
||||
Key: []byte(fmt.Sprintf("key-%d", i)), |
|
||||
Value: []byte(fmt.Sprintf("value-%d", i)), |
|
||||
}, |
|
||||
}, |
|
||||
}); dataErr != nil { |
|
||||
return dataErr |
|
||||
} |
|
||||
|
func (p *TopicPublisher) Connect(bootstrapBroker string) error { |
||||
|
if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { |
||||
|
return err |
||||
} |
} |
||||
return nil |
return nil |
||||
}) |
|
||||
|
|
||||
if err != nil { |
|
||||
fmt.Println(err) |
|
||||
} |
|
||||
|
|
||||
} |
} |
@ -0,0 +1,74 @@ |
|||||
|
package sub_client |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
|
"google.golang.org/grpc" |
||||
|
) |
||||
|
|
||||
|
func (p *TopicSubscriber) doLookup( |
||||
|
brokerAddress string, grpcDialOption grpc.DialOption) error { |
||||
|
err := pb.WithBrokerGrpcClient(true, |
||||
|
brokerAddress, |
||||
|
grpcDialOption, |
||||
|
func(client mq_pb.SeaweedMessagingClient) error { |
||||
|
lookupResp, err := client.LookupTopicBrokers(context.Background(), |
||||
|
&mq_pb.LookupTopicBrokersRequest{ |
||||
|
Topic: &mq_pb.Topic{ |
||||
|
Namespace: p.namespace, |
||||
|
Name: p.topic, |
||||
|
}, |
||||
|
IsForPublish: true, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { |
||||
|
// partition => broker
|
||||
|
p.partition2Broker.Insert( |
||||
|
brokerPartitionAssignment.Partition.RangeStart, |
||||
|
brokerPartitionAssignment.Partition.RangeStop, |
||||
|
brokerPartitionAssignment.LeaderBroker) |
||||
|
|
||||
|
// broker => publish client
|
||||
|
// send init message
|
||||
|
// save the publishing client
|
||||
|
brokerAddress := brokerPartitionAssignment.LeaderBroker |
||||
|
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("dial broker %s: %v", brokerAddress, err) |
||||
|
} |
||||
|
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) |
||||
|
publishClient, err := brokerClient.Publish(context.Background()) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("create publish client: %v", err) |
||||
|
} |
||||
|
p.broker2PublishClient.Set(brokerAddress, publishClient) |
||||
|
if err = publishClient.Send(&mq_pb.PublishRequest{ |
||||
|
Message: &mq_pb.PublishRequest_Init{ |
||||
|
Init: &mq_pb.PublishRequest_InitMessage{ |
||||
|
Topic: &mq_pb.Topic{ |
||||
|
Namespace: p.namespace, |
||||
|
Name: p.topic, |
||||
|
}, |
||||
|
Partition: &mq_pb.Partition{ |
||||
|
RingSize: brokerPartitionAssignment.Partition.RingSize, |
||||
|
RangeStart: brokerPartitionAssignment.Partition.RangeStart, |
||||
|
RangeStop: brokerPartitionAssignment.Partition.RangeStop, |
||||
|
}, |
||||
|
}, |
||||
|
}, |
||||
|
}); err != nil { |
||||
|
return fmt.Errorf("send init message: %v", err) |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
}) |
||||
|
|
||||
|
if err != nil { |
||||
|
return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) |
||||
|
} |
||||
|
return nil |
||||
|
} |
@ -0,0 +1,28 @@ |
|||||
|
package sub_client |
||||
|
|
||||
|
import ( |
||||
|
cmap "github.com/orcaman/concurrent-map" |
||||
|
"github.com/rdleal/intervalst/interval" |
||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
||||
|
) |
||||
|
|
||||
|
type SubscriberConfiguration struct { |
||||
|
} |
||||
|
|
||||
|
type TopicSubscriber struct { |
||||
|
namespace string |
||||
|
topic string |
||||
|
partition2Broker *interval.SearchTree[string, int32] |
||||
|
broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient] |
||||
|
} |
||||
|
|
||||
|
func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber { |
||||
|
return &TopicSubscriber{ |
||||
|
namespace: namespace, |
||||
|
topic: topic, |
||||
|
partition2Broker: interval.NewSearchTree[string](func(a, b int32) int { |
||||
|
return int(a - b) |
||||
|
}), |
||||
|
broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](), |
||||
|
} |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue