You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

363 lines
12 KiB

7 years ago
2 years ago
1 year ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
1 year ago
1 year ago
2 years ago
6 years ago
  1. package wdclient
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/stats"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  14. )
  15. type MasterClient struct {
  16. FilerGroup string
  17. clientType string
  18. clientHost pb.ServerAddress
  19. rack string
  20. currentMaster pb.ServerAddress
  21. currentMasterLock sync.RWMutex
  22. masters pb.ServerDiscovery
  23. grpcDialOption grpc.DialOption
  24. *vidMap
  25. vidMapCacheSize int
  26. OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
  27. OnPeerUpdateLock sync.RWMutex
  28. }
  29. func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
  30. return &MasterClient{
  31. FilerGroup: filerGroup,
  32. clientType: clientType,
  33. clientHost: clientHost,
  34. rack: rack,
  35. masters: masters,
  36. grpcDialOption: grpcDialOption,
  37. vidMap: newVidMap(clientDataCenter),
  38. vidMapCacheSize: 5,
  39. }
  40. }
  41. func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
  42. mc.OnPeerUpdateLock.Lock()
  43. mc.OnPeerUpdate = onPeerUpdate
  44. mc.OnPeerUpdateLock.Unlock()
  45. }
  46. func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
  47. return mc.LookupFileIdWithFallback
  48. }
  49. func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
  50. fullUrls, err = mc.vidMap.LookupFileId(fileId)
  51. if err == nil && len(fullUrls) > 0 {
  52. return
  53. }
  54. err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  55. resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  56. VolumeOrFileIds: []string{fileId},
  57. })
  58. if err != nil {
  59. return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
  60. }
  61. for vid, vidLocation := range resp.VolumeIdLocations {
  62. for _, vidLoc := range vidLocation.Locations {
  63. loc := Location{
  64. Url: vidLoc.Url,
  65. PublicUrl: vidLoc.PublicUrl,
  66. GrpcPort: int(vidLoc.GrpcPort),
  67. DataCenter: vidLoc.DataCenter,
  68. DataInRemote: vidLoc.DataInRemote,
  69. }
  70. glog.V(4).Infof("found location %s for %s, data in remote: %v", loc.Url, fileId, loc.DataInRemote)
  71. mc.vidMap.addLocation(uint32(vid), loc)
  72. httpUrl := "http://" + loc.Url + "/" + fileId
  73. // Prefer same data center
  74. if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
  75. fullUrls = append([]string{httpUrl}, fullUrls...)
  76. } else {
  77. fullUrls = append(fullUrls, httpUrl)
  78. }
  79. }
  80. }
  81. return nil
  82. })
  83. return
  84. }
  85. func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
  86. mc.currentMasterLock.RLock()
  87. defer mc.currentMasterLock.RUnlock()
  88. return mc.currentMaster
  89. }
  90. func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
  91. mc.currentMasterLock.Lock()
  92. mc.currentMaster = master
  93. mc.currentMasterLock.Unlock()
  94. }
  95. func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
  96. mc.WaitUntilConnected(ctx)
  97. return mc.getCurrentMaster()
  98. }
  99. func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
  100. mc.WaitUntilConnected(ctx)
  101. return mc.masters.GetInstances()
  102. }
  103. func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
  104. for {
  105. select {
  106. case <-ctx.Done():
  107. glog.V(0).Infof("Connection wait stopped: %v", ctx.Err())
  108. return
  109. default:
  110. if mc.getCurrentMaster() != "" {
  111. return
  112. }
  113. time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
  114. print(".")
  115. }
  116. }
  117. }
  118. func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
  119. glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
  124. return
  125. default:
  126. mc.tryAllMasters(ctx)
  127. time.Sleep(time.Second)
  128. }
  129. }
  130. }
  131. func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
  132. for _, master := range mc.masters.GetInstances() {
  133. if master == myMasterAddress {
  134. continue
  135. }
  136. if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  137. ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
  138. defer cancel()
  139. resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
  140. if err != nil {
  141. return err
  142. }
  143. leader = resp.Leader
  144. return nil
  145. }); grpcErr != nil {
  146. glog.V(0).Infof("connect to %s: %v", master, grpcErr)
  147. }
  148. if leader != "" {
  149. glog.V(0).Infof("existing leader is %s", leader)
  150. return
  151. }
  152. }
  153. glog.V(0).Infof("No existing leader found!")
  154. return
  155. }
  156. func (mc *MasterClient) tryAllMasters(ctx context.Context) {
  157. var nextHintedLeader pb.ServerAddress
  158. mc.masters.RefreshBySrvIfAvailable()
  159. for _, master := range mc.masters.GetInstances() {
  160. nextHintedLeader = mc.tryConnectToMaster(ctx, master)
  161. for nextHintedLeader != "" {
  162. select {
  163. case <-ctx.Done():
  164. glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
  165. return
  166. default:
  167. nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader)
  168. }
  169. }
  170. mc.setCurrentMaster("")
  171. }
  172. }
  173. func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
  174. glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
  175. stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
  176. gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  177. ctx, cancel := context.WithCancel(ctx)
  178. defer cancel()
  179. stream, err := client.KeepConnected(ctx)
  180. if err != nil {
  181. glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err)
  182. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
  183. return err
  184. }
  185. if err = stream.Send(&master_pb.KeepConnectedRequest{
  186. FilerGroup: mc.FilerGroup,
  187. DataCenter: mc.DataCenter,
  188. Rack: mc.rack,
  189. ClientType: mc.clientType,
  190. ClientAddress: string(mc.clientHost),
  191. Version: util.Version(),
  192. }); err != nil {
  193. glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err)
  194. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
  195. return err
  196. }
  197. glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
  198. resp, err := stream.Recv()
  199. if err != nil {
  200. glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
  201. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
  202. return err
  203. }
  204. // check if it is the leader to determine whether to reset the vidMap
  205. if resp.VolumeLocation != nil {
  206. if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
  207. glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
  208. nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
  209. stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
  210. return nil
  211. }
  212. mc.resetVidMap()
  213. mc.updateVidMap(resp)
  214. } else {
  215. mc.resetVidMap()
  216. }
  217. mc.setCurrentMaster(master)
  218. for {
  219. resp, err := stream.Recv()
  220. if err != nil {
  221. glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
  222. stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
  223. return err
  224. }
  225. if resp.VolumeLocation != nil {
  226. // maybe the leader is changed
  227. if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
  228. glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
  229. nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
  230. stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
  231. return nil
  232. }
  233. mc.updateVidMap(resp)
  234. }
  235. if resp.ClusterNodeUpdate != nil {
  236. update := resp.ClusterNodeUpdate
  237. mc.OnPeerUpdateLock.RLock()
  238. if mc.OnPeerUpdate != nil {
  239. if update.FilerGroup == mc.FilerGroup {
  240. if update.IsAdd {
  241. glog.V(0).Infof("+ %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
  242. } else {
  243. glog.V(0).Infof("- %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
  244. }
  245. stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc()
  246. mc.OnPeerUpdate(update, time.Now())
  247. }
  248. }
  249. mc.OnPeerUpdateLock.RUnlock()
  250. }
  251. if err := ctx.Err(); err != nil {
  252. glog.V(0).Infof("Connection attempt to master stopped: %v", err)
  253. return err
  254. }
  255. }
  256. })
  257. if gprcErr != nil {
  258. stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
  259. glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
  260. }
  261. return
  262. }
  263. func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
  264. if resp.VolumeLocation.IsEmptyUrl() {
  265. glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp)
  266. return
  267. }
  268. // process new volume location
  269. loc := Location{
  270. Url: resp.VolumeLocation.Url,
  271. PublicUrl: resp.VolumeLocation.PublicUrl,
  272. DataCenter: resp.VolumeLocation.DataCenter,
  273. GrpcPort: int(resp.VolumeLocation.GrpcPort),
  274. }
  275. for _, newVid := range resp.VolumeLocation.NewVids {
  276. glog.V(2).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid)
  277. mc.addLocation(newVid, loc)
  278. }
  279. for _, remoteVid := range resp.VolumeLocation.RemoteVids {
  280. loc.DataInRemote = true
  281. glog.V(2).Infof("%s.%s: %s masterClient adds remote volume %d", mc.FilerGroup, mc.clientType, loc.Url, remoteVid)
  282. mc.addLocation(remoteVid, loc)
  283. }
  284. for _, deletedVid := range resp.VolumeLocation.DeletedVids {
  285. glog.V(2).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid)
  286. mc.deleteLocation(deletedVid, loc)
  287. }
  288. for _, newEcVid := range resp.VolumeLocation.NewEcVids {
  289. glog.V(2).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid)
  290. mc.addEcLocation(newEcVid, loc)
  291. }
  292. for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
  293. glog.V(2).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid)
  294. mc.deleteEcLocation(deletedEcVid, loc)
  295. }
  296. glog.V(1).Infof("updateVidMap(%s) %s.%s: %s volume add local: %d, remote: %d, del: %d, add ec: %d del ec: %d",
  297. resp.VolumeLocation.DataCenter, mc.FilerGroup, mc.clientType, loc.Url,
  298. len(resp.VolumeLocation.NewVids), len(resp.VolumeLocation.RemoteVids),
  299. len(resp.VolumeLocation.DeletedVids), len(resp.VolumeLocation.NewEcVids),
  300. len(resp.VolumeLocation.DeletedEcVids))
  301. }
  302. func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
  303. getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) }
  304. return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
  305. }
  306. func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
  307. return util.Retry("master grpc", func() error {
  308. return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  309. return fn(client)
  310. })
  311. })
  312. }
  313. func (mc *MasterClient) resetVidMap() {
  314. tail := &vidMap{
  315. vid2Locations: mc.vid2Locations,
  316. ecVid2Locations: mc.ecVid2Locations,
  317. DataCenter: mc.DataCenter,
  318. cache: mc.cache,
  319. }
  320. nvm := newVidMap(mc.DataCenter)
  321. nvm.cache = tail
  322. mc.vidMap = nvm
  323. //trim
  324. for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
  325. if i == mc.vidMapCacheSize-1 {
  326. tail.cache = nil
  327. } else {
  328. tail = tail.cache
  329. }
  330. }
  331. }