You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

269 lines
8.8 KiB

7 years ago
7 years ago
5 years ago
7 years ago
5 years ago
3 years ago
7 years ago
3 years ago
5 years ago
3 years ago
4 years ago
3 years ago
6 years ago
  1. package wdclient
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/stats"
  5. "math/rand"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. )
// MasterClient keeps a streaming connection to one of the configured master
// servers and caches the volume locations that the master pushes over it.
type MasterClient struct {
	// FilerGroup is sent to the master when connecting and is used to filter
	// the peer updates forwarded to OnPeerUpdate.
	FilerGroup string
	// clientType is sent to the master as ClientType and appears in log lines.
	clientType string
	// clientHost is sent to the master as ClientAddress.
	clientHost pb.ServerAddress
	// rack is sent to the master as Rack.
	rack string
	// currentMaster is the master the client is currently streaming from; it is
	// the empty string while no connection is established.
	currentMaster pb.ServerAddress
	// masters is the set of candidate master addresses to try.
	masters map[string]pb.ServerAddress
	// grpcDialOption is passed to every gRPC dial made by this client.
	grpcDialOption grpc.DialOption
	// vidMap is embedded; it holds the volume locations learned from the master
	// and provides the DataCenter value sent when connecting.
	vidMap
	// OnPeerUpdate, when non-nil, is called for each cluster node update whose
	// FilerGroup matches this client's FilerGroup. startFrom is the time the
	// update was received.
	OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
}
  24. func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
  25. return &MasterClient{
  26. FilerGroup: filerGroup,
  27. clientType: clientType,
  28. clientHost: clientHost,
  29. rack: rack,
  30. masters: masters,
  31. grpcDialOption: grpcDialOption,
  32. vidMap: newVidMap(clientDataCenter),
  33. }
  34. }
  35. func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
  36. return mc.LookupFileIdWithFallback
  37. }
  38. func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
  39. fullUrls, err = mc.vidMap.LookupFileId(fileId)
  40. if err == nil {
  41. return
  42. }
  43. err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
  44. resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  45. VolumeOrFileIds: []string{fileId},
  46. })
  47. if err != nil {
  48. return err
  49. }
  50. for vid, vidLocation := range resp.VolumeIdLocations {
  51. for _, vidLoc := range vidLocation.Locations {
  52. loc := Location{
  53. Url: vidLoc.Url,
  54. PublicUrl: vidLoc.PublicUrl,
  55. GrpcPort: int(vidLoc.GrpcPort),
  56. }
  57. mc.vidMap.addLocation(uint32(vid), loc)
  58. fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
  59. }
  60. }
  61. return nil
  62. })
  63. return
  64. }
  65. func (mc *MasterClient) GetMaster() pb.ServerAddress {
  66. mc.WaitUntilConnected()
  67. return mc.currentMaster
  68. }
  69. func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
  70. mc.WaitUntilConnected()
  71. return mc.masters
  72. }
  73. func (mc *MasterClient) WaitUntilConnected() {
  74. for mc.currentMaster == "" {
  75. time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
  76. }
  77. }
  78. func (mc *MasterClient) KeepConnectedToMaster() {
  79. glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
  80. for {
  81. mc.tryAllMasters()
  82. time.Sleep(time.Second)
  83. }
  84. }
  85. func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
  86. for _, master := range mc.masters {
  87. if master == myMasterAddress {
  88. continue
  89. }
  90. if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
  91. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
  92. defer cancel()
  93. resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
  94. if err != nil {
  95. return err
  96. }
  97. leader = resp.Leader
  98. return nil
  99. }); grpcErr != nil {
  100. glog.V(0).Infof("connect to %s: %v", master, grpcErr)
  101. }
  102. if leader != "" {
  103. glog.V(0).Infof("existing leader is %s", leader)
  104. return
  105. }
  106. }
  107. glog.V(0).Infof("No existing leader found!")
  108. return
  109. }
  110. func (mc *MasterClient) tryAllMasters() {
  111. var nextHintedLeader pb.ServerAddress
  112. for _, master := range mc.masters {
  113. nextHintedLeader = mc.tryConnectToMaster(master)
  114. for nextHintedLeader != "" {
  115. nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
  116. }
  117. mc.currentMaster = ""
  118. }
  119. }
  120. func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
  121. glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
  122. stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
  123. gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
  124. ctx, cancel := context.WithCancel(context.Background())
  125. defer cancel()
  126. stream, err := client.KeepConnected(ctx)
  127. if err != nil {
  128. glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err)
  129. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
  130. return err
  131. }
  132. if err = stream.Send(&master_pb.KeepConnectedRequest{
  133. FilerGroup: mc.FilerGroup,
  134. DataCenter: mc.DataCenter,
  135. Rack: mc.rack,
  136. ClientType: mc.clientType,
  137. ClientAddress: string(mc.clientHost),
  138. Version: util.Version(),
  139. }); err != nil {
  140. glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err)
  141. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
  142. return err
  143. }
  144. glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
  145. resp, err := stream.Recv()
  146. if err != nil {
  147. glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
  148. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
  149. return err
  150. }
  151. // check if it is the leader to determine whether to reset the vidMap
  152. if resp.VolumeLocation != nil {
  153. if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
  154. glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
  155. nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
  156. stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
  157. return nil
  158. }
  159. mc.vidMap = newVidMap("")
  160. mc.updateVidMap(resp)
  161. } else {
  162. mc.vidMap = newVidMap("")
  163. }
  164. mc.currentMaster = master
  165. for {
  166. resp, err := stream.Recv()
  167. if err != nil {
  168. glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
  169. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
  170. return err
  171. }
  172. if resp.VolumeLocation != nil {
  173. // maybe the leader is changed
  174. if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
  175. glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
  176. nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
  177. stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
  178. return nil
  179. }
  180. mc.updateVidMap(resp)
  181. }
  182. if resp.ClusterNodeUpdate != nil {
  183. update := resp.ClusterNodeUpdate
  184. if mc.OnPeerUpdate != nil {
  185. if update.FilerGroup == mc.FilerGroup {
  186. if update.IsAdd {
  187. glog.V(0).Infof("+ %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader)
  188. } else {
  189. glog.V(0).Infof("- %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader)
  190. }
  191. stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc()
  192. mc.OnPeerUpdate(update, time.Now())
  193. }
  194. }
  195. }
  196. }
  197. })
  198. if gprcErr != nil {
  199. stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
  200. glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
  201. }
  202. return
  203. }
  204. func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
  205. // process new volume location
  206. loc := Location{
  207. Url: resp.VolumeLocation.Url,
  208. PublicUrl: resp.VolumeLocation.PublicUrl,
  209. DataCenter: resp.VolumeLocation.DataCenter,
  210. GrpcPort: int(resp.VolumeLocation.GrpcPort),
  211. }
  212. for _, newVid := range resp.VolumeLocation.NewVids {
  213. glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid)
  214. mc.addLocation(newVid, loc)
  215. }
  216. for _, deletedVid := range resp.VolumeLocation.DeletedVids {
  217. glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid)
  218. mc.deleteLocation(deletedVid, loc)
  219. }
  220. for _, newEcVid := range resp.VolumeLocation.NewEcVids {
  221. glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid)
  222. mc.addEcLocation(newEcVid, loc)
  223. }
  224. for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
  225. glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid)
  226. mc.deleteEcLocation(deletedEcVid, loc)
  227. }
  228. }
  229. func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
  230. return util.Retry("master grpc", func() error {
  231. for mc.currentMaster == "" {
  232. time.Sleep(3 * time.Second)
  233. }
  234. return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
  235. return fn(client)
  236. })
  237. })
  238. }