chrislu
2 years ago
25 changed files with 151 additions and 3373 deletions
-
6other/java/client/src/main/proto/filer.proto
-
12weed/mq/broker.go
-
130weed/mq/broker/broker_append.go
-
37weed/mq/broker/broker_grpc_server.go
-
50weed/mq/broker/broker_grpc_server_discovery.go
-
112weed/mq/broker/broker_grpc_server_publish.go
-
178weed/mq/broker/broker_grpc_server_subscribe.go
-
64weed/mq/broker/broker_server.go
-
124weed/mq/broker/topic_manager.go
-
5weed/mq/msgclient/chan_config.go
-
76weed/mq/msgclient/chan_pub.go
-
85weed/mq/msgclient/chan_sub.go
-
55weed/mq/msgclient/client.go
-
63weed/mq/msgclient/config.go
-
118weed/mq/msgclient/publisher.go
-
120weed/mq/msgclient/subscriber.go
-
23weed/mq/topic.go
-
6weed/pb/filer.proto
-
111weed/pb/filer_pb/filer.pb.go
-
104weed/pb/filer_pb/filer_grpc.pb.go
-
107weed/pb/mq.proto
-
1615weed/pb/mq_pb/mq.pb.go
-
247weed/pb/mq_pb/mq_grpc.pb.go
-
72weed/server/filer_grpc_server_admin.go
-
4weed/server/filer_server.go
@ -0,0 +1,12 @@ |
|||
package mq |
|||
|
|||
const LAST_MINUTES = 10 |
|||
|
|||
type TopicStat struct { |
|||
MessageCounts [LAST_MINUTES]int64 |
|||
ByteCounts [LAST_MINUTES]int64 |
|||
} |
|||
|
|||
func NewTopicStat() *TopicStat { |
|||
return &TopicStat{} |
|||
} |
@ -1,130 +0,0 @@ |
|||
package broker |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"io" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/operation" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
func (broker *MessageQueueBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error { |
|||
|
|||
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) |
|||
if err2 != nil { |
|||
return err2 |
|||
} |
|||
|
|||
dir, name := util.FullPath(targetFile).DirAndName() |
|||
|
|||
// append the chunk
|
|||
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
request := &filer_pb.AppendToEntryRequest{ |
|||
Directory: dir, |
|||
EntryName: name, |
|||
Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)}, |
|||
} |
|||
|
|||
_, err := client.AppendToEntry(context.Background(), request) |
|||
if err != nil { |
|||
glog.V(0).Infof("append to file %v: %v", request, err) |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
}); err != nil { |
|||
return fmt.Errorf("append to file %v: %v", targetFile, err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (broker *MessageQueueBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { |
|||
|
|||
var assignResult = &operation.AssignResult{} |
|||
|
|||
// assign a volume location
|
|||
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
assignErr := util.Retry("assignVolume", func() error { |
|||
request := &filer_pb.AssignVolumeRequest{ |
|||
Count: 1, |
|||
Replication: topicConfig.Replication, |
|||
Collection: topicConfig.Collection, |
|||
} |
|||
|
|||
resp, err := client.AssignVolume(context.Background(), request) |
|||
if err != nil { |
|||
glog.V(0).Infof("assign volume failure %v: %v", request, err) |
|||
return err |
|||
} |
|||
if resp.Error != "" { |
|||
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) |
|||
} |
|||
|
|||
assignResult.Auth = security.EncodedJwt(resp.Auth) |
|||
assignResult.Fid = resp.FileId |
|||
assignResult.Url = resp.Location.Url |
|||
assignResult.PublicUrl = resp.Location.PublicUrl |
|||
assignResult.GrpcPort = int(resp.Location.GrpcPort) |
|||
assignResult.Count = uint64(resp.Count) |
|||
|
|||
return nil |
|||
}) |
|||
if assignErr != nil { |
|||
return assignErr |
|||
} |
|||
|
|||
return nil |
|||
}); err != nil { |
|||
return nil, nil, err |
|||
} |
|||
|
|||
// upload data
|
|||
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) |
|||
uploadOption := &operation.UploadOption{ |
|||
UploadUrl: targetUrl, |
|||
Filename: "", |
|||
Cipher: broker.option.Cipher, |
|||
IsInputCompressed: false, |
|||
MimeType: "", |
|||
PairMap: nil, |
|||
Jwt: assignResult.Auth, |
|||
} |
|||
uploadResult, err := operation.UploadData(data, uploadOption) |
|||
if err != nil { |
|||
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) |
|||
} |
|||
// println("uploaded to", targetUrl)
|
|||
return assignResult, uploadResult, nil |
|||
} |
|||
|
|||
var _ = filer_pb.FilerClient(&MessageQueueBroker{}) |
|||
|
|||
func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { |
|||
|
|||
for _, filer := range broker.option.Filers { |
|||
if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { |
|||
if err == io.EOF { |
|||
return |
|||
} |
|||
glog.V(0).Infof("fail to connect to %s: %v", filer, err) |
|||
} else { |
|||
break |
|||
} |
|||
} |
|||
|
|||
return |
|||
|
|||
} |
|||
|
|||
func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string { |
|||
return location.Url |
|||
} |
@ -1,37 +0,0 @@ |
|||
package broker |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func (broker *MessageQueueBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func (broker *MessageQueueBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { |
|||
resp := &mq_pb.DeleteTopicResponse{} |
|||
dir, entry := genTopicDirEntry(request.Namespace, request.Topic) |
|||
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { |
|||
return nil, err |
|||
} else if exists { |
|||
err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil) |
|||
} |
|||
return resp, nil |
|||
} |
|||
|
|||
func (broker *MessageQueueBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { |
|||
panic("implement me") |
|||
} |
|||
|
|||
func genTopicDir(namespace, topic string) string { |
|||
return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic) |
|||
} |
|||
|
|||
func genTopicDirEntry(namespace, topic string) (dir, entry string) { |
|||
return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic |
|||
} |
@ -1,112 +0,0 @@ |
|||
package broker |
|||
|
|||
import ( |
|||
"crypto/md5" |
|||
"fmt" |
|||
"io" |
|||
|
|||
"github.com/golang/protobuf/proto" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { |
|||
|
|||
// process initial request
|
|||
in, err := stream.Recv() |
|||
if err == io.EOF { |
|||
return nil |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// TODO look it up
|
|||
topicConfig := &mq_pb.TopicConfiguration{ |
|||
// IsTransient: true,
|
|||
} |
|||
|
|||
// send init response
|
|||
initResponse := &mq_pb.PublishResponse{ |
|||
Config: nil, |
|||
Redirect: nil, |
|||
} |
|||
err = stream.Send(initResponse) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
if initResponse.Redirect != nil { |
|||
return nil |
|||
} |
|||
|
|||
// get lock
|
|||
tp := TopicPartition{ |
|||
Namespace: in.Init.Namespace, |
|||
Topic: in.Init.Topic, |
|||
Partition: in.Init.Partition, |
|||
} |
|||
|
|||
tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic) |
|||
md5File := fmt.Sprintf("p%02d.md5", tp.Partition) |
|||
// println("chan data stored under", tpDir, "as", md5File)
|
|||
|
|||
if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists { |
|||
return fmt.Errorf("channel is already closed") |
|||
} |
|||
|
|||
tl := broker.topicManager.RequestLock(tp, topicConfig, true) |
|||
defer broker.topicManager.ReleaseLock(tp, true) |
|||
|
|||
md5hash := md5.New() |
|||
// process each message
|
|||
for { |
|||
// println("recv")
|
|||
in, err := stream.Recv() |
|||
// glog.V(0).Infof("recieved %v err: %v", in, err)
|
|||
if err == io.EOF { |
|||
return nil |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
if in.Data == nil { |
|||
continue |
|||
} |
|||
|
|||
// fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
|
|||
|
|||
data, err := proto.Marshal(in.Data) |
|||
if err != nil { |
|||
glog.Errorf("marshall error: %v\n", err) |
|||
continue |
|||
} |
|||
|
|||
tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) |
|||
|
|||
if in.Data.IsClose { |
|||
// println("server received closing")
|
|||
break |
|||
} |
|||
|
|||
md5hash.Write(in.Data.Value) |
|||
|
|||
} |
|||
|
|||
if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil { |
|||
glog.V(0).Infof("err writing %s: %v", md5File, err) |
|||
} |
|||
|
|||
// fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
|
|||
|
|||
// send the close ack
|
|||
// println("server send ack closing")
|
|||
if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil { |
|||
glog.V(0).Infof("err sending close response: %v", err) |
|||
} |
|||
return nil |
|||
|
|||
} |
@ -1,178 +0,0 @@ |
|||
package broker |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" |
|||
"io" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/golang/protobuf/proto" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func (broker *MessageQueueBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error { |
|||
|
|||
// process initial request
|
|||
in, err := stream.Recv() |
|||
if err == io.EOF { |
|||
return nil |
|||
} |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
var processedTsNs int64 |
|||
var messageCount int64 |
|||
subscriberId := in.Init.SubscriberId |
|||
|
|||
// TODO look it up
|
|||
topicConfig := &mq_pb.TopicConfiguration{ |
|||
// IsTransient: true,
|
|||
} |
|||
|
|||
// get lock
|
|||
tp := TopicPartition{ |
|||
Namespace: in.Init.Namespace, |
|||
Topic: in.Init.Topic, |
|||
Partition: in.Init.Partition, |
|||
} |
|||
fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String()) |
|||
defer func() { |
|||
fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs)) |
|||
}() |
|||
|
|||
lock := broker.topicManager.RequestLock(tp, topicConfig, false) |
|||
defer broker.topicManager.ReleaseLock(tp, false) |
|||
|
|||
isConnected := true |
|||
go func() { |
|||
for isConnected { |
|||
if _, err := stream.Recv(); err != nil { |
|||
// println("disconnecting connection to", subscriberId, tp.String())
|
|||
isConnected = false |
|||
lock.cond.Signal() |
|||
} |
|||
} |
|||
}() |
|||
|
|||
lastReadTime := time.Now() |
|||
switch in.Init.StartPosition { |
|||
case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP: |
|||
lastReadTime = time.Unix(0, in.Init.TimestampNs) |
|||
case mq_pb.SubscriberMessage_InitMessage_LATEST: |
|||
case mq_pb.SubscriberMessage_InitMessage_EARLIEST: |
|||
lastReadTime = time.Unix(0, 0) |
|||
} |
|||
|
|||
// how to process each message
|
|||
// an error returned will end the subscription
|
|||
eachMessageFn := func(m *mq_pb.Message) error { |
|||
err := stream.Send(&mq_pb.BrokerMessage{ |
|||
Data: m, |
|||
}) |
|||
if err != nil { |
|||
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) |
|||
} |
|||
return err |
|||
} |
|||
|
|||
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { |
|||
m := &mq_pb.Message{} |
|||
if err = proto.Unmarshal(logEntry.Data, m); err != nil { |
|||
glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err) |
|||
return err |
|||
} |
|||
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
|
|||
if err = eachMessageFn(m); err != nil { |
|||
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) |
|||
return err |
|||
} |
|||
if m.IsClose { |
|||
// println("processed EOF")
|
|||
return io.EOF |
|||
} |
|||
processedTsNs = logEntry.TsNs |
|||
messageCount++ |
|||
return nil |
|||
} |
|||
|
|||
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
|
|||
|
|||
for { |
|||
|
|||
if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { |
|||
if err != io.EOF { |
|||
// println("stopping from persisted logs", err.Error())
|
|||
return err |
|||
} |
|||
} |
|||
|
|||
if processedTsNs != 0 { |
|||
lastReadTime = time.Unix(0, processedTsNs) |
|||
} |
|||
|
|||
lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool { |
|||
lock.Mutex.Lock() |
|||
lock.cond.Wait() |
|||
lock.Mutex.Unlock() |
|||
return isConnected |
|||
}, eachLogEntryFn) |
|||
if err != nil { |
|||
if err == log_buffer.ResumeFromDiskError { |
|||
continue |
|||
} |
|||
glog.Errorf("processed to %v: %v", lastReadTime, err) |
|||
if err != log_buffer.ResumeError { |
|||
break |
|||
} |
|||
} |
|||
} |
|||
|
|||
return err |
|||
|
|||
} |
|||
|
|||
func (broker *MessageQueueBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { |
|||
startTime = startTime.UTC() |
|||
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) |
|||
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) |
|||
|
|||
sizeBuf := make([]byte, 4) |
|||
startTsNs := startTime.UnixNano() |
|||
|
|||
topicDir := genTopicDir(tp.Namespace, tp.Topic) |
|||
partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) |
|||
|
|||
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { |
|||
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) |
|||
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { |
|||
if dayEntry.Name == startDate { |
|||
hourMinute := util.FileNameBase(hourMinuteEntry.Name) |
|||
if strings.Compare(hourMinute, startHourMinute) < 0 { |
|||
return nil |
|||
} |
|||
} |
|||
if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) { |
|||
return nil |
|||
} |
|||
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
|
|||
chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) |
|||
defer chunkedFileReader.Close() |
|||
if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil { |
|||
chunkedFileReader.Close() |
|||
if err == io.EOF { |
|||
return err |
|||
} |
|||
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) |
|||
} |
|||
return nil |
|||
}, "", false, 24*60) |
|||
}, startDate, true, 366) |
|||
|
|||
} |
@ -1,124 +0,0 @@ |
|||
package broker |
|||
|
|||
import ( |
|||
"fmt" |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" |
|||
) |
|||
|
|||
type TopicPartition struct { |
|||
Namespace string |
|||
Topic string |
|||
Partition int32 |
|||
} |
|||
|
|||
const ( |
|||
TopicPartitionFmt = "%s/%s_%02d" |
|||
) |
|||
|
|||
func (tp *TopicPartition) String() string { |
|||
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) |
|||
} |
|||
|
|||
type TopicControl struct { |
|||
sync.Mutex |
|||
cond *sync.Cond |
|||
subscriberCount int |
|||
publisherCount int |
|||
logBuffer *log_buffer.LogBuffer |
|||
} |
|||
|
|||
type TopicManager struct { |
|||
sync.Mutex |
|||
topicControls map[TopicPartition]*TopicControl |
|||
broker *MessageQueueBroker |
|||
} |
|||
|
|||
func NewTopicManager(messageBroker *MessageQueueBroker) *TopicManager { |
|||
return &TopicManager{ |
|||
topicControls: make(map[TopicPartition]*TopicControl), |
|||
broker: messageBroker, |
|||
} |
|||
} |
|||
|
|||
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer { |
|||
|
|||
flushFn := func(startTime, stopTime time.Time, buf []byte) { |
|||
|
|||
if topicConfig.IsTransient { |
|||
// return
|
|||
} |
|||
|
|||
// fmt.Printf("flushing with topic config %+v\n", topicConfig)
|
|||
|
|||
startTime, stopTime = startTime.UTC(), stopTime.UTC() |
|||
targetFile := fmt.Sprintf( |
|||
"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", |
|||
filer.TopicsDir, tp.Namespace, tp.Topic, |
|||
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), |
|||
tp.Partition, |
|||
) |
|||
|
|||
if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil { |
|||
glog.V(0).Infof("log write failed %s: %v", targetFile, err) |
|||
} |
|||
} |
|||
logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() { |
|||
tl.cond.Broadcast() |
|||
}) |
|||
|
|||
return logBuffer |
|||
} |
|||
|
|||
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl { |
|||
tm.Lock() |
|||
defer tm.Unlock() |
|||
|
|||
tc, found := tm.topicControls[partition] |
|||
if !found { |
|||
tc = &TopicControl{} |
|||
tc.cond = sync.NewCond(&tc.Mutex) |
|||
tm.topicControls[partition] = tc |
|||
tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig) |
|||
} |
|||
if isPublisher { |
|||
tc.publisherCount++ |
|||
} else { |
|||
tc.subscriberCount++ |
|||
} |
|||
return tc |
|||
} |
|||
|
|||
func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) { |
|||
tm.Lock() |
|||
defer tm.Unlock() |
|||
|
|||
lock, found := tm.topicControls[partition] |
|||
if !found { |
|||
return |
|||
} |
|||
if isPublisher { |
|||
lock.publisherCount-- |
|||
} else { |
|||
lock.subscriberCount-- |
|||
} |
|||
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 { |
|||
delete(tm.topicControls, partition) |
|||
lock.logBuffer.Shutdown() |
|||
} |
|||
} |
|||
|
|||
func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) { |
|||
tm.Lock() |
|||
defer tm.Unlock() |
|||
|
|||
for k := range tm.topicControls { |
|||
tps = append(tps, k) |
|||
} |
|||
return |
|||
} |
@ -1,5 +0,0 @@ |
|||
package msgclient |
|||
|
|||
func (mc *MessagingClient) DeleteChannel(chanName string) error { |
|||
return mc.DeleteTopic("chan", chanName) |
|||
} |
@ -1,76 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"crypto/md5" |
|||
"hash" |
|||
"io" |
|||
"log" |
|||
|
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
type PubChannel struct { |
|||
client mq_pb.SeaweedMessaging_PublishClient |
|||
grpcConnection *grpc.ClientConn |
|||
md5hash hash.Hash |
|||
} |
|||
|
|||
func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { |
|||
tp := broker.TopicPartition{ |
|||
Namespace: "chan", |
|||
Topic: chanName, |
|||
Partition: 0, |
|||
} |
|||
grpcConnection, err := mc.findBroker(tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
pc, err := setupPublisherClient(grpcConnection, tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return &PubChannel{ |
|||
client: pc, |
|||
grpcConnection: grpcConnection, |
|||
md5hash: md5.New(), |
|||
}, nil |
|||
} |
|||
|
|||
func (pc *PubChannel) Publish(m []byte) error { |
|||
err := pc.client.Send(&mq_pb.PublishRequest{ |
|||
Data: &mq_pb.Message{ |
|||
Value: m, |
|||
}, |
|||
}) |
|||
if err == nil { |
|||
pc.md5hash.Write(m) |
|||
} |
|||
return err |
|||
} |
|||
func (pc *PubChannel) Close() error { |
|||
|
|||
// println("send closing")
|
|||
if err := pc.client.Send(&mq_pb.PublishRequest{ |
|||
Data: &mq_pb.Message{ |
|||
IsClose: true, |
|||
}, |
|||
}); err != nil { |
|||
log.Printf("err send close: %v", err) |
|||
} |
|||
// println("receive closing")
|
|||
if _, err := pc.client.Recv(); err != nil && err != io.EOF { |
|||
log.Printf("err receive close: %v", err) |
|||
} |
|||
// println("close connection")
|
|||
if err := pc.grpcConnection.Close(); err != nil { |
|||
log.Printf("err connection close: %v", err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (pc *PubChannel) Md5() []byte { |
|||
return pc.md5hash.Sum(nil) |
|||
} |
@ -1,85 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"context" |
|||
"crypto/md5" |
|||
"hash" |
|||
"io" |
|||
"log" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
type SubChannel struct { |
|||
ch chan []byte |
|||
stream mq_pb.SeaweedMessaging_SubscribeClient |
|||
md5hash hash.Hash |
|||
cancel context.CancelFunc |
|||
} |
|||
|
|||
func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { |
|||
tp := broker.TopicPartition{ |
|||
Namespace: "chan", |
|||
Topic: chanName, |
|||
Partition: 0, |
|||
} |
|||
grpcConnection, err := mc.findBroker(tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
t := &SubChannel{ |
|||
ch: make(chan []byte), |
|||
stream: sc, |
|||
md5hash: md5.New(), |
|||
cancel: cancel, |
|||
} |
|||
|
|||
go func() { |
|||
for { |
|||
resp, subErr := t.stream.Recv() |
|||
if subErr == io.EOF { |
|||
return |
|||
} |
|||
if subErr != nil { |
|||
log.Printf("fail to receive from netchan %s: %v", chanName, subErr) |
|||
return |
|||
} |
|||
if resp.Data == nil { |
|||
// this could be heartbeat from broker
|
|||
continue |
|||
} |
|||
if resp.Data.IsClose { |
|||
t.stream.Send(&mq_pb.SubscriberMessage{ |
|||
IsClose: true, |
|||
}) |
|||
close(t.ch) |
|||
cancel() |
|||
return |
|||
} |
|||
t.ch <- resp.Data.Value |
|||
t.md5hash.Write(resp.Data.Value) |
|||
} |
|||
}() |
|||
|
|||
return t, nil |
|||
} |
|||
|
|||
func (sc *SubChannel) Channel() chan []byte { |
|||
return sc.ch |
|||
} |
|||
|
|||
func (sc *SubChannel) Md5() []byte { |
|||
return sc.md5hash.Sum(nil) |
|||
} |
|||
|
|||
func (sc *SubChannel) Cancel() { |
|||
sc.cancel() |
|||
} |
@ -1,55 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
|
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
) |
|||
|
|||
type MessagingClient struct { |
|||
bootstrapBrokers []string |
|||
grpcConnections map[broker.TopicPartition]*grpc.ClientConn |
|||
grpcDialOption grpc.DialOption |
|||
} |
|||
|
|||
func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { |
|||
return &MessagingClient{ |
|||
bootstrapBrokers: bootstrapBrokers, |
|||
grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), |
|||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), |
|||
} |
|||
} |
|||
|
|||
func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { |
|||
|
|||
for _, broker := range mc.bootstrapBrokers { |
|||
grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) |
|||
if err != nil { |
|||
log.Printf("dial broker %s: %v", broker, err) |
|||
continue |
|||
} |
|||
defer grpcConnection.Close() |
|||
|
|||
resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), |
|||
&mq_pb.FindBrokerRequest{ |
|||
Namespace: tp.Namespace, |
|||
Topic: tp.Topic, |
|||
Parition: tp.Partition, |
|||
}) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
targetBroker := resp.Broker |
|||
return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) |
|||
} |
|||
return nil, fmt.Errorf("no broker found for %+v", tp) |
|||
} |
@ -1,63 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"context" |
|||
"log" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { |
|||
|
|||
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { |
|||
_, err := client.ConfigureTopic(context.Background(), |
|||
&mq_pb.ConfigureTopicRequest{ |
|||
Namespace: tp.Namespace, |
|||
Topic: tp.Topic, |
|||
Configuration: &mq_pb.TopicConfiguration{ |
|||
PartitionCount: 0, |
|||
Collection: "", |
|||
Replication: "", |
|||
IsTransient: false, |
|||
Partitoning: 0, |
|||
}, |
|||
}) |
|||
return err |
|||
}) |
|||
|
|||
} |
|||
|
|||
func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { |
|||
|
|||
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { |
|||
_, err := client.DeleteTopic(context.Background(), |
|||
&mq_pb.DeleteTopicRequest{ |
|||
Namespace: namespace, |
|||
Topic: topic, |
|||
}) |
|||
return err |
|||
}) |
|||
} |
|||
|
|||
func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error { |
|||
|
|||
var lastErr error |
|||
for _, broker := range mc.bootstrapBrokers { |
|||
grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) |
|||
if err != nil { |
|||
log.Printf("dial broker %s: %v", broker, err) |
|||
continue |
|||
} |
|||
defer grpcConnection.Close() |
|||
|
|||
err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection)) |
|||
if err == nil { |
|||
return nil |
|||
} |
|||
lastErr = err |
|||
} |
|||
|
|||
return lastErr |
|||
} |
@ -1,118 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"context" |
|||
|
|||
"github.com/OneOfOne/xxhash" |
|||
"google.golang.org/grpc" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
) |
|||
|
|||
type Publisher struct { |
|||
publishClients []mq_pb.SeaweedMessaging_PublishClient |
|||
topicConfiguration *mq_pb.TopicConfiguration |
|||
messageCount uint64 |
|||
publisherId string |
|||
} |
|||
|
|||
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { |
|||
// read topic configuration
|
|||
topicConfiguration := &mq_pb.TopicConfiguration{ |
|||
PartitionCount: 4, |
|||
} |
|||
publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) |
|||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { |
|||
tp := broker.TopicPartition{ |
|||
Namespace: namespace, |
|||
Topic: topic, |
|||
Partition: int32(i), |
|||
} |
|||
grpcClientConn, err := mc.findBroker(tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
client, err := setupPublisherClient(grpcClientConn, tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
publishClients[i] = client |
|||
} |
|||
return &Publisher{ |
|||
publishClients: publishClients, |
|||
topicConfiguration: topicConfiguration, |
|||
}, nil |
|||
} |
|||
|
|||
func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) { |
|||
|
|||
stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
// send init message
|
|||
err = stream.Send(&mq_pb.PublishRequest{ |
|||
Init: &mq_pb.PublishRequest_InitMessage{ |
|||
Namespace: tp.Namespace, |
|||
Topic: tp.Topic, |
|||
Partition: tp.Partition, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
// process init response
|
|||
initResponse, err := stream.Recv() |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if initResponse.Redirect != nil { |
|||
// TODO follow redirection
|
|||
} |
|||
if initResponse.Config != nil { |
|||
} |
|||
|
|||
// setup looks for control messages
|
|||
doneChan := make(chan error, 1) |
|||
go func() { |
|||
for { |
|||
in, err := stream.Recv() |
|||
if err != nil { |
|||
doneChan <- err |
|||
return |
|||
} |
|||
if in.Redirect != nil { |
|||
} |
|||
if in.Config != nil { |
|||
} |
|||
} |
|||
}() |
|||
|
|||
return stream, nil |
|||
|
|||
} |
|||
|
|||
func (p *Publisher) Publish(m *mq_pb.Message) error { |
|||
hashValue := p.messageCount |
|||
p.messageCount++ |
|||
if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash { |
|||
if m.Key != nil { |
|||
hashValue = xxhash.Checksum64(m.Key) |
|||
} |
|||
} else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash { |
|||
hashValue = xxhash.Checksum64(m.Key) |
|||
} else { |
|||
// round robin
|
|||
} |
|||
|
|||
idx := int(hashValue) % len(p.publishClients) |
|||
if idx < 0 { |
|||
idx += len(p.publishClients) |
|||
} |
|||
return p.publishClients[idx].Send(&mq_pb.PublishRequest{ |
|||
Data: m, |
|||
}) |
|||
} |
@ -1,120 +0,0 @@ |
|||
package msgclient |
|||
|
|||
import ( |
|||
"context" |
|||
"io" |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/mq/broker" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" |
|||
"google.golang.org/grpc" |
|||
) |
|||
|
|||
type Subscriber struct { |
|||
subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient |
|||
subscriberCancels []context.CancelFunc |
|||
subscriberId string |
|||
} |
|||
|
|||
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { |
|||
// read topic configuration
|
|||
topicConfiguration := &mq_pb.TopicConfiguration{ |
|||
PartitionCount: 4, |
|||
} |
|||
subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) |
|||
subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) |
|||
|
|||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ { |
|||
if partitionId >= 0 && i != partitionId { |
|||
continue |
|||
} |
|||
tp := broker.TopicPartition{ |
|||
Namespace: namespace, |
|||
Topic: topic, |
|||
Partition: int32(i), |
|||
} |
|||
grpcClientConn, err := mc.findBroker(tp) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
subscriberClients[i] = client |
|||
subscriberCancels[i] = cancel |
|||
} |
|||
|
|||
return &Subscriber{ |
|||
subscriberClients: subscriberClients, |
|||
subscriberCancels: subscriberCancels, |
|||
subscriberId: subscriberId, |
|||
}, nil |
|||
} |
|||
|
|||
func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) { |
|||
stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
// send init message
|
|||
err = stream.Send(&mq_pb.SubscriberMessage{ |
|||
Init: &mq_pb.SubscriberMessage_InitMessage{ |
|||
Namespace: tp.Namespace, |
|||
Topic: tp.Topic, |
|||
Partition: tp.Partition, |
|||
StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP, |
|||
TimestampNs: startTime.UnixNano(), |
|||
SubscriberId: subscriberId, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
return stream, nil |
|||
} |
|||
|
|||
func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error { |
|||
for { |
|||
resp, listenErr := subscriberClient.Recv() |
|||
if listenErr == io.EOF { |
|||
return nil |
|||
} |
|||
if listenErr != nil { |
|||
println(listenErr.Error()) |
|||
return listenErr |
|||
} |
|||
if resp.Data == nil { |
|||
// this could be heartbeat from broker
|
|||
continue |
|||
} |
|||
processFn(resp.Data) |
|||
} |
|||
} |
|||
|
|||
// Subscribe starts goroutines to process the messages
|
|||
func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) { |
|||
var wg sync.WaitGroup |
|||
for i := 0; i < len(s.subscriberClients); i++ { |
|||
if s.subscriberClients[i] != nil { |
|||
wg.Add(1) |
|||
go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) { |
|||
defer wg.Done() |
|||
doSubscribe(subscriberClient, processFn) |
|||
}(s.subscriberClients[i]) |
|||
} |
|||
} |
|||
wg.Wait() |
|||
} |
|||
|
|||
func (s *Subscriber) Shutdown() { |
|||
for i := 0; i < len(s.subscriberClients); i++ { |
|||
if s.subscriberCancels[i] != nil { |
|||
s.subscriberCancels[i]() |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,23 @@ |
|||
package mq |
|||
|
|||
import "time" |
|||
|
|||
type Namespace string |
|||
|
|||
type Topic struct { |
|||
namespace Namespace |
|||
name string |
|||
} |
|||
|
|||
type Partition struct { |
|||
rangeStart int |
|||
rangeStop int // exclusive
|
|||
ringSize int |
|||
} |
|||
|
|||
type Segment struct { |
|||
topic Topic |
|||
id int32 |
|||
partition Partition |
|||
lastModified time.Time |
|||
} |
1615
weed/pb/mq_pb/mq.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
Write
Preview
Loading…
Cancel
Save
Reference in new issue