diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 9e951ce65..d8e8faa31 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -25,39 +26,47 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
 		return err
 	}
 
+	var processedTsNs int64
 	var messageCount int64
 	subscriberId := in.Init.SubscriberId
-	fmt.Printf("+ subscriber %s\n", subscriberId)
-	defer func() {
-		fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
-	}()
 
 	// TODO look it up
 	topicConfig := &messaging_pb.TopicConfiguration{
 		// IsTransient: true,
 	}
 
-	if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
-		return err
-	}
-
 	// get lock
 	tp := TopicPartition{
 		Namespace: in.Init.Namespace,
 		Topic:     in.Init.Topic,
 		Partition: in.Init.Partition,
 	}
+	fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
+	defer func() {
+		fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
+	}()
+
 	lock := broker.topicManager.RequestLock(tp, topicConfig, false)
 	defer broker.topicManager.ReleaseLock(tp, false)
 
 	isConnected := true
+	var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948
 	go func() {
+		lastActiveTime := time.Now().UnixNano()
+		sleepTime := 1737 * time.Millisecond
 		for isConnected {
-			time.Sleep(1737 * time.Millisecond)
+			time.Sleep(sleepTime)
+			if lastActiveTime != processedTsNs {
+				lastActiveTime = processedTsNs
+				continue
+			}
+			streamLock.Lock()
+			// println("checking connection health to", subscriberId, tp.String())
 			if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
 				isConnected = false
 				lock.cond.Signal()
 			}
+			streamLock.Unlock()
 		}
 	}()
 
@@ -69,14 +78,15 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
 	case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
 		lastReadTime = time.Unix(0, 0)
 	}
-	var processedTsNs int64
 
 	// how to process each message
 	// an error returned will end the subscription
 	eachMessageFn := func(m *messaging_pb.Message) error {
+		streamLock.Lock()
 		err := stream.Send(&messaging_pb.BrokerMessage{
 			Data: m,
 		})
+		streamLock.Unlock()
 		if err != nil {
 			glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
 		}
@@ -105,7 +115,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
 
 	if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
 		if err != io.EOF {
-			println("stopping from persisted logs", err.Error())
+			// println("stopping from persisted logs", err.Error())
 			return err
 		}
 	}
@@ -114,6 +124,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
 		lastReadTime = time.Unix(0, processedTsNs)
 	}
 
+	// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
+
 	err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
 		lock.Mutex.Lock()
 		lock.cond.Wait()
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
index 2e66923e2..f96bba2ec 100644
--- a/weed/messaging/msgclient/subscriber.go
+++ b/weed/messaging/msgclient/subscriber.go
@@ -15,7 +15,7 @@ type Subscriber struct {
 	subscriberId      string
 }
 
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
 	// read topic configuration
 	topicConfiguration := &messaging_pb.TopicConfiguration{
 		PartitionCount: 4,
@@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
 	subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
 
 	for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+		if partitionId>=0 && i != partitionId {
+			continue
+		}
 		tp := broker.TopicPartition{
 			Namespace: namespace,
 			Topic: topic,
@@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti
 		return
 	}
 
-	// process init response
-	_, err = stream.Recv()
-	if err != nil {
-		return
-	}
 	return stream, nil
 }
 
-func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
+func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
 	for {
-		resp, listenErr := s.subscriberClients[partition].Recv()
+		resp, listenErr := subscriberClient.Recv()
 		if listenErr == io.EOF {
 			return nil
 		}
@@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M
 // Subscribe starts goroutines to process the messages
 func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
 	for i := 0; i < len(s.subscriberClients); i++ {
-		go s.doSubscribe(i, processFn)
+		if s.subscriberClients[i] != nil {
+			go doSubscribe(s.subscriberClients[i], processFn)
+		}
 	}
 }