Browse Source

add pub sub md5

pull/1318/head
Chris Lu 5 years ago
parent
commit
d693e77418
  1. 2
      weed/messaging/broker/broker_grpc_server.go
  2. 14
      weed/messaging/msgclient/pub_chan.go
  3. 17
      weed/messaging/msgclient/sub_chan.go

2
weed/messaging/broker/broker_grpc_server.go

@ -33,5 +33,5 @@ func genTopicDir(namespace, topic string) string {
} }
func genTopicDirEntry(namespace, topic string) (dir, entry string) { func genTopicDirEntry(namespace, topic string) (dir, entry string) {
return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace), topic
return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic
} }

14
weed/messaging/msgclient/pub_chan.go

@ -1,6 +1,8 @@
package msgclient package msgclient
import ( import (
"crypto/md5"
"hash"
"io" "io"
"log" "log"
@ -13,6 +15,7 @@ import (
type PubChannel struct { type PubChannel struct {
client messaging_pb.SeaweedMessaging_PublishClient client messaging_pb.SeaweedMessaging_PublishClient
grpcConnection *grpc.ClientConn grpcConnection *grpc.ClientConn
md5hash hash.Hash
} }
func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
@ -32,15 +35,20 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
return &PubChannel{ return &PubChannel{
client: pc, client: pc,
grpcConnection: grpcConnection, grpcConnection: grpcConnection,
md5hash: md5.New(),
}, nil }, nil
} }
func (pc *PubChannel) Publish(m []byte) error { func (pc *PubChannel) Publish(m []byte) error {
return pc.client.Send(&messaging_pb.PublishRequest{
err := pc.client.Send(&messaging_pb.PublishRequest{
Data: &messaging_pb.Message{ Data: &messaging_pb.Message{
Value: m, Value: m,
}, },
}) })
if err == nil {
pc.md5hash.Write(m)
}
return err
} }
func (pc *PubChannel) Close() error { func (pc *PubChannel) Close() error {
@ -62,3 +70,7 @@ func (pc *PubChannel) Close() error {
} }
return nil return nil
} }
func (pc *PubChannel) Md5() []byte {
return pc.md5hash.Sum(nil)
}

17
weed/messaging/msgclient/sub_chan.go

@ -1,6 +1,8 @@
package msgclient package msgclient
import ( import (
"crypto/md5"
"hash"
"io" "io"
"log" "log"
"time" "time"
@ -10,8 +12,9 @@ import (
) )
type SubChannel struct { type SubChannel struct {
ch chan []byte
stream messaging_pb.SeaweedMessaging_SubscribeClient
ch chan []byte
stream messaging_pb.SeaweedMessaging_SubscribeClient
md5hash hash.Hash
} }
func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
@ -30,8 +33,9 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
} }
t := &SubChannel{ t := &SubChannel{
ch: make(chan []byte),
stream: sc,
ch: make(chan []byte),
stream: sc,
md5hash: md5.New(),
} }
go func() { go func() {
@ -51,6 +55,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
close(t.ch) close(t.ch)
return return
} }
t.md5hash.Write(resp.Data.Value)
t.ch <- resp.Data.Value t.ch <- resp.Data.Value
} }
}() }()
@ -61,3 +66,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
func (sc *SubChannel) Channel() chan []byte { func (sc *SubChannel) Channel() chan []byte {
return sc.ch return sc.ch
} }
func (sc *SubChannel) Md5() []byte {
return sc.md5hash.Sum(nil)
}
Loading…
Cancel
Save