diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index fb8878b2d..31d2a8082 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -2,6 +2,7 @@ package broker import ( "context" + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -160,7 +161,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs select { case <-ctx.Done(): err := ctx.Err() - if err == context.Canceled { + if errors.Is(err, context.Canceled) { // Client disconnected return false } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index f4c6bfe9d..dfe594b46 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -1,6 +1,7 @@ package weed_server import ( + "errors" "fmt" "strings" "sync/atomic" @@ -24,7 +25,8 @@ const ( func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { - peerAddress := findClientAddress(stream.Context(), 0) + ctx := stream.Context() + peerAddress := findClientAddress(ctx, 0) isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) if isReplacing { @@ -81,17 +83,24 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + return false + default: + } + fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { - if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) - if readInMemoryLogErr != log_buffer.ResumeError { + if !errors.Is(readInMemoryLogErr, log_buffer.ResumeError) { break } } @@ -112,7 +121,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error { - peerAddress := findClientAddress(stream.Context(), 0) + ctx := stream.Context() + peerAddress := findClientAddress(ctx, 0) // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata() req.ClientId = -req.ClientId @@ -165,6 +175,14 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { + + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + return false + default: + } + fs.listenersLock.Lock() atomic.AddInt64(&fs.listenersWaits, 1) fs.listenersCond.Wait()