You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							298 lines
						
					
					
						
							8.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							298 lines
						
					
					
						
							8.8 KiB
						
					
					
				
								package pub_client
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
							 | 
						|
									"google.golang.org/grpc"
							 | 
						|
									"google.golang.org/grpc/codes"
							 | 
						|
									"google.golang.org/grpc/credentials/insecure"
							 | 
						|
									"google.golang.org/grpc/status"
							 | 
						|
									"log"
							 | 
						|
									"sort"
							 | 
						|
									"sync"
							 | 
						|
									"sync/atomic"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								type EachPartitionError struct {
							 | 
						|
									*mq_pb.BrokerPartitionAssignment
							 | 
						|
									Err        error
							 | 
						|
									generation int
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type EachPartitionPublishJob struct {
							 | 
						|
									*mq_pb.BrokerPartitionAssignment
							 | 
						|
									stopChan   chan bool
							 | 
						|
									wg         sync.WaitGroup
							 | 
						|
									generation int
							 | 
						|
									inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
							 | 
						|
								
							 | 
						|
									if err := p.doConfigureTopic(); err != nil {
							 | 
						|
										wg.Done()
							 | 
						|
										return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									log.Printf("start scheduler thread for topic %s", p.config.Topic)
							 | 
						|
								
							 | 
						|
									generation := 0
							 | 
						|
									var errChan chan EachPartitionError
							 | 
						|
									for {
							 | 
						|
										glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
							 | 
						|
										if assignments, err := p.doLookupTopicPartitions(); err == nil {
							 | 
						|
											generation++
							 | 
						|
											glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
							 | 
						|
											if errChan == nil {
							 | 
						|
												errChan = make(chan EachPartitionError, len(assignments))
							 | 
						|
											}
							 | 
						|
											p.onEachAssignments(generation, assignments, errChan)
							 | 
						|
										} else {
							 | 
						|
											glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
							 | 
						|
											time.Sleep(5 * time.Second)
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if generation == 1 {
							 | 
						|
											wg.Done()
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// wait for any error to happen. If so, consume all remaining errors, and retry
							 | 
						|
										for {
							 | 
						|
											select {
							 | 
						|
											case eachErr := <-errChan:
							 | 
						|
												glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
							 | 
						|
												if eachErr.generation < generation {
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
												break
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
							 | 
						|
									// TODO assuming this is not re-configured so the partitions are fixed.
							 | 
						|
									sort.Slice(assignments, func(i, j int) bool {
							 | 
						|
										return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
							 | 
						|
									})
							 | 
						|
									var jobs []*EachPartitionPublishJob
							 | 
						|
									hasExistingJob := len(p.jobs) == len(assignments)
							 | 
						|
									for i, assignment := range assignments {
							 | 
						|
										if assignment.LeaderBroker == "" {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
										if hasExistingJob {
							 | 
						|
											var existingJob *EachPartitionPublishJob
							 | 
						|
											existingJob = p.jobs[i]
							 | 
						|
											if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
							 | 
						|
												existingJob.generation = generation
							 | 
						|
												jobs = append(jobs, existingJob)
							 | 
						|
												continue
							 | 
						|
											} else {
							 | 
						|
												if existingJob.LeaderBroker != "" {
							 | 
						|
													close(existingJob.stopChan)
							 | 
						|
													existingJob.LeaderBroker = ""
							 | 
						|
													existingJob.wg.Wait()
							 | 
						|
												}
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// start a go routine to publish to this partition
							 | 
						|
										job := &EachPartitionPublishJob{
							 | 
						|
											BrokerPartitionAssignment: assignment,
							 | 
						|
											stopChan:                  make(chan bool, 1),
							 | 
						|
											generation:                generation,
							 | 
						|
											inputQueue:                buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
							 | 
						|
										}
							 | 
						|
										job.wg.Add(1)
							 | 
						|
										go func(job *EachPartitionPublishJob) {
							 | 
						|
											defer job.wg.Done()
							 | 
						|
											if err := p.doPublishToPartition(job); err != nil {
							 | 
						|
												log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
							 | 
						|
												errChan <- EachPartitionError{assignment, err, generation}
							 | 
						|
											}
							 | 
						|
										}(job)
							 | 
						|
										jobs = append(jobs, job)
							 | 
						|
										// TODO assuming this is not re-configured so the partitions are fixed.
							 | 
						|
										// better just re-use the existing job
							 | 
						|
										p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
							 | 
						|
									}
							 | 
						|
									p.jobs = jobs
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
							 | 
						|
								
							 | 
						|
									log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
							 | 
						|
								
							 | 
						|
									grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
							 | 
						|
									}
							 | 
						|
									brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
							 | 
						|
									stream, err := brokerClient.PublishMessage(context.Background())
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("create publish client: %w", err)
							 | 
						|
									}
							 | 
						|
									publishClient := &PublishClient{
							 | 
						|
										SeaweedMessaging_PublishMessageClient: stream,
							 | 
						|
										Broker:                                job.LeaderBroker,
							 | 
						|
									}
							 | 
						|
									if err = publishClient.Send(&mq_pb.PublishMessageRequest{
							 | 
						|
										Message: &mq_pb.PublishMessageRequest_Init{
							 | 
						|
											Init: &mq_pb.PublishMessageRequest_InitMessage{
							 | 
						|
												Topic:          p.config.Topic.ToPbTopic(),
							 | 
						|
												Partition:      job.Partition,
							 | 
						|
												AckInterval:    128,
							 | 
						|
												FollowerBroker: job.FollowerBroker,
							 | 
						|
												PublisherName:  p.config.PublisherName,
							 | 
						|
											},
							 | 
						|
										},
							 | 
						|
									}); err != nil {
							 | 
						|
										return fmt.Errorf("send init message: %w", err)
							 | 
						|
									}
							 | 
						|
									// process the hello message
							 | 
						|
									resp, err := stream.Recv()
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("recv init response: %w", err)
							 | 
						|
									}
							 | 
						|
									if resp.Error != "" {
							 | 
						|
										return fmt.Errorf("init response error: %v", resp.Error)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									var publishedTsNs int64
							 | 
						|
									hasMoreData := int32(1)
							 | 
						|
									var wg sync.WaitGroup
							 | 
						|
									wg.Add(1)
							 | 
						|
									go func() {
							 | 
						|
										defer wg.Done()
							 | 
						|
										for {
							 | 
						|
											ackResp, err := publishClient.Recv()
							 | 
						|
											if err != nil {
							 | 
						|
												e, _ := status.FromError(err)
							 | 
						|
												if e.Code() == codes.Unknown && e.Message() == "EOF" {
							 | 
						|
													log.Printf("publish to %s EOF", publishClient.Broker)
							 | 
						|
													return
							 | 
						|
												}
							 | 
						|
												publishClient.Err = err
							 | 
						|
												log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											if ackResp.Error != "" {
							 | 
						|
												publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
							 | 
						|
												log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											if ackResp.AckSequence > 0 {
							 | 
						|
												log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
							 | 
						|
											}
							 | 
						|
											if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									publishCounter := 0
							 | 
						|
									for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
							 | 
						|
										if data.Ctrl != nil && data.Ctrl.IsClose {
							 | 
						|
											// need to set this before sending to brokers, to avoid timing issue
							 | 
						|
											atomic.StoreInt32(&hasMoreData, 0)
							 | 
						|
										}
							 | 
						|
										if err := publishClient.Send(&mq_pb.PublishMessageRequest{
							 | 
						|
											Message: &mq_pb.PublishMessageRequest_Data{
							 | 
						|
												Data: data,
							 | 
						|
											},
							 | 
						|
										}); err != nil {
							 | 
						|
											return fmt.Errorf("send publish data: %w", err)
							 | 
						|
										}
							 | 
						|
										publishCounter++
							 | 
						|
										atomic.StoreInt64(&publishedTsNs, data.TsNs)
							 | 
						|
									}
							 | 
						|
									if publishCounter > 0 {
							 | 
						|
										wg.Wait()
							 | 
						|
									} else {
							 | 
						|
										// CloseSend would cancel the context on the server side
							 | 
						|
										if err := publishClient.CloseSend(); err != nil {
							 | 
						|
											return fmt.Errorf("close send: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) doConfigureTopic() (err error) {
							 | 
						|
									if len(p.config.Brokers) == 0 {
							 | 
						|
										return fmt.Errorf("topic configuring found no bootstrap brokers")
							 | 
						|
									}
							 | 
						|
									var lastErr error
							 | 
						|
									for _, brokerAddress := range p.config.Brokers {
							 | 
						|
										err = pb.WithBrokerGrpcClient(false,
							 | 
						|
											brokerAddress,
							 | 
						|
											p.grpcDialOption,
							 | 
						|
											func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
												_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
							 | 
						|
													Topic:          p.config.Topic.ToPbTopic(),
							 | 
						|
													PartitionCount: p.config.PartitionCount,
							 | 
						|
													RecordType:     p.config.RecordType, // TODO schema upgrade
							 | 
						|
												})
							 | 
						|
												return err
							 | 
						|
											})
							 | 
						|
										if err == nil {
							 | 
						|
											lastErr = nil
							 | 
						|
											return nil
							 | 
						|
										} else {
							 | 
						|
											lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if lastErr != nil {
							 | 
						|
										return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
							 | 
						|
									}
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
							 | 
						|
									if len(p.config.Brokers) == 0 {
							 | 
						|
										return nil, fmt.Errorf("lookup found no bootstrap brokers")
							 | 
						|
									}
							 | 
						|
									var lastErr error
							 | 
						|
									for _, brokerAddress := range p.config.Brokers {
							 | 
						|
										err := pb.WithBrokerGrpcClient(false,
							 | 
						|
											brokerAddress,
							 | 
						|
											p.grpcDialOption,
							 | 
						|
											func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
												lookupResp, err := client.LookupTopicBrokers(context.Background(),
							 | 
						|
													&mq_pb.LookupTopicBrokersRequest{
							 | 
						|
														Topic: p.config.Topic.ToPbTopic(),
							 | 
						|
													})
							 | 
						|
												glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
							 | 
						|
								
							 | 
						|
												if err != nil {
							 | 
						|
													return err
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												if len(lookupResp.BrokerPartitionAssignments) == 0 {
							 | 
						|
													return fmt.Errorf("no broker partition assignments")
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												assignments = lookupResp.BrokerPartitionAssignments
							 | 
						|
								
							 | 
						|
												return nil
							 | 
						|
											})
							 | 
						|
										if err == nil {
							 | 
						|
											return assignments, nil
							 | 
						|
										} else {
							 | 
						|
											lastErr = err
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
							 | 
						|
								
							 | 
						|
								}
							 |