You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

40 lines
1.1 KiB

  1. package topic
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  6. "time"
  7. )
  8. type LocalPartition struct {
  9. Partition
  10. isLeader bool
  11. FollowerBrokers []pb.ServerAddress
  12. logBuffer *log_buffer.LogBuffer
  13. }
  14. func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
  15. p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
  16. }
  17. func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
  18. isLeaer := assignment.LeaderBroker == string(self)
  19. localPartition := &LocalPartition{
  20. Partition: Partition{
  21. RangeStart: assignment.PartitionStart,
  22. RangeStop: assignment.PartitionStop,
  23. RingSize: PartitionCount,
  24. },
  25. isLeader: isLeaer,
  26. }
  27. if !isLeaer {
  28. return localPartition
  29. }
  30. followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
  31. for i, follower := range assignment.FollowerBrokers {
  32. followers[i] = pb.ServerAddress(follower)
  33. }
  34. localPartition.FollowerBrokers = followers
  35. return localPartition
  36. }