Browse Source

remove FollowInMemoryMessages

mq-subscribe
chrislu 9 months ago
parent
commit
516cba7053
  1. 88
      weed/mq/broker/broker_grpc_pub_follow.go
  2. 165
      weed/mq/broker/broker_grpc_sub.go
  3. 28
      weed/pb/mq.proto
  4. 923
      weed/pb/mq_pb/mq.pb.go
  5. 64
      weed/pb/mq_pb/mq_grpc.pb.go

88
weed/mq/broker/broker_grpc_pub_follow.go

@ -2,97 +2,11 @@ package broker
import ( import (
"context" "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"math/rand"
"sync"
"time"
) )
func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) { func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
glog.V(0).Infof("PublishFollowMe %v", request) glog.V(0).Infof("PublishFollowMe %v", request)
var wg sync.WaitGroup
wg.Add(1)
var ret error
go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
followerId := rand.Int31()
subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
ConsumerGroup: string(b.option.BrokerAddress()),
ConsumerId: fmt.Sprintf("followMe@%s-%d", b.option.BrokerAddress(), followerId),
FollowerId: followerId,
Topic: request.Topic,
PartitionOffset: &mq_pb.PartitionOffset{
Partition: request.Partition,
StartTsNs: 0,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
},
},
})
if err != nil {
glog.Errorf("FollowInMemoryMessages error: %v", err)
ret = err
return err
}
// receive first hello message
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
}
if resp == nil {
glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
return io.ErrUnexpectedEOF
}
wg.Done()
b.doFollowInMemoryMessage(context.Background(), followerId, subscribeClient)
return nil
})
wg.Wait()
return &mq_pb.PublishFollowMeResponse{}, ret
}
func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, followerId int32, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
for {
resp, err := client.Recv()
if err != nil {
if err != io.EOF {
glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
}
return
}
if resp == nil {
glog.V(0).Infof("doFollowInMemoryMessage nil response")
return
}
if resp.Message != nil {
// process ctrl message or data message
switch m := resp.Message.(type) {
case *mq_pb.FollowInMemoryMessagesResponse_Data:
// process data message
print("d")
case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
// process ctrl message
if m.Ctrl.FlushedSequence > 0 {
flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
}
if m.Ctrl.FollowerChangedToId != 0 {
// follower changed
glog.V(0).Infof("doFollowInMemoryMessage follower changed from %d to %d", followerId, m.Ctrl.FollowerChangedToId)
return
}
default:
glog.V(0).Infof("doFollowInMemoryMessage unknown message type: %v", m)
}
}
}
return &mq_pb.PublishFollowMeResponse{}, nil
} }

165
weed/mq/broker/broker_grpc_sub.go

@ -8,7 +8,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync/atomic"
"time" "time"
) )
@ -129,167 +128,3 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
} }
return return
} }
func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
ctx := stream.Context()
clientName := req.GetInit().ConsumerId
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
waitIntervalCount := 0
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
if localTopicPartition != nil {
break
}
waitIntervalCount++
if waitIntervalCount > 32 {
waitIntervalCount = 32
}
time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return nil
}
glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
return nil
default:
// Continue processing the request
}
}
// set the current follower id
followerId := req.GetInit().FollowerId
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
var counter int64
defer func() {
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
}()
// send first hello message
// to indicate the follower is connected
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
},
})
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
var prevFlushTsNs int64
_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
// wait for the log buffer to be ready
localTopicPartition.ListenersLock.Lock()
atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
localTopicPartition.ListenersCond.Wait()
atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
localTopicPartition.ListenersLock.Unlock()
if localTopicPartition.LogBuffer.IsStopping() {
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FollowerChangedToId: newFollowerId,
},
},
})
return false
}
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return false
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
}
// send the last flushed sequence
flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
if flushTsNs != prevFlushTsNs {
prevFlushTsNs = flushTsNs
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FlushedSequence: flushTsNs,
},
},
})
}
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
// check the follower id
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
if newFollowerId != followerId {
glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FollowerChangedToId: newFollowerId,
},
},
})
return true, nil
}
// send the last flushed sequence
flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
if flushTsNs != prevFlushTsNs {
prevFlushTsNs = flushTsNs
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
FlushedSequence: flushTsNs,
},
},
})
}
// send the log entry
if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
Data: &mq_pb.DataMessage{
Key: logEntry.Key,
Value: logEntry.Data,
TsNs: logEntry.TsNs,
},
}}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return false, err
}
counter++
return false, nil
})
return err
}

28
weed/pb/mq.proto

