|
|
@ -2,7 +2,6 @@ package sub_coordinator |
|
|
|
|
|
|
|
import ( |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
|
"sync" |
|
|
@ -147,12 +146,12 @@ func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) { |
|
|
|
if inflightAdjustments, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists { |
|
|
|
consumerLoad += inflightAdjustments |
|
|
|
} |
|
|
|
fmt.Printf("Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad) |
|
|
|
// fmt.Printf("Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
|
|
|
|
if consumerLoad < int(consumer.MaxPartitionCount) { |
|
|
|
if consumerLoad < minLoad { |
|
|
|
bestConsumer = consumer |
|
|
|
minLoad = consumerLoad |
|
|
|
fmt.Printf("picked: Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad) |
|
|
|
// fmt.Printf("picked: Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -188,7 +187,7 @@ func (m *Market) loopBalanceLoad() { |
|
|
|
m.inflightAdjustments = inflightAdjustments |
|
|
|
|
|
|
|
m.doBalanceLoad() |
|
|
|
println("Balance load completed.") |
|
|
|
// println("Balance load completed.")
|
|
|
|
m.Status() |
|
|
|
} |
|
|
|
case <-m.balanceRequestChan: |
|
|
@ -279,7 +278,7 @@ func (m *Market) adjustUnassignedPartitions() { |
|
|
|
if _, exists := inflightPartitionAdjustments[partitionSlot.Partition]; exists { |
|
|
|
continue |
|
|
|
} |
|
|
|
fmt.Printf("Assigning partition %+v to consumer\n", partitionSlot.Partition) |
|
|
|
// fmt.Printf("Assigning partition %+v to consumer\n", partitionSlot.Partition)
|
|
|
|
m.assignPartitionToConsumer(partitionSlot) |
|
|
|
} |
|
|
|
} |
|
|
@ -291,7 +290,7 @@ func (m *Market) ConfirmAdjustment(adjustment *Adjustment) { |
|
|
|
} else { |
|
|
|
m.unassignPartitionSlot(adjustment.partition) |
|
|
|
} |
|
|
|
glog.V(0).Infof("ConfirmAdjustment %+v", adjustment) |
|
|
|
glog.V(1).Infof("ConfirmAdjustment %+v", adjustment) |
|
|
|
m.Status() |
|
|
|
} |
|
|
|
|
|
|
@ -354,15 +353,15 @@ func (m *Market) Status() { |
|
|
|
m.mu.Lock() |
|
|
|
defer m.mu.Unlock() |
|
|
|
|
|
|
|
glog.V(0).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances)) |
|
|
|
glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances)) |
|
|
|
for partition, slot := range m.partitions { |
|
|
|
if slot.AssignedTo == nil { |
|
|
|
glog.V(0).Infof("Partition %+v is not assigned to any consumer", partition) |
|
|
|
glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition) |
|
|
|
} else { |
|
|
|
glog.V(0).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId) |
|
|
|
glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId) |
|
|
|
} |
|
|
|
} |
|
|
|
for _, consumer := range m.consumerInstances { |
|
|
|
glog.V(0).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions)) |
|
|
|
glog.V(1).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions)) |
|
|
|
} |
|
|
|
} |