You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							118 lines
						
					
					
						
							2.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							118 lines
						
					
					
						
							2.8 KiB
						
					
					
				
								package msgclient
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
								
							 | 
						|
									"github.com/OneOfOne/xxhash"
							 | 
						|
									"google.golang.org/grpc"
							 | 
						|
								
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/messaging/broker"
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								type Publisher struct {
							 | 
						|
									publishClients     []messaging_pb.SeaweedMessaging_PublishClient
							 | 
						|
									topicConfiguration *messaging_pb.TopicConfiguration
							 | 
						|
									messageCount       uint64
							 | 
						|
									publisherId        string
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
							 | 
						|
									// read topic configuration
							 | 
						|
									topicConfiguration := &messaging_pb.TopicConfiguration{
							 | 
						|
										PartitionCount: 4,
							 | 
						|
									}
							 | 
						|
									publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
							 | 
						|
									for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
							 | 
						|
										tp := broker.TopicPartition{
							 | 
						|
											Namespace: namespace,
							 | 
						|
											Topic:     topic,
							 | 
						|
											Partition: int32(i),
							 | 
						|
										}
							 | 
						|
										grpcClientConn, err := mc.findBroker(tp)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										client, err := setupPublisherClient(grpcClientConn, tp)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
										publishClients[i] = client
							 | 
						|
									}
							 | 
						|
									return &Publisher{
							 | 
						|
										publishClients:     publishClients,
							 | 
						|
										topicConfiguration: topicConfiguration,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
							 | 
						|
								
							 | 
						|
									stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// send init message
							 | 
						|
									err = stream.Send(&messaging_pb.PublishRequest{
							 | 
						|
										Init: &messaging_pb.PublishRequest_InitMessage{
							 | 
						|
											Namespace: tp.Namespace,
							 | 
						|
											Topic:     tp.Topic,
							 | 
						|
											Partition: tp.Partition,
							 | 
						|
										},
							 | 
						|
									})
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// process init response
							 | 
						|
									initResponse, err := stream.Recv()
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									if initResponse.Redirect != nil {
							 | 
						|
										// TODO follow redirection
							 | 
						|
									}
							 | 
						|
									if initResponse.Config != nil {
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// setup looks for control messages
							 | 
						|
									doneChan := make(chan error, 1)
							 | 
						|
									go func() {
							 | 
						|
										for {
							 | 
						|
											in, err := stream.Recv()
							 | 
						|
											if err != nil {
							 | 
						|
												doneChan <- err
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											if in.Redirect != nil {
							 | 
						|
											}
							 | 
						|
											if in.Config != nil {
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									return stream, nil
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *Publisher) Publish(m *messaging_pb.Message) error {
							 | 
						|
									hashValue := p.messageCount
							 | 
						|
									p.messageCount++
							 | 
						|
									if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
							 | 
						|
										if m.Key != nil {
							 | 
						|
											hashValue = xxhash.Checksum64(m.Key)
							 | 
						|
										}
							 | 
						|
									} else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
							 | 
						|
										hashValue = xxhash.Checksum64(m.Key)
							 | 
						|
									} else {
							 | 
						|
										// round robin
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									idx := int(hashValue) % len(p.publishClients)
							 | 
						|
									if idx < 0 {
							 | 
						|
										idx += len(p.publishClients)
							 | 
						|
									}
							 | 
						|
									return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
							 | 
						|
										Data: m,
							 | 
						|
									})
							 | 
						|
								}
							 |