diff --git a/weed/mq/client/agent_client/agent_subscribe.go b/weed/mq/client/agent_client/agent_subscribe.go index 626a3a123..61024365a 100644 --- a/weed/mq/client/agent_client/agent_subscribe.go +++ b/weed/mq/client/agent_client/agent_subscribe.go @@ -14,4 +14,8 @@ func (a *SubscribeSession) SubscribeMessageRecord( } onEachMessageFn(resp.Key, resp.Value) } + if onCompletionFn != nil { + onCompletionFn() + } + return nil } diff --git a/weed/mq/client/agent_client/subscribe_session.go b/weed/mq/client/agent_client/subscribe_session.go index fc87e17ca..5002a6727 100644 --- a/weed/mq/client/agent_client/subscribe_session.go +++ b/weed/mq/client/agent_client/subscribe_session.go @@ -61,3 +61,8 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri sessionId: resp.SessionId, }, nil } + +func (s *SubscribeSession) CloseSession() error { + err := s.stream.CloseSend() + return err +} diff --git a/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go b/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go new file mode 100644 index 000000000..80313e4d1 --- /dev/null +++ b/weed/mq/client/cmd/agent_sub_record/agent_sub_record.go @@ -0,0 +1,54 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client" + "github.com/seaweedfs/seaweedfs/weed/mq/client/cmd/example" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "log" + "time" +) + +var ( + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + agent = flag.String("agent", "localhost:16777", "mq agent address") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") + timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + + clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") +) + +func main() { + flag.Parse() + util_http.InitGlobalHttpClient() + + session, err := agent_client.NewSubscribeSession(*agent, &agent_client.SubscribeOption{ + ConsumerGroup: "test", + ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), + Topic: topic.NewTopic(*namespace, *t), + Filter: "", + MaxSubscribedPartitions: int32(*maxPartitionCount), + PerPartitionConcurrency: int32(*perPartitionConcurrency), + }) + if err != nil { + log.Printf("new subscribe session: %v", err) + return + } + defer session.CloseSession() + + counter := 0 + session.SubscribeMessageRecord(func(key []byte, recordValue *schema_pb.RecordValue) { + counter++ + record := example.FromRecordValue(recordValue) + fmt.Printf("%s %v\n", string(key), record) + }, func() { + log.Printf("done received %d messages", counter) + }) + +} diff --git a/weed/mq/client/cmd/example/my_record.go b/weed/mq/client/cmd/example/my_record.go index 2ca693438..ea6a0e7bd 100644 --- a/weed/mq/client/cmd/example/my_record.go +++ b/weed/mq/client/cmd/example/my_record.go @@ -41,3 +41,16 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { SetBool("field7", r.Field7). RecordEnd() } + +func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { + return &MyRecord{ + Key: recordValue.Fields["key"].GetBytesValue(), + Field1: recordValue.Fields["field1"].GetBytesValue(), + Field2: recordValue.Fields["field2"].GetStringValue(), + Field3: recordValue.Fields["field3"].GetInt32Value(), + Field4: recordValue.Fields["field4"].GetInt64Value(), + Field5: recordValue.Fields["field5"].GetFloatValue(), + Field6: recordValue.Fields["field6"].GetDoubleValue(), + Field7: recordValue.Fields["field7"].GetBoolValue(), + } +}