You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

78 lines
2.0 KiB

7 years ago
  1. package kafka
  2. import (
  3. "github.com/Shopify/sarama"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/msgqueue"
  6. "github.com/golang/protobuf/proto"
  7. )
// init registers the Kafka queue implementation in the global message
// queue registry so it can be selected by name via configuration.
func init() {
	msgqueue.MessageQueues = append(msgqueue.MessageQueues, &KafkaQueue{})
}
// KafkaQueue publishes filer messages to a Kafka topic through an
// asynchronous sarama producer.
type KafkaQueue struct {
	topic    string               // destination Kafka topic for all messages
	producer sarama.AsyncProducer // async producer; sends go through Input(), results on Successes()/Errors()
}
  15. func (k *KafkaQueue) GetName() string {
  16. return "kafka"
  17. }
  18. func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
  19. glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
  20. glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
  21. return k.initialize(
  22. configuration.GetStringSlice("hosts"),
  23. configuration.GetString("topic"),
  24. )
  25. }
  26. func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
  27. config := sarama.NewConfig()
  28. config.Producer.RequiredAcks = sarama.WaitForLocal
  29. config.Producer.Partitioner = sarama.NewHashPartitioner
  30. config.Producer.Return.Successes = true
  31. config.Producer.Return.Errors = true
  32. k.producer, err = sarama.NewAsyncProducer(hosts, config)
  33. k.topic = topic
  34. go k.handleSuccess()
  35. go k.handleError()
  36. return nil
  37. }
  38. func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
  39. bytes, err := proto.Marshal(message)
  40. if err != nil {
  41. return
  42. }
  43. msg := &sarama.ProducerMessage{
  44. Topic: k.topic,
  45. Key: sarama.StringEncoder(key),
  46. Value: sarama.ByteEncoder(bytes),
  47. }
  48. k.producer.Input() <- msg
  49. return nil
  50. }
  51. func (k *KafkaQueue) handleSuccess() {
  52. for {
  53. pm := <-k.producer.Successes()
  54. if pm != nil {
  55. glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
  56. }
  57. }
  58. }
  59. func (k *KafkaQueue) handleError() {
  60. for {
  61. err := <-k.producer.Errors()
  62. if err != nil {
  63. glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
  64. }
  65. }
  66. }