|
@ -2,12 +2,12 @@ package broker |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"io" |
|
|
"io" |
|
|
"sync" |
|
|
|
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/golang/protobuf/proto" |
|
|
"github.com/golang/protobuf/proto" |
|
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
) |
|
|
) |
|
@ -24,6 +24,12 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
subscriberId := in.Init.SubscriberId |
|
|
subscriberId := in.Init.SubscriberId |
|
|
|
|
|
println("+ subscriber:", subscriberId) |
|
|
|
|
|
defer println("- subscriber:", subscriberId) |
|
|
|
|
|
|
|
|
|
|
|
// TODO look it up
|
|
|
|
|
|
topicConfig := &messaging_pb.TopicConfiguration{ |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// get lock
|
|
|
// get lock
|
|
|
tp := TopicPartition{ |
|
|
tp := TopicPartition{ |
|
@ -31,9 +37,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs |
|
|
Topic: in.Init.Topic, |
|
|
Topic: in.Init.Topic, |
|
|
Partition: in.Init.Partition, |
|
|
Partition: in.Init.Partition, |
|
|
} |
|
|
} |
|
|
lock := broker.topicLocks.RequestSubscriberLock(tp) |
|
|
|
|
|
|
|
|
lock := broker.topicLocks.RequestLock(tp, topicConfig, false) |
|
|
defer broker.topicLocks.ReleaseLock(tp, false) |
|
|
defer broker.topicLocks.ReleaseLock(tp, false) |
|
|
cond := sync.NewCond(&lock.Mutex) |
|
|
|
|
|
|
|
|
|
|
|
lastReadTime := time.Now() |
|
|
lastReadTime := time.Now() |
|
|
switch in.Init.StartPosition { |
|
|
switch in.Init.StartPosition { |
|
@ -65,13 +70,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs |
|
|
size := util.BytesToUint32(buf[pos : pos+4]) |
|
|
size := util.BytesToUint32(buf[pos : pos+4]) |
|
|
entryData := buf[pos+4 : pos+4+int(size)] |
|
|
entryData := buf[pos+4 : pos+4+int(size)] |
|
|
|
|
|
|
|
|
|
|
|
logEntry := &filer_pb.LogEntry{} |
|
|
|
|
|
if err = proto.Unmarshal(entryData, logEntry); err != nil { |
|
|
|
|
|
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) |
|
|
|
|
|
pos += 4 + int(size) |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
m := &messaging_pb.Message{} |
|
|
m := &messaging_pb.Message{} |
|
|
if err = proto.Unmarshal(entryData, m); err != nil { |
|
|
|
|
|
|
|
|
if err = proto.Unmarshal(logEntry.Data, m); err != nil { |
|
|
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) |
|
|
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) |
|
|
pos += 4 + int(size) |
|
|
pos += 4 + int(size) |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value))
|
|
|
if err = eachMessageFn(m); err != nil { |
|
|
if err = eachMessageFn(m); err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
@ -81,7 +94,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
lock.Mutex.Lock() |
|
|
lock.Mutex.Lock() |
|
|
cond.Wait() |
|
|
|
|
|
|
|
|
lock.cond.Wait() |
|
|
lock.Mutex.Unlock() |
|
|
lock.Mutex.Unlock() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|