Browse Source

ask follower to follow

pull/5637/head
chrislu 10 months ago
parent
commit
6f75a0af55
  1. 30
      weed/mq/broker/broker_grpc_pub.go
  2. 78
      weed/mq/broker/broker_grpc_pub_follow.go
  3. 180
      weed/mq/broker/broker_grpc_sub.go
  4. 3
      weed/mq/broker/broker_topic_partition_read_write.go
  5. 18
      weed/mq/topic/local_partition.go
  6. 1
      weed/mq/topic/local_topic.go
  7. 39
      weed/pb/mq.proto
  8. 1178
      weed/pb/mq_pb/mq.pb.go
  9. 103
      weed/pb/mq_pb/mq_grpc.pb.go
  10. 1
      weed/util/log_buffer/log_buffer.go
  11. 7
      weed/util/log_buffer/log_read.go

30
weed/mq/broker/broker_grpc_pub.go

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
"io" "io"
@ -59,6 +60,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response) return stream.Send(response)
} }
ackInterval = int(initMessage.AckInterval) ackInterval = int(initMessage.AckInterval)
for _, follower := range initMessage.FollowerBrokers {
followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
Topic: initMessage.Topic,
Partition: initMessage.Partition,
BrokerSelf: string(b.option.BrokerAddress()),
})
return err
})
if followErr != nil {
response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
glog.Errorf("follower %v failed: %v", follower, followErr)
return stream.Send(response)
}
}
stream.Send(response) stream.Send(response)
} else { } else {
response.Error = fmt.Sprintf("missing init message") response.Error = fmt.Sprintf("missing init message")
@ -86,22 +102,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence) glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence)
}() }()
go func() { go func() {
for {
select {
case resp := <-respChan:
if resp != nil {
for resp := range respChan {
if err := stream.Send(resp); err != nil { if err := stream.Send(resp); err != nil {
glog.Errorf("Error sending response %v: %v", resp, err) glog.Errorf("Error sending response %v: %v", resp, err)
} }
} else {
return
}
case <-localTopicPartition.StopPublishersCh:
respChan <- &mq_pb.PublishMessageResponse{
AckSequence: ackSequence,
ShouldClose: true,
}
}
} }
}() }()

78
weed/mq/broker/broker_grpc_pub_follow.go

@ -0,0 +1,78 @@
package broker
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"math/rand"
"time"
)
func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
glog.V(0).Infof("PublishFollowMe %v", request)
go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
followerId := rand.Int31()
subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
ConsumerGroup: string(b.option.BrokerAddress()),
ConsumerId: fmt.Sprintf("followMe-%d", followerId),
FollowerId: followerId,
Topic: request.Topic,
PartitionOffset: &mq_pb.PartitionOffset{
Partition: request.Partition,
StartTsNs: 0,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
},
},
})
if err != nil {
glog.Errorf("FollowInMemoryMessages error: %v", err)
return err
}
b.doFollowInMemoryMessage(context.Background(), subscribeClient)
return nil
})
return &mq_pb.PublishFollowMeResponse{}, nil
}
func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
for {
resp, err := client.Recv()
if err != nil {
if err != io.EOF {
glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
}
return
}
if resp == nil {
glog.V(0).Infof("doFollowInMemoryMessage nil response")
return
}
if resp.Message != nil {
// process ctrl message or data message
switch m:= resp.Message.(type) {
case *mq_pb.FollowInMemoryMessagesResponse_Data:
// process data message
print("d")
case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
// process ctrl message
if m.Ctrl.FlushedSequence > 0 {
flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
}
if m.Ctrl.FollowerChangedToId != 0 {
// follower changed
glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
return
}
}
}
}
}

180
weed/mq/broker/broker_grpc_sub.go

