Browse Source

add agent sub

mq
chrislu 1 week ago
parent
commit
cb54ff8ad4
  1. 4
      weed/mq/client/agent_client/agent_subscribe.go
  2. 5
      weed/mq/client/agent_client/subscribe_session.go
  3. 54
      weed/mq/client/cmd/agent_sub_record/agent_sub_record.go
  4. 13
      weed/mq/client/cmd/example/my_record.go

4
weed/mq/client/agent_client/agent_subscribe.go

@ -14,4 +14,8 @@ func (a *SubscribeSession) SubscribeMessageRecord(
}
onEachMessageFn(resp.Key, resp.Value)
}
if onCompletionFn != nil {
onCompletionFn()
}
return nil
}

5
weed/mq/client/agent_client/subscribe_session.go

@ -61,3 +61,8 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri
sessionId: resp.SessionId,
}, nil
}
func (s *SubscribeSession) CloseSession() error {
err := s.stream.CloseSend()
return err
}

54
weed/mq/client/cmd/agent_sub_record/agent_sub_record.go

@ -0,0 +1,54 @@
package main
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
"github.com/seaweedfs/seaweedfs/weed/mq/client/cmd/example"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"log"
"time"
)
var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("topic", "test", "topic")
agent = flag.String("agent", "localhost:16777", "mq agent address")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
)
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
session, err := agent_client.NewSubscribeSession(*agent, &agent_client.SubscribeOption{
ConsumerGroup: "test",
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
Topic: topic.NewTopic(*namespace, *t),
Filter: "",
MaxSubscribedPartitions: int32(*maxPartitionCount),
PerPartitionConcurrency: int32(*perPartitionConcurrency),
})
if err != nil {
log.Printf("new subscribe session: %v", err)
return
}
defer session.CloseSession()
counter := 0
session.SubscribeMessageRecord(func(key []byte, recordValue *schema_pb.RecordValue) {
counter++
record := example.FromRecordValue(recordValue)
fmt.Printf("%s %v\n", string(key), record)
}, func() {
log.Printf("done received %d messages", counter)
})
}

13
weed/mq/client/cmd/example/my_record.go

@ -41,3 +41,16 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
SetBool("field7", r.Field7).
RecordEnd()
}
func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord {
return &MyRecord{
Key: recordValue.Fields["key"].GetBytesValue(),
Field1: recordValue.Fields["field1"].GetBytesValue(),
Field2: recordValue.Fields["field2"].GetStringValue(),
Field3: recordValue.Fields["field3"].GetInt32Value(),
Field4: recordValue.Fields["field4"].GetInt64Value(),
Field5: recordValue.Fields["field5"].GetFloatValue(),
Field6: recordValue.Fields["field6"].GetDoubleValue(),
Field7: recordValue.Fields["field7"].GetBoolValue(),
}
}
Loading…
Cancel
Save