You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

164 lines
5.5 KiB

6 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "net/http"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "google.golang.org/grpc"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. "github.com/seaweedfs/seaweedfs/weed/storage"
  15. )
  16. type VolumeServer struct {
  17. volume_server_pb.UnimplementedVolumeServerServer
  18. inFlightUploadDataSize int64
  19. inFlightDownloadDataSize int64
  20. concurrentUploadLimit int64
  21. concurrentDownloadLimit int64
  22. inFlightUploadDataLimitCond *sync.Cond
  23. inFlightDownloadDataLimitCond *sync.Cond
  24. inflightUploadDataTimeout time.Duration
  25. hasSlowRead bool
  26. readBufferSizeMB int
  27. SeedMasterNodes []pb.ServerAddress
  28. whiteList []string
  29. currentMaster pb.ServerAddress
  30. pulseSeconds int
  31. dataCenter string
  32. rack string
  33. store *storage.Store
  34. guard *security.Guard
  35. grpcDialOption grpc.DialOption
  36. needleMapKind storage.NeedleMapKind
  37. ldbTimout int64
  38. FixJpgOrientation bool
  39. ReadMode string
  40. compactionBytePerSecond int64
  41. metricsAddress string
  42. metricsIntervalSec int
  43. fileSizeLimitBytes int64
  44. isHeartbeating bool
  45. stopChan chan bool
  46. }
  47. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  48. port int, grpcPort int, publicUrl string,
  49. folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  50. idxFolder string,
  51. needleMapKind storage.NeedleMapKind,
  52. masterNodes []pb.ServerAddress, pulseSeconds int,
  53. dataCenter string, rack string,
  54. whiteList []string,
  55. fixJpgOrientation bool,
  56. readMode string,
  57. compactionMBPerSecond int,
  58. fileSizeLimitMB int,
  59. concurrentUploadLimit int64,
  60. concurrentDownloadLimit int64,
  61. inflightUploadDataTimeout time.Duration,
  62. hasSlowRead bool,
  63. readBufferSizeMB int,
  64. ldbTimeout int64,
  65. ) *VolumeServer {
  66. v := util.GetViper()
  67. signingKey := v.GetString("jwt.signing.key")
  68. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  69. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  70. enableUiAccess := v.GetBool("access.ui")
  71. readSigningKey := v.GetString("jwt.signing.read.key")
  72. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  73. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  74. vs := &VolumeServer{
  75. pulseSeconds: pulseSeconds,
  76. dataCenter: dataCenter,
  77. rack: rack,
  78. needleMapKind: needleMapKind,
  79. FixJpgOrientation: fixJpgOrientation,
  80. ReadMode: readMode,
  81. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  82. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  83. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  84. isHeartbeating: true,
  85. stopChan: make(chan bool),
  86. inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  87. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  88. concurrentUploadLimit: concurrentUploadLimit,
  89. concurrentDownloadLimit: concurrentDownloadLimit,
  90. inflightUploadDataTimeout: inflightUploadDataTimeout,
  91. hasSlowRead: hasSlowRead,
  92. readBufferSizeMB: readBufferSizeMB,
  93. ldbTimout: ldbTimeout,
  94. whiteList: whiteList,
  95. }
  96. whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
  97. vs.SeedMasterNodes = masterNodes
  98. vs.checkWithMaster()
  99. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
  100. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  101. handleStaticResources(adminMux)
  102. adminMux.HandleFunc("/status", vs.statusHandler)
  103. adminMux.HandleFunc("/healthz", vs.healthzHandler)
  104. if signingKey == "" || enableUiAccess {
  105. // only expose the volume server details for safe environments
  106. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  107. /*
  108. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  109. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  110. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  111. */
  112. }
  113. adminMux.HandleFunc("/", vs.privateStoreHandler)
  114. if publicMux != adminMux {
  115. // separated admin and public port
  116. handleStaticResources(publicMux)
  117. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  118. }
  119. go vs.heartbeat()
  120. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  121. return vs
  122. }
  123. func (vs *VolumeServer) SetStopping() {
  124. glog.V(0).Infoln("Stopping volume server...")
  125. vs.store.SetStopping()
  126. }
  127. func (vs *VolumeServer) LoadNewVolumes() {
  128. glog.V(0).Infoln(" Loading new volume ids ...")
  129. vs.store.LoadNewVolumes()
  130. }
  131. func (vs *VolumeServer) Shutdown() {
  132. glog.V(0).Infoln("Shutting down volume server...")
  133. vs.store.Close()
  134. glog.V(0).Infoln("Shut down successfully!")
  135. }
  136. func (vs *VolumeServer) Reload() {
  137. glog.V(0).Infoln("Reload volume server...")
  138. util.LoadConfiguration("security", false)
  139. v := util.GetViper()
  140. vs.guard.UpdateWhiteList(append(vs.whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...))
  141. }