You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
4.8 KiB

6 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "net/http"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "google.golang.org/grpc"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. "github.com/seaweedfs/seaweedfs/weed/storage"
  15. )
  16. type VolumeServer struct {
  17. volume_server_pb.UnimplementedVolumeServerServer
  18. inFlightUploadDataSize int64
  19. inFlightDownloadDataSize int64
  20. concurrentUploadLimit int64
  21. concurrentDownloadLimit int64
  22. inFlightUploadDataLimitCond *sync.Cond
  23. inFlightDownloadDataLimitCond *sync.Cond
  24. inflightUploadDataTimeout time.Duration
  25. hasSlowRead bool
  26. SeedMasterNodes []pb.ServerAddress
  27. currentMaster pb.ServerAddress
  28. pulseSeconds int
  29. dataCenter string
  30. rack string
  31. store *storage.Store
  32. guard *security.Guard
  33. grpcDialOption grpc.DialOption
  34. needleMapKind storage.NeedleMapKind
  35. FixJpgOrientation bool
  36. ReadMode string
  37. compactionBytePerSecond int64
  38. metricsAddress string
  39. metricsIntervalSec int
  40. fileSizeLimitBytes int64
  41. isHeartbeating bool
  42. stopChan chan bool
  43. }
  44. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  45. port int, grpcPort int, publicUrl string,
  46. folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  47. idxFolder string,
  48. needleMapKind storage.NeedleMapKind,
  49. masterNodes []pb.ServerAddress, pulseSeconds int,
  50. dataCenter string, rack string,
  51. whiteList []string,
  52. fixJpgOrientation bool,
  53. readMode string,
  54. compactionMBPerSecond int,
  55. fileSizeLimitMB int,
  56. concurrentUploadLimit int64,
  57. concurrentDownloadLimit int64,
  58. inflightUploadDataTimeout time.Duration,
  59. hasSlowRead bool,
  60. ) *VolumeServer {
  61. v := util.GetViper()
  62. signingKey := v.GetString("jwt.signing.key")
  63. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  64. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  65. enableUiAccess := v.GetBool("access.ui")
  66. readSigningKey := v.GetString("jwt.signing.read.key")
  67. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  68. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  69. vs := &VolumeServer{
  70. pulseSeconds: pulseSeconds,
  71. dataCenter: dataCenter,
  72. rack: rack,
  73. needleMapKind: needleMapKind,
  74. FixJpgOrientation: fixJpgOrientation,
  75. ReadMode: readMode,
  76. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  77. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  78. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  79. isHeartbeating: true,
  80. stopChan: make(chan bool),
  81. inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  82. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  83. concurrentUploadLimit: concurrentUploadLimit,
  84. concurrentDownloadLimit: concurrentDownloadLimit,
  85. inflightUploadDataTimeout: inflightUploadDataTimeout,
  86. hasSlowRead: hasSlowRead,
  87. }
  88. vs.SeedMasterNodes = masterNodes
  89. vs.checkWithMaster()
  90. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
  91. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  92. handleStaticResources(adminMux)
  93. adminMux.HandleFunc("/status", vs.statusHandler)
  94. adminMux.HandleFunc("/healthz", vs.healthzHandler)
  95. if signingKey == "" || enableUiAccess {
  96. // only expose the volume server details for safe environments
  97. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  98. /*
  99. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  100. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  101. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  102. */
  103. }
  104. adminMux.HandleFunc("/", vs.privateStoreHandler)
  105. if publicMux != adminMux {
  106. // separated admin and public port
  107. handleStaticResources(publicMux)
  108. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  109. }
  110. go vs.heartbeat()
  111. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  112. return vs
  113. }
  114. func (vs *VolumeServer) SetStopping() {
  115. glog.V(0).Infoln("Stopping volume server...")
  116. vs.store.SetStopping()
  117. }
  118. func (vs *VolumeServer) Shutdown() {
  119. glog.V(0).Infoln("Shutting down volume server...")
  120. vs.store.Close()
  121. glog.V(0).Infoln("Shut down successfully!")
  122. }