@ -48,8 +48,6 @@ service SeaweedMessaging {
// The lead broker asks a follower broker to follow itself // The lead broker asks a follower broker to follow itself
rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) { rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) {
} }
rpc FollowInMemoryMessages (FollowInMemoryMessagesRequest) returns (stream FollowInMemoryMessagesResponse) {
}
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
@ -246,32 +244,6 @@ message SubscribeMessageResponse {
DataMessage data = 2; DataMessage data = 2;
} }
} }
message FollowInMemoryMessagesRequest {
message InitMessage {
string consumer_group = 1;
string consumer_id = 2;
int32 follower_id = 3;
Topic topic = 4;
PartitionOffset partition_offset = 5;
}
message AckMessage {
int64 sequence = 1;
}
oneof message {
InitMessage init = 1;
AckMessage ack = 2;
}
}
message FollowInMemoryMessagesResponse {
message CtrlMessage {
int64 flushed_sequence = 1;
int32 follower_changed_to_id = 2;
}
oneof message {
CtrlMessage ctrl = 1;
DataMessage data = 2;
}
}
message ClosePublishersRequest { message ClosePublishersRequest {
Topic topic = 1; Topic topic = 1;
int64 unix_time_ns = 2; int64 unix_time_ns = 2;

923
weed/pb/mq_pb/mq.pb.go
File diff suppressed because it is too large
View File

64
weed/pb/mq_pb/mq_grpc.pb.go

@ -32,7 +32,6 @@ const (
SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage" SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage"
SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage" SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage"
SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe" SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe"
SeaweedMessaging_FollowInMemoryMessages_FullMethodName = "/messaging_pb.SeaweedMessaging/FollowInMemoryMessages"
) )
// SeaweedMessagingClient is the client API for SeaweedMessaging service. // SeaweedMessagingClient is the client API for SeaweedMessaging service.
@ -59,7 +58,6 @@ type SeaweedMessagingClient interface {
SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error) SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error)
// The lead broker asks a follower broker to follow itself // The lead broker asks a follower broker to follow itself
PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error) PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error)
FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error)
} }
type seaweedMessagingClient struct { type seaweedMessagingClient struct {
@ -276,38 +274,6 @@ func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, in *Publis
return out, nil return out, nil
} }
func (c *seaweedMessagingClient) FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[4], SeaweedMessaging_FollowInMemoryMessages_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &seaweedMessagingFollowInMemoryMessagesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedMessaging_FollowInMemoryMessagesClient interface {
Recv() (*FollowInMemoryMessagesResponse, error)
grpc.ClientStream
}
type seaweedMessagingFollowInMemoryMessagesClient struct {
grpc.ClientStream
}
func (x *seaweedMessagingFollowInMemoryMessagesClient) Recv() (*FollowInMemoryMessagesResponse, error) {
m := new(FollowInMemoryMessagesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// SeaweedMessagingServer is the server API for SeaweedMessaging service. // SeaweedMessagingServer is the server API for SeaweedMessaging service.
// All implementations must embed UnimplementedSeaweedMessagingServer // All implementations must embed UnimplementedSeaweedMessagingServer
// for forward compatibility // for forward compatibility
@ -332,7 +298,6 @@ type SeaweedMessagingServer interface {
SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error
// The lead broker asks a follower broker to follow itself // The lead broker asks a follower broker to follow itself
PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error)
FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error
mustEmbedUnimplementedSeaweedMessagingServer() mustEmbedUnimplementedSeaweedMessagingServer()
} }
@ -379,9 +344,6 @@ func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageReq
func (UnimplementedSeaweedMessagingServer) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) { func (UnimplementedSeaweedMessagingServer) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented") return nil, status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented")
} }
func (UnimplementedSeaweedMessagingServer) FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error {
return status.Errorf(codes.Unimplemented, "method FollowInMemoryMessages not implemented")
}
func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {}
// UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service. // UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service.
@ -656,27 +618,6 @@ func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _SeaweedMessaging_FollowInMemoryMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(FollowInMemoryMessagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedMessagingServer).FollowInMemoryMessages(m, &seaweedMessagingFollowInMemoryMessagesServer{stream})
}
type SeaweedMessaging_FollowInMemoryMessagesServer interface {
Send(*FollowInMemoryMessagesResponse) error
grpc.ServerStream
}
type seaweedMessagingFollowInMemoryMessagesServer struct {
grpc.ServerStream
}
func (x *seaweedMessagingFollowInMemoryMessagesServer) Send(m *FollowInMemoryMessagesResponse) error {
return x.ServerStream.SendMsg(m)
}
// SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -745,11 +686,6 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
Handler: _SeaweedMessaging_SubscribeMessage_Handler, Handler: _SeaweedMessaging_SubscribeMessage_Handler,
ServerStreams: true, ServerStreams: true,
}, },
{
StreamName: "FollowInMemoryMessages",
Handler: _SeaweedMessaging_FollowInMemoryMessages_Handler,
ServerStreams: true,
},
}, },
Metadata: "mq.proto", Metadata: "mq.proto",
} }
Loading…
Cancel
Save