@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync/atomic"
"time" "time"
) )
@ -69,7 +70,55 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var startPosition log_buffer.MessagePosition var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
offset := req.GetInit().GetPartitionOffset()
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
if !isConnected {
return false
}
sleepIntervalCount++
if sleepIntervalCount > 32 {
sleepIntervalCount = 32
}
time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return false
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
}
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
// reset the sleep interval count
sleepIntervalCount = 0
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{
Key: logEntry.Key,
Value: logEntry.Data,
TsNs: logEntry.TsNs,
},
}}); err != nil {
glog.Errorf("Error sending data: %v", err)
return false, err
}
counter++
return false, nil
})
}
func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
if offset.StartTsNs != 0 { if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
} }
@ -78,17 +127,92 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
} }
return
}
func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
ctx := stream.Context()
clientName := req.GetInit().ConsumerId
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
waitIntervalCount := 0
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
if localTopicPartition != nil {
break
}
waitIntervalCount++
if waitIntervalCount > 32 {
waitIntervalCount = 32
}
time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return nil
}
glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
return nil
default:
// Continue processing the request
}
} }
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
// set the current follower id
followerId := req.GetInit().FollowerId
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
isConnected := true
sleepIntervalCount := 0
var counter int64
defer func() {
isConnected = false
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
}()
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
var prevFlushTsNs int64
_,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
if !isConnected { if !isConnected {
return false return false
} }
sleepIntervalCount++ sleepIntervalCount++
if sleepIntervalCount > 10 {
sleepIntervalCount = 10
if sleepIntervalCount > 32 {
sleepIntervalCount = 32
}
time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
if localTopicPartition.LogBuffer.IsStopping() {
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FollowerChangedToId: newFollowerId,
},
},
})
return false
} }
time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context // Check if the client has disconnected by monitoring the context
select { select {
@ -104,12 +228,54 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
// Continue processing the request // Continue processing the request
} }
// send the last flushed sequence
flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
if flushTsNs != prevFlushTsNs {
prevFlushTsNs = flushTsNs
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FlushedSequence: flushTsNs,
},
},
})
}
return true return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) { }, func(logEntry *filer_pb.LogEntry) (bool, error) {
// reset the sleep interval count // reset the sleep interval count
sleepIntervalCount = 0 sleepIntervalCount = 0
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
// check the follower id
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
if newFollowerId != followerId {
glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FollowerChangedToId: newFollowerId,
},
},
})
return true, nil
}
// send the last flushed sequence
flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
if flushTsNs != prevFlushTsNs {
prevFlushTsNs = flushTsNs
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FlushedSequence: flushTsNs,
},
},
})
}
// send the log entry
if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
Data: &mq_pb.DataMessage{ Data: &mq_pb.DataMessage{
Key: logEntry.Key, Key: logEntry.Key,
Value: logEntry.Data, Value: logEntry.Data,
@ -123,4 +289,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
counter++ counter++
return false, nil return false, nil
}) })
return err
} }

3
weed/mq/broker/broker_topic_partition_read_write.go

@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"math" "math"
"sync/atomic"
"time" "time"
) )
@ -38,6 +39,8 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
break break
} }
} }
atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
} }
} }

18
weed/mq/topic/local_partition.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync/atomic"
"time" "time"
) )
@ -15,10 +16,9 @@ type LocalPartition struct {
FollowerBrokers []pb.ServerAddress FollowerBrokers []pb.ServerAddress
LogBuffer *log_buffer.LogBuffer LogBuffer *log_buffer.LogBuffer
ConsumerCount int32 ConsumerCount int32
StopPublishersCh chan struct{}
Publishers *LocalPartitionPublishers Publishers *LocalPartitionPublishers
StopSubscribersCh chan struct{}
Subscribers *LocalPartitionSubscribers Subscribers *LocalPartitionSubscribers
FollowerId int32
} }
var TIME_FORMAT = "2006-01-02-15-04-05" var TIME_FORMAT = "2006-01-02-15-04-05"
@ -58,6 +58,9 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
startPosition = processedPosition startPosition = processedPosition
processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn) processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
if isDone {
return nil
}
startPosition = processedPosition startPosition = processedPosition
if readInMemoryLogErr == log_buffer.ResumeFromDiskError { if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
@ -67,9 +70,6 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
return readInMemoryLogErr return readInMemoryLogErr
} }
if isDone {
return nil
}
} }
} }
@ -96,7 +96,6 @@ func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition,
func (p *LocalPartition) closePublishers() { func (p *LocalPartition) closePublishers() {
p.Publishers.SignalShutdown() p.Publishers.SignalShutdown()
close(p.StopPublishersCh)
} }
func (p *LocalPartition) closeSubscribers() { func (p *LocalPartition) closeSubscribers() {
p.Subscribers.SignalShutdown() p.Subscribers.SignalShutdown()
@ -118,3 +117,10 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
} }
return return
} }
func (p *LocalPartition) Shutdown() {
p.closePublishers()
p.closeSubscribers()
p.LogBuffer.ShutdownLogBuffer()
atomic.StoreInt32(&p.FollowerId, 0)
}

