Browse Source
			
			
			Revert "Revert "Merge branch 'master' into sub""
			
				
		Revert "Revert "Merge branch 'master' into sub""
	
		
	
			
				This reverts commit 0bb97709d4.
			
			
				pull/4857/head
			
			
		
				 21 changed files with 290 additions and 199 deletions
			
			
		- 
					12.github/workflows/container_dev.yml
- 
					12.github/workflows/container_latest.yml
- 
					10.github/workflows/container_release1.yml
- 
					10.github/workflows/container_release2.yml
- 
					10.github/workflows/container_release3.yml
- 
					10.github/workflows/container_release4.yml
- 
					10.github/workflows/container_release5.yml
- 
					39go.mod
- 
					73go.sum
- 
					2weed/cluster/lock_manager/lock_manager.go
- 
					7weed/mount/dirty_pages_chunked.go
- 
					20weed/mq/balancer/allocate.go
- 
					62weed/mq/balancer/allocate_test.go
- 
					59weed/mq/balancer/balancer.go
- 
					43weed/mq/balancer/lookup.go
- 
					4weed/mq/broker/broker_grpc_admin.go
- 
					81weed/mq/topic_allocation/allocation.go
- 
					7weed/server/filer_server.go
- 
					12weed/server/filer_server_handlers.go
- 
					2weed/server/filer_server_handlers_write.go
- 
					4weed/storage/store_ec_delete.go
| @ -0,0 +1,20 @@ | |||
| package balancer | |||
| 
 | |||
| import ( | |||
| 	cmap "github.com/orcaman/concurrent-map/v2" | |||
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |||
| ) | |||
| 
 | |||
| func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) { | |||
| 	return []*mq_pb.BrokerPartitionAssignment{ | |||
| 		{ | |||
| 			LeaderBroker:    "localhost:17777", | |||
| 			FollowerBrokers: []string{"localhost:17777"}, | |||
| 			Partition: &mq_pb.Partition{ | |||
| 				RingSize:   MaxPartitionCount, | |||
| 				RangeStart: 0, | |||
| 				RangeStop:  MaxPartitionCount, | |||
| 			}, | |||
| 		}, | |||
| 	} | |||
| } | |||
| @ -0,0 +1,62 @@ | |||
| package balancer | |||
| 
 | |||
| import ( | |||
| 	cmap "github.com/orcaman/concurrent-map/v2" | |||
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |||
| 	"reflect" | |||
| 	"testing" | |||
| ) | |||
| 
 | |||
| func Test_allocateOneBroker(t *testing.T) { | |||
| 	brokers := cmap.New[*BrokerStats]() | |||
| 	brokers.SetIfAbsent("localhost:17777", &BrokerStats{ | |||
| 		TopicPartitionCount: 0, | |||
| 		ConsumerCount:       0, | |||
| 		CpuUsagePercent:     0, | |||
| 	}) | |||
| 
 | |||
| 	tests := []struct { | |||
| 		name            string | |||
| 		args            args | |||
| 		wantAssignments []*mq_pb.BrokerPartitionAssignment | |||
| 	}{ | |||
| 		{ | |||
| 			name: "test only one broker", | |||
| 			args: args{ | |||
| 				brokers:        brokers, | |||
| 				partitionCount: 6, | |||
| 			}, | |||
| 			wantAssignments: []*mq_pb.BrokerPartitionAssignment{ | |||
| 				{ | |||
| 					LeaderBroker:    "localhost:17777", | |||
| 					FollowerBrokers: []string{"localhost:17777"}, | |||
| 					Partition: &mq_pb.Partition{ | |||
| 						RingSize:   MaxPartitionCount, | |||
| 						RangeStart: 0, | |||
| 						RangeStop:  MaxPartitionCount, | |||
| 					}, | |||
| 				}, | |||
| 			}, | |||
| 		}, | |||
| 	} | |||
| 	testThem(t, tests) | |||
| } | |||
| 
 | |||
| type args struct { | |||
| 	brokers        cmap.ConcurrentMap[string, *BrokerStats] | |||
| 	partitionCount int | |||
| } | |||
| 
 | |||
| func testThem(t *testing.T, tests []struct { | |||
| 	name            string | |||
| 	args            args | |||
| 	wantAssignments []*mq_pb.BrokerPartitionAssignment | |||
| }) { | |||
| 	for _, tt := range tests { | |||
| 		t.Run(tt.name, func(t *testing.T) { | |||
| 			if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { | |||
| 				t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) | |||
| 			} | |||
| 		}) | |||
| 	} | |||
| } | |||
| @ -0,0 +1,43 @@ | |||
| package balancer | |||
| 
 | |||
