diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 436c4158f..5deb9d7ca 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -31,7 +31,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } else if alreadyKnown { return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } - defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) + defer func() { + glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) + fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) + fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting + }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -112,6 +116,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq defer func() { glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) + fs.listenersCond.Broadcast() // nudges the subscribers that are waiting }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)