|
|
@ -3,13 +3,17 @@ package broker |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"encoding/binary" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"io" |
|
|
|
"strings" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" |
|
|
|
) |
|
|
|
|
|
|
@ -19,9 +23,14 @@ type BufferRange struct { |
|
|
|
end int64 |
|
|
|
} |
|
|
|
|
|
|
|
// ErrNoPartitionAssignment indicates no broker assignment found for the partition.
|
|
|
|
// This is a normal case that means there are no unflushed messages for this partition.
|
|
|
|
var ErrNoPartitionAssignment = errors.New("no broker assignment found for partition") |
|
|
|
|
|
|
|
// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
|
|
|
|
// that haven't been flushed to disk yet, using buffer_start metadata for deduplication
|
|
|
|
// Now supports streaming responses and buffer index filtering for better performance
|
|
|
|
// Includes broker routing to redirect requests to the correct broker hosting the topic/partition
|
|
|
|
func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error { |
|
|
|
// Convert protobuf types to internal types
|
|
|
|
t := topic.FromPbTopic(req.Topic) |
|
|
@ -35,10 +44,36 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage |
|
|
|
b.accessLock.Unlock() |
|
|
|
|
|
|
|
if localPartition == nil { |
|
|
|
return stream.Send(&mq_pb.GetUnflushedMessagesResponse{ |
|
|
|
Error: fmt.Sprintf("partition %v %v not found on this broker", t, partition), |
|
|
|
EndOfStream: true, |
|
|
|
}) |
|
|
|
// Topic/partition not found locally, attempt to find the correct broker and redirect
|
|
|
|
glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition) |
|
|
|
|
|
|
|
// Look up which broker hosts this topic/partition
|
|
|
|
brokerHost, err := b.findBrokerForTopicPartition(req.Topic, req.Partition) |
|
|
|
if err != nil { |
|
|
|
if errors.Is(err, ErrNoPartitionAssignment) { |
|
|
|
// Normal case: no broker assignment means no unflushed messages
|
|
|
|
glog.V(2).Infof("No broker assignment for %v %v - no unflushed messages", t, partition) |
|
|
|
return stream.Send(&mq_pb.GetUnflushedMessagesResponse{ |
|
|
|
EndOfStream: true, |
|
|
|
}) |
|
|
|
} |
|
|
|
return stream.Send(&mq_pb.GetUnflushedMessagesResponse{ |
|
|
|
Error: fmt.Sprintf("failed to find broker for %v %v: %v", t, partition, err), |
|
|
|
EndOfStream: true, |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
if brokerHost == "" { |
|
|
|
// This should not happen after ErrNoPartitionAssignment check, but keep for safety
|
|
|
|
glog.V(2).Infof("Empty broker host for %v %v - no unflushed messages", t, partition) |
|
|
|
return stream.Send(&mq_pb.GetUnflushedMessagesResponse{ |
|
|
|
EndOfStream: true, |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
// Redirect to the correct broker
|
|
|
|
glog.V(1).Infof("Redirecting GetUnflushedMessages request for %v %v to broker %s", t, partition, brokerHost) |
|
|
|
return b.redirectGetUnflushedMessages(brokerHost, req, stream) |
|
|
|
} |
|
|
|
|
|
|
|
// Build deduplication map from existing log files using buffer_start metadata
|
|
|
@ -237,3 +272,87 @@ func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRang |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// findBrokerForTopicPartition finds which broker hosts the specified topic/partition
|
|
|
|
func (b *MessageQueueBroker) findBrokerForTopicPartition(topic *schema_pb.Topic, partition *schema_pb.Partition) (string, error) { |
|
|
|
// Use LookupTopicBrokers to find which broker hosts this topic/partition
|
|
|
|
ctx := context.Background() |
|
|
|
lookupReq := &mq_pb.LookupTopicBrokersRequest{ |
|
|
|
Topic: topic, |
|
|
|
} |
|
|
|
|
|
|
|
// If we're not the lock owner (balancer), we need to redirect to the balancer first
|
|
|
|
var lookupResp *mq_pb.LookupTopicBrokersResponse |
|
|
|
var err error |
|
|
|
|
|
|
|
if !b.isLockOwner() { |
|
|
|
// Redirect to balancer to get topic broker assignments
|
|
|
|
balancerAddress := pb.ServerAddress(b.lockAsBalancer.LockOwner()) |
|
|
|
err = b.withBrokerClient(false, balancerAddress, func(client mq_pb.SeaweedMessagingClient) error { |
|
|
|
lookupResp, err = client.LookupTopicBrokers(ctx, lookupReq) |
|
|
|
return err |
|
|
|
}) |
|
|
|
} else { |
|
|
|
// We are the balancer, handle the lookup directly
|
|
|
|
lookupResp, err = b.LookupTopicBrokers(ctx, lookupReq) |
|
|
|
} |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return "", fmt.Errorf("failed to lookup topic brokers: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Find the broker assignment that matches our partition
|
|
|
|
for _, assignment := range lookupResp.BrokerPartitionAssignments { |
|
|
|
if b.partitionsMatch(partition, assignment.Partition) { |
|
|
|
if assignment.LeaderBroker != "" { |
|
|
|
return assignment.LeaderBroker, nil |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return "", ErrNoPartitionAssignment |
|
|
|
} |
|
|
|
|
|
|
|
// partitionsMatch checks if two partitions represent the same partition
|
|
|
|
func (b *MessageQueueBroker) partitionsMatch(p1, p2 *schema_pb.Partition) bool { |
|
|
|
return p1.RingSize == p2.RingSize && |
|
|
|
p1.RangeStart == p2.RangeStart && |
|
|
|
p1.RangeStop == p2.RangeStop && |
|
|
|
p1.UnixTimeNs == p2.UnixTimeNs |
|
|
|
} |
|
|
|
|
|
|
|
// redirectGetUnflushedMessages forwards the GetUnflushedMessages request to the correct broker
|
|
|
|
func (b *MessageQueueBroker) redirectGetUnflushedMessages(brokerHost string, req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error { |
|
|
|
ctx := stream.Context() |
|
|
|
|
|
|
|
// Connect to the target broker and forward the request
|
|
|
|
return b.withBrokerClient(false, pb.ServerAddress(brokerHost), func(client mq_pb.SeaweedMessagingClient) error { |
|
|
|
// Create a new stream to the target broker
|
|
|
|
targetStream, err := client.GetUnflushedMessages(ctx, req) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to create stream to broker %s: %v", brokerHost, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Forward all responses from the target broker to our client
|
|
|
|
for { |
|
|
|
response, err := targetStream.Recv() |
|
|
|
if err != nil { |
|
|
|
if errors.Is(err, io.EOF) { |
|
|
|
// Normal end of stream
|
|
|
|
return nil |
|
|
|
} |
|
|
|
return fmt.Errorf("error receiving from broker %s: %v", brokerHost, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Forward the response to our client
|
|
|
|
if sendErr := stream.Send(response); sendErr != nil { |
|
|
|
return fmt.Errorf("error forwarding response to client: %v", sendErr) |
|
|
|
} |
|
|
|
|
|
|
|
// Check if this is the end of stream
|
|
|
|
if response.EndOfStream { |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
} |