| import ( | |||
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |||
| ) | |||
| 
 | |||
| func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { | |||
| 	// find existing topic partition assignments
 | |||
| 	for brokerStatsItem := range b.Brokers.IterBuffered() { | |||
| 		broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val | |||
| 		for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { | |||
| 			topicPartitionStat := topicPartitionStatsItem.Val | |||
| 			if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && | |||
| 				topicPartitionStat.TopicPartition.Topic == topic.Name { | |||
| 				assignment := &mq_pb.BrokerPartitionAssignment{ | |||
| 					Partition: &mq_pb.Partition{ | |||
| 						RingSize:   MaxPartitionCount, | |||
| 						RangeStart: topicPartitionStat.RangeStart, | |||
| 						RangeStop:  topicPartitionStat.RangeStop, | |||
| 					}, | |||
| 				} | |||
| 				if topicPartitionStat.IsLeader { | |||
| 					assignment.LeaderBroker = broker | |||
| 				} else { | |||
| 					assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker) | |||
| 				} | |||
| 				assignments = append(assignments, assignment) | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 	if len(assignments) > 0 { | |||
| 		return assignments, nil | |||
| 	} | |||
| 
 | |||
| 	// find the topic partitions on the filer
 | |||
| 	// if the topic is not found
 | |||
| 	//   if the request is_for_publish
 | |||
| 	//     create the topic
 | |||
| 	//   if the request is_for_subscribe
 | |||
| 	//     return error not found
 | |||
| 	// t := topic.FromPbTopic(request.Topic)
 | |||
| 	return allocateTopicPartitions(b.Brokers, 6), nil | |||
| } | |||
| @ -1,81 +0,0 @@ | |||
| package topic_allocation | |||
| 
 | |||
| import ( | |||
| 	"github.com/seaweedfs/seaweedfs/weed/mq/topic" | |||
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |||
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |||
| 	"modernc.org/mathutil" | |||
| ) | |||
| 
 | |||
| const ( | |||
| 	DefaultBrokerCount = 4 | |||
| ) | |||
| 
 | |||
| // AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
 | |||
| func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { | |||
| 	// create a previous assignment if not exists
 | |||
| 	if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { | |||
| 		prevAssignment = &mq_pb.TopicPartitionsAssignment{ | |||
| 			PartitionCount: topic.PartitionCount, | |||
| 		} | |||
| 		partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount | |||
| 		for i := 0; i < DefaultBrokerCount; i++ { | |||
| 			prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ | |||
| 				PartitionStart: int32(i * partitionCountForEachBroker), | |||
| 				PartitionStop:  mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), | |||
| 			}) | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	// create a new assignment
 | |||
| 	assignment = &mq_pb.TopicPartitionsAssignment{ | |||
| 		PartitionCount: prevAssignment.PartitionCount, | |||
| 	} | |||
| 
 | |||
| 	// allocate partitions for each partition range
 | |||
| 	for _, brokerPartition := range prevAssignment.BrokerPartitions { | |||
| 		// allocate partitions for each partition range
 | |||
| 		leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) | |||
| 		if err != nil { | |||
| 			return nil, err | |||
| 		} | |||
| 
 | |||
| 		followerBrokers := make([]string, len(followers)) | |||
| 		for i, follower := range followers { | |||
| 			followerBrokers[i] = string(follower) | |||
| 		} | |||
| 
 | |||
| 		assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ | |||
| 			PartitionStart:  brokerPartition.PartitionStart, | |||
| 			PartitionStop:   brokerPartition.PartitionStop, | |||
| 			LeaderBroker:    string(leader), | |||
| 			FollowerBrokers: followerBrokers, | |||
| 		}) | |||
| 	} | |||
| 
 | |||
| 	return | |||
| } | |||
| 
 | |||
| func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { | |||
| 	// allocate leader
 | |||
| 	leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) | |||
| 	if err != nil { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	// allocate followers
 | |||
| 	followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) | |||
| 	if err != nil { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	return | |||
| } | |||
| 
 | |||
| func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { | |||
| 	return | |||
| } | |||
| 
 | |||
| func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { | |||
| 	return | |||
| } | |||
						Write
						Preview
					
					
					Loading…
					
					Cancel
						Save
					
		Reference in new issue