diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 6208b1435..89c568b0d 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -1,13 +1,11 @@ package broker import ( - "fmt" "io" "time" "github.com/golang/protobuf/proto" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -27,7 +25,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis topicConfig := &messaging_pb.TopicConfiguration{ } - + // send init response initResponse := &messaging_pb.PublishResponse{ Config: nil, @@ -47,20 +45,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis Topic: in.Init.Topic, Partition: in.Init.Partition, } - logBuffer := broker.topicLocks.RequestPublisherLock(tp, func(startTime, stopTime time.Time, buf []byte) { - - targetFile := fmt.Sprintf( - "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", - filer2.TopicsDir, tp.Namespace, tp.Topic, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), - tp.Partition, - ) - - if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil { - glog.V(0).Infof("log write failed %s: %v", targetFile, err) - } - - }) + tl := broker.topicLocks.RequestLock(tp, topicConfig, true) defer broker.topicLocks.ReleaseLock(tp, true) updatesChan := make(chan int32) @@ -78,7 +63,6 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis } }() - // process each message for { in, err := stream.Recv() @@ -100,7 +84,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis Headers: in.Data.Headers, } - println("received message:", string(in.Data.Value)) + // fmt.Printf("received: %d : %s\n", len(m.Value), string(m.Value)) data, err := proto.Marshal(m) if err != nil { @@ -108,7 +92,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis continue } - logBuffer.AddToBuffer(in.Data.Key, data) + tl.logBuffer.AddToBuffer(in.Data.Key, data) } } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 5a3c4f785..acf0330c6 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,12 +2,12 @@ package broker import ( "io" - "sync" "time" "github.com/golang/protobuf/proto" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -24,6 +24,12 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } subscriberId := in.Init.SubscriberId + println("+ subscriber:", subscriberId) + defer println("- subscriber:", subscriberId) + + // TODO look it up + topicConfig := &messaging_pb.TopicConfiguration{ + } // get lock tp := TopicPartition{ @@ -31,9 +37,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs Topic: in.Init.Topic, Partition: in.Init.Partition, } - lock := broker.topicLocks.RequestSubscriberLock(tp) + lock := broker.topicLocks.RequestLock(tp, topicConfig, false) defer broker.topicLocks.ReleaseLock(tp, false) - cond := sync.NewCond(&lock.Mutex) lastReadTime := time.Now() switch in.Init.StartPosition { @@ -65,13 +70,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs size := util.BytesToUint32(buf[pos : pos+4]) entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + pos += 4 + int(size) + continue + } + m := &messaging_pb.Message{} - if err = proto.Unmarshal(entryData, m); err != nil { + if err = proto.Unmarshal(logEntry.Data, m); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) pos += 4 + int(size) continue } + // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value)) if err = eachMessageFn(m); err != nil { return err } @@ -81,7 +94,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } lock.Mutex.Lock() - cond.Wait() + lock.cond.Wait() lock.Mutex.Unlock() } diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 158a84e6c..29c227274 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -31,9 +31,10 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio messageBroker = &MessageBroker{ option: option, grpcDialOption: grpcDialOption, - topicLocks: NewTopicLocks(), } + messageBroker.topicLocks = NewTopicLocks(messageBroker) + messageBroker.checkPeers() // go messageBroker.loopForEver() diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go index 9e4ea6824..652ff0545 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_lock.go @@ -1,9 +1,13 @@ package broker import ( + "fmt" "sync" "time" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" ) @@ -14,6 +18,7 @@ type TopicPartition struct { } type TopicLock struct { sync.Mutex + cond *sync.Cond subscriberCount int publisherCount int logBuffer *log_buffer.LogBuffer @@ -21,44 +26,56 @@ type TopicLock struct { type TopicLocks struct { sync.Mutex - locks map[TopicPartition]*TopicLock + locks map[TopicPartition]*TopicLock + broker *MessageBroker } -func NewTopicLocks() *TopicLocks { +func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks { return &TopicLocks{ - locks: make(map[TopicPartition]*TopicLock), + locks: make(map[TopicPartition]*TopicLock), + broker: messageBroker, } } -func (tl *TopicLocks) RequestSubscriberLock(partition TopicPartition) *TopicLock { - tl.Lock() - defer tl.Unlock() +func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { - lock, found := tl.locks[partition] - if !found { - lock = &TopicLock{} - tl.locks[partition] = lock + flushFn := func(startTime, stopTime time.Time, buf []byte) { + + targetFile := fmt.Sprintf( + "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", + filer2.TopicsDir, tp.Namespace, tp.Topic, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + tp.Partition, + ) + + if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil { + glog.V(0).Infof("log write failed %s: %v", targetFile, err) + } } - lock.subscriberCount++ + logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { + tl.cond.Broadcast() + }) - return lock + return logBuffer } -func (tl *TopicLocks) RequestPublisherLock(partition TopicPartition, flushFn func(startTime, stopTime time.Time, buf []byte)) *log_buffer.LogBuffer { +func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock { tl.Lock() defer tl.Unlock() lock, found := tl.locks[partition] if !found { lock = &TopicLock{} + lock.cond = sync.NewCond(&lock.Mutex) tl.locks[partition] = lock + lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig) } - lock.publisherCount++ - cond := sync.NewCond(&lock.Mutex) - lock.logBuffer = log_buffer.NewLogBuffer(time.Minute, flushFn, func() { - cond.Broadcast() - }) - return lock.logBuffer + if isPublisher { + lock.publisherCount++ + } else { + lock.subscriberCount++ + } + return lock } func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {