You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							85 lines
						
					
					
						
							2.4 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							85 lines
						
					
					
						
							2.4 KiB
						
					
					
				| package sub_client | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |
| 	"sync" | |
| ) | |
| 
 | |
| func (sub *TopicSubscriber) doProcess() error { | |
| 	var wg sync.WaitGroup | |
| 	for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { | |
| 		brokerAddress := brokerPartitionAssignment.LeaderBroker | |
| 		grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) | |
| 		if err != nil { | |
| 			return fmt.Errorf("dial broker %s: %v", brokerAddress, err) | |
| 		} | |
| 		brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) | |
| 		subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ | |
| 			Message: &mq_pb.SubscribeRequest_Init{ | |
| 				Init: &mq_pb.SubscribeRequest_InitMessage{ | |
| 					ConsumerGroup: sub.SubscriberConfig.GroupId, | |
| 					ConsumerId:    sub.SubscriberConfig.GroupInstanceId, | |
| 					Topic: &mq_pb.Topic{ | |
| 						Namespace: sub.ContentConfig.Namespace, | |
| 						Name:      sub.ContentConfig.Topic, | |
| 					}, | |
| 					Partition: &mq_pb.Partition{ | |
| 						RingSize:   brokerPartitionAssignment.Partition.RingSize, | |
| 						RangeStart: brokerPartitionAssignment.Partition.RangeStart, | |
| 						RangeStop:  brokerPartitionAssignment.Partition.RangeStop, | |
| 					}, | |
| 					Filter: sub.ContentConfig.Filter, | |
| 					Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ | |
| 						StartTimestampNs: sub.alreadyProcessedTsNs, | |
| 					}, | |
| 				}, | |
| 			}, | |
| 		}) | |
| 		if err != nil { | |
| 			return fmt.Errorf("create subscribe client: %v", err) | |
| 		} | |
| 		wg.Add(1) | |
| 		go func() { | |
| 			defer wg.Done() | |
| 			if sub.OnCompletionFunc != nil { | |
| 				defer sub.OnCompletionFunc() | |
| 			} | |
| 			defer func() { | |
| 				subscribeClient.SendMsg(&mq_pb.SubscribeRequest{ | |
| 					Message: &mq_pb.SubscribeRequest_Ack{ | |
| 						Ack: &mq_pb.SubscribeRequest_AckMessage{ | |
| 							Sequence: 0, | |
| 						}, | |
| 					}, | |
| 				}) | |
| 				subscribeClient.CloseSend() | |
| 			}() | |
| 			for { | |
| 				resp, err := subscribeClient.Recv() | |
| 				if err != nil { | |
| 					fmt.Printf("subscribe error: %v\n", err) | |
| 					return | |
| 				} | |
| 				if resp.Message == nil { | |
| 					continue | |
| 				} | |
| 				switch m := resp.Message.(type) { | |
| 				case *mq_pb.SubscribeResponse_Data: | |
| 					if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { | |
| 						return | |
| 					} | |
| 					sub.alreadyProcessedTsNs = m.Data.TsNs | |
| 				case *mq_pb.SubscribeResponse_Ctrl: | |
| 					if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { | |
| 						return | |
| 					} | |
| 				} | |
| 			} | |
| 		}() | |
| 	} | |
| 	wg.Wait() | |
| 	return nil | |
| }
 |