You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
4.4 KiB

6 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "net/http"
  4. "sync"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/storage"
  14. )
  15. type VolumeServer struct {
  16. volume_server_pb.UnimplementedVolumeServerServer
  17. inFlightUploadDataSize int64
  18. inFlightDownloadDataSize int64
  19. concurrentUploadLimit int64
  20. concurrentDownloadLimit int64
  21. inFlightDownloadDataLimitCond *sync.Cond
  22. SeedMasterNodes []pb.ServerAddress
  23. currentMaster pb.ServerAddress
  24. pulseSeconds int
  25. dataCenter string
  26. rack string
  27. store *storage.Store
  28. guard *security.Guard
  29. grpcDialOption grpc.DialOption
  30. needleMapKind storage.NeedleMapKind
  31. FixJpgOrientation bool
  32. ReadMode string
  33. compactionBytePerSecond int64
  34. metricsAddress string
  35. metricsIntervalSec int
  36. fileSizeLimitBytes int64
  37. isHeartbeating bool
  38. stopChan chan bool
  39. }
  40. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  41. port int, grpcPort int, publicUrl string,
  42. folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  43. idxFolder string,
  44. needleMapKind storage.NeedleMapKind,
  45. masterNodes []pb.ServerAddress, pulseSeconds int,
  46. dataCenter string, rack string,
  47. whiteList []string,
  48. fixJpgOrientation bool,
  49. readMode string,
  50. compactionMBPerSecond int,
  51. fileSizeLimitMB int,
  52. concurrentUploadLimit int64,
  53. concurrentDownloadLimit int64,
  54. ) *VolumeServer {
  55. v := util.GetViper()
  56. signingKey := v.GetString("jwt.signing.key")
  57. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  58. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  59. enableUiAccess := v.GetBool("access.ui")
  60. readSigningKey := v.GetString("jwt.signing.read.key")
  61. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  62. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  63. vs := &VolumeServer{
  64. pulseSeconds: pulseSeconds,
  65. dataCenter: dataCenter,
  66. rack: rack,
  67. needleMapKind: needleMapKind,
  68. FixJpgOrientation: fixJpgOrientation,
  69. ReadMode: readMode,
  70. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  71. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  72. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  73. isHeartbeating: true,
  74. stopChan: make(chan bool),
  75. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  76. concurrentUploadLimit: concurrentUploadLimit,
  77. concurrentDownloadLimit: concurrentDownloadLimit,
  78. }
  79. vs.SeedMasterNodes = masterNodes
  80. vs.checkWithMaster()
  81. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
  82. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  83. handleStaticResources(adminMux)
  84. adminMux.HandleFunc("/status", vs.statusHandler)
  85. adminMux.HandleFunc("/healthz", vs.healthzHandler)
  86. if signingKey == "" || enableUiAccess {
  87. // only expose the volume server details for safe environments
  88. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  89. /*
  90. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  91. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  92. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  93. */
  94. }
  95. adminMux.HandleFunc("/", vs.privateStoreHandler)
  96. if publicMux != adminMux {
  97. // separated admin and public port
  98. handleStaticResources(publicMux)
  99. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  100. }
  101. go vs.heartbeat()
  102. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  103. return vs
  104. }
  105. func (vs *VolumeServer) SetStopping() {
  106. glog.V(0).Infoln("Stopping volume server...")
  107. vs.store.SetStopping()
  108. }
  109. func (vs *VolumeServer) Shutdown() {
  110. glog.V(0).Infoln("Shutting down volume server...")
  111. vs.store.Close()
  112. glog.V(0).Infoln("Shut down successfully!")
  113. }