1
weed/mq/topic/local_topic.go

@ -27,6 +27,7 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
for i, localPartition := range localTopic.Partitions { for i, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) { if localPartition.Partition.Equals(partition) {
foundPartitionIndex = i foundPartitionIndex = i
localPartition.Shutdown()
break break
} }
} }

39
weed/pb/mq.proto

@ -45,6 +45,11 @@ service SeaweedMessaging {
} }
rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) { rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
} }
// The lead broker asks a follower broker to follow itself
rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) {
}
rpc FollowInMemoryMessages (FollowInMemoryMessagesRequest) returns (stream FollowInMemoryMessagesResponse) {
}
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
@ -205,6 +210,14 @@ message PublishMessageResponse {
string error = 2; string error = 2;
bool should_close = 3; bool should_close = 3;
} }
message PublishFollowMeRequest {
Topic topic = 1;
Partition partition = 2;
string broker_self = 3;
}
message PublishFollowMeResponse {
string error = 1;
}
message SubscribeMessageRequest { message SubscribeMessageRequest {
message InitMessage { message InitMessage {
string consumer_group = 1; string consumer_group = 1;
@ -233,6 +246,32 @@ message SubscribeMessageResponse {
DataMessage data = 2; DataMessage data = 2;
} }
} }
message FollowInMemoryMessagesRequest {
message InitMessage {
string consumer_group = 1;
string consumer_id = 2;
int32 follower_id = 3;
Topic topic = 4;
PartitionOffset partition_offset = 5;
}
message AckMessage {
int64 sequence = 1;
}
oneof message {
InitMessage init = 1;
AckMessage ack = 2;
}
}
message FollowInMemoryMessagesResponse {
message CtrlMessage {
int64 flushed_sequence = 1;
int32 follower_changed_to_id = 2;
}
oneof message {
CtrlMessage ctrl = 1;
DataMessage data = 2;
}
}
message ClosePublishersRequest { message ClosePublishersRequest {
Topic topic = 1; Topic topic = 1;
int64 unix_time_ns = 2; int64 unix_time_ns = 2;

1178
weed/pb/mq_pb/mq.pb.go
File diff suppressed because it is too large
View File

103
weed/pb/mq_pb/mq_grpc.pb.go

@ -31,6 +31,8 @@ const (
SeaweedMessaging_SubscriberToSubCoordinator_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscriberToSubCoordinator" SeaweedMessaging_SubscriberToSubCoordinator_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscriberToSubCoordinator"
SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage" SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage"
SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage" SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage"
SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe"
SeaweedMessaging_FollowInMemoryMessages_FullMethodName = "/messaging_pb.SeaweedMessaging/FollowInMemoryMessages"
) )
// SeaweedMessagingClient is the client API for SeaweedMessaging service. // SeaweedMessagingClient is the client API for SeaweedMessaging service.
@ -55,6 +57,9 @@ type SeaweedMessagingClient interface {
// data plane for each topic partition // data plane for each topic partition
PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error) PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error)
SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error)
// The lead broker asks a follower broker to follow itself
PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error)
FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error)
} }
type seaweedMessagingClient struct { type seaweedMessagingClient struct {
@ -262,6 +267,47 @@ func (x *seaweedMessagingSubscribeMessageClient) Recv() (*SubscribeMessageRespon
return m, nil return m, nil
} }
func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error) {
out := new(PublishFollowMeResponse)
err := c.cc.Invoke(ctx, SeaweedMessaging_PublishFollowMe_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedMessagingClient) FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[4], SeaweedMessaging_FollowInMemoryMessages_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &seaweedMessagingFollowInMemoryMessagesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedMessaging_FollowInMemoryMessagesClient interface {
Recv() (*FollowInMemoryMessagesResponse, error)
grpc.ClientStream
}
type seaweedMessagingFollowInMemoryMessagesClient struct {
grpc.ClientStream
}
func (x *seaweedMessagingFollowInMemoryMessagesClient) Recv() (*FollowInMemoryMessagesResponse, error) {
m := new(FollowInMemoryMessagesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// SeaweedMessagingServer is the server API for SeaweedMessaging service. // SeaweedMessagingServer is the server API for SeaweedMessaging service.
// All implementations must embed UnimplementedSeaweedMessagingServer // All implementations must embed UnimplementedSeaweedMessagingServer
// for forward compatibility // for forward compatibility
@ -284,6 +330,9 @@ type SeaweedMessagingServer interface {
// data plane for each topic partition // data plane for each topic partition
PublishMessage(SeaweedMessaging_PublishMessageServer) error PublishMessage(SeaweedMessaging_PublishMessageServer) error
SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error
// The lead broker asks a follower broker to follow itself
PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error)
FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error
mustEmbedUnimplementedSeaweedMessagingServer() mustEmbedUnimplementedSeaweedMessagingServer()
} }
@ -327,6 +376,12 @@ func (UnimplementedSeaweedMessagingServer) PublishMessage(SeaweedMessaging_Publi
func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error { func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeMessage not implemented") return status.Errorf(codes.Unimplemented, "method SubscribeMessage not implemented")
} }
func (UnimplementedSeaweedMessagingServer) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented")
}
func (UnimplementedSeaweedMessagingServer) FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error {
return status.Errorf(codes.Unimplemented, "method FollowInMemoryMessages not implemented")
}
func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {}
// UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service. // UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service.
@ -583,6 +638,45 @@ func (x *seaweedMessagingSubscribeMessageServer) Send(m *SubscribeMessageRespons
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PublishFollowMeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: SeaweedMessaging_PublishFollowMe_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, req.(*PublishFollowMeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedMessaging_FollowInMemoryMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(FollowInMemoryMessagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedMessagingServer).FollowInMemoryMessages(m, &seaweedMessagingFollowInMemoryMessagesServer{stream})
}
type SeaweedMessaging_FollowInMemoryMessagesServer interface {
Send(*FollowInMemoryMessagesResponse) error
grpc.ServerStream
}
type seaweedMessagingFollowInMemoryMessagesServer struct {
grpc.ServerStream
}
func (x *seaweedMessagingFollowInMemoryMessagesServer) Send(m *FollowInMemoryMessagesResponse) error {
return x.ServerStream.SendMsg(m)
}
// SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -622,6 +716,10 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
MethodName: "CloseSubscribers", MethodName: "CloseSubscribers",
Handler: _SeaweedMessaging_CloseSubscribers_Handler, Handler: _SeaweedMessaging_CloseSubscribers_Handler,
}, },
{
MethodName: "PublishFollowMe",
Handler: _SeaweedMessaging_PublishFollowMe_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
@ -647,6 +745,11 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
Handler: _SeaweedMessaging_SubscribeMessage_Handler, Handler: _SeaweedMessaging_SubscribeMessage_Handler,
ServerStreams: true, ServerStreams: true,
}, },
{
StreamName: "FollowInMemoryMessages",
Handler: _SeaweedMessaging_FollowInMemoryMessages_Handler,
ServerStreams: true,
},
}, },
Metadata: "mq.proto", Metadata: "mq.proto",
} }

1
weed/util/log_buffer/log_buffer.go

@ -27,6 +27,7 @@ type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time,
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct { type LogBuffer struct {
LastFlushTsNs int64
name string name string
prevBuffers *SealedBuffers prevBuffers *SealedBuffers
buf []byte buf []byte

7
weed/util/log_buffer/log_read.go

@ -57,7 +57,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if bytesBuf != nil { if bytesBuf != nil {
readSize = bytesBuf.Len() readSize = bytesBuf.Len()
} }
glog.V(0).Infof("%s ReadFromBuffer at %v batch:%d, read bytes:%v batch:%d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
glog.V(0).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
if bytesBuf == nil { if bytesBuf == nil {
if batchIndex >= 0 { if batchIndex >= 0 {
lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
@ -72,6 +72,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
isDone = true isDone = true
return return
} }
if logBuffer.IsStopping() {
isDone = true
return
}
} }
buf := bytesBuf.Bytes() buf := bytesBuf.Bytes()
@ -107,6 +111,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
return return
} }
if isDone { if isDone {
glog.V(0).Infof("LoopProcessLogData2: %s process log entry %d", readerName, batchSize+1)
return return
} }

Loading…
Cancel
Save