You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
9.8 KiB

6 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
12 years ago
12 years ago
6 years ago
10 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/stats"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "google.golang.org/grpc"
  11. )
  12. const (
  13. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  14. )
  15. /*
  16. * A VolumeServer contains one Store
  17. */
  18. type Store struct {
  19. MasterAddress string
  20. grpcDialOption grpc.DialOption
  21. volumeSizeLimit uint64 //read from the master
  22. Ip string
  23. Port int
  24. PublicUrl string
  25. Locations []*DiskLocation
  26. dataCenter string //optional informaton, overwriting master setting if exists
  27. rack string //optional information, overwriting master setting if exists
  28. connected bool
  29. NeedleMapType NeedleMapType
  30. NewVolumesChan chan master_pb.VolumeShortInformationMessage
  31. DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
  32. NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
  33. DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
  34. }
  35. func (s *Store) String() (str string) {
  36. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
  37. return
  38. }
  39. func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  40. s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
  41. s.Locations = make([]*DiskLocation, 0)
  42. for i := 0; i < len(dirnames); i++ {
  43. location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
  44. location.loadExistingVolumes(needleMapKind)
  45. s.Locations = append(s.Locations, location)
  46. }
  47. s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
  48. s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
  49. s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
  50. s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
  51. return
  52. }
  53. func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
  54. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  55. if e != nil {
  56. return e
  57. }
  58. ttl, e := needle.ReadTTL(ttlString)
  59. if e != nil {
  60. return e
  61. }
  62. e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate)
  63. return e
  64. }
  65. func (s *Store) DeleteCollection(collection string) (e error) {
  66. for _, location := range s.Locations {
  67. e = location.DeleteCollectionFromDiskLocation(collection)
  68. if e != nil {
  69. return
  70. }
  71. // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
  72. }
  73. return
  74. }
  75. func (s *Store) findVolume(vid needle.VolumeId) *Volume {
  76. for _, location := range s.Locations {
  77. if v, found := location.FindVolume(vid); found {
  78. return v
  79. }
  80. }
  81. return nil
  82. }
  83. func (s *Store) FindFreeLocation() (ret *DiskLocation) {
  84. max := 0
  85. for _, location := range s.Locations {
  86. currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
  87. if currentFreeCount > max {
  88. max = currentFreeCount
  89. ret = location
  90. }
  91. }
  92. return ret
  93. }
  94. func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) error {
  95. if s.findVolume(vid) != nil {
  96. return fmt.Errorf("Volume Id %d already exists!", vid)
  97. }
  98. if location := s.FindFreeLocation(); location != nil {
  99. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  100. location.Directory, vid, collection, replicaPlacement, ttl)
  101. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
  102. location.SetVolume(vid, volume)
  103. glog.V(0).Infof("add volume %d", vid)
  104. s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
  105. Id: uint32(vid),
  106. Collection: collection,
  107. ReplicaPlacement: uint32(replicaPlacement.Byte()),
  108. Version: uint32(volume.Version()),
  109. Ttl: ttl.ToUint32(),
  110. }
  111. return nil
  112. } else {
  113. return err
  114. }
  115. }
  116. return fmt.Errorf("No more free space left")
  117. }
  118. func (s *Store) Status() []*VolumeInfo {
  119. var stats []*VolumeInfo
  120. for _, location := range s.Locations {
  121. location.RLock()
  122. for k, v := range location.volumes {
  123. s := &VolumeInfo{
  124. Id: needle.VolumeId(k),
  125. Size: v.ContentSize(),
  126. Collection: v.Collection,
  127. ReplicaPlacement: v.ReplicaPlacement,
  128. Version: v.Version(),
  129. FileCount: v.nm.FileCount(),
  130. DeleteCount: v.nm.DeletedCount(),
  131. DeletedByteCount: v.nm.DeletedSize(),
  132. ReadOnly: v.readOnly,
  133. Ttl: v.Ttl,
  134. CompactRevision: uint32(v.CompactionRevision),
  135. }
  136. stats = append(stats, s)
  137. }
  138. location.RUnlock()
  139. }
  140. sortVolumeInfos(stats)
  141. return stats
  142. }
  143. func (s *Store) SetDataCenter(dataCenter string) {
  144. s.dataCenter = dataCenter
  145. }
  146. func (s *Store) SetRack(rack string) {
  147. s.rack = rack
  148. }
  149. func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
  150. var volumeMessages []*master_pb.VolumeInformationMessage
  151. maxVolumeCount := 0
  152. var maxFileKey NeedleId
  153. collectionVolumeSize := make(map[string]uint64)
  154. for _, location := range s.Locations {
  155. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  156. location.Lock()
  157. for _, v := range location.volumes {
  158. if maxFileKey < v.nm.MaxFileKey() {
  159. maxFileKey = v.nm.MaxFileKey()
  160. }
  161. if !v.expired(s.GetVolumeSizeLimit()) {
  162. volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
  163. } else {
  164. if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  165. location.deleteVolumeById(v.Id)
  166. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  167. } else {
  168. glog.V(0).Infoln("volume", v.Id, "is expired.")
  169. }
  170. }
  171. fileSize, _, _ := v.FileStat()
  172. collectionVolumeSize[v.Collection] += fileSize
  173. }
  174. location.Unlock()
  175. }
  176. for col, size := range collectionVolumeSize {
  177. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
  178. }
  179. return &master_pb.Heartbeat{
  180. Ip: s.Ip,
  181. Port: uint32(s.Port),
  182. PublicUrl: s.PublicUrl,
  183. MaxVolumeCount: uint32(maxVolumeCount),
  184. MaxFileKey: NeedleIdToUint64(maxFileKey),
  185. DataCenter: s.dataCenter,
  186. Rack: s.rack,
  187. Volumes: volumeMessages,
  188. HasNoVolumes: len(volumeMessages) == 0,
  189. }
  190. }
  191. func (s *Store) Close() {
  192. for _, location := range s.Locations {
  193. location.Close()
  194. }
  195. }
  196. func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
  197. if v := s.findVolume(i); v != nil {
  198. if v.readOnly {
  199. err = fmt.Errorf("volume %d is read only", i)
  200. return
  201. }
  202. // TODO: count needle size ahead
  203. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  204. _, size, isUnchanged, err = v.writeNeedle(n)
  205. } else {
  206. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
  207. }
  208. return
  209. }
  210. glog.V(0).Infoln("volume", i, "not found!")
  211. err = fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
  212. return
  213. }
  214. func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) {
  215. if v := s.findVolume(i); v != nil && !v.readOnly {
  216. return v.deleteNeedle(n)
  217. }
  218. return 0, nil
  219. }
  220. func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
  221. if v := s.findVolume(i); v != nil {
  222. return v.readNeedle(n)
  223. }
  224. return 0, fmt.Errorf("volume %d not found", i)
  225. }
  226. func (s *Store) GetVolume(i needle.VolumeId) *Volume {
  227. return s.findVolume(i)
  228. }
  229. func (s *Store) HasVolume(i needle.VolumeId) bool {
  230. v := s.findVolume(i)
  231. return v != nil
  232. }
  233. func (s *Store) MountVolume(i needle.VolumeId) error {
  234. for _, location := range s.Locations {
  235. if found := location.LoadVolume(i, s.NeedleMapType); found == true {
  236. glog.V(0).Infof("mount volume %d", i)
  237. v := s.findVolume(i)
  238. s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
  239. Id: uint32(v.Id),
  240. Collection: v.Collection,
  241. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  242. Version: uint32(v.Version()),
  243. Ttl: v.Ttl.ToUint32(),
  244. }
  245. return nil
  246. }
  247. }
  248. return fmt.Errorf("volume %d not found on disk", i)
  249. }
  250. func (s *Store) UnmountVolume(i needle.VolumeId) error {
  251. v := s.findVolume(i)
  252. if v == nil {
  253. return nil
  254. }
  255. message := master_pb.VolumeShortInformationMessage{
  256. Id: uint32(v.Id),
  257. Collection: v.Collection,
  258. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  259. Version: uint32(v.Version()),
  260. Ttl: v.Ttl.ToUint32(),
  261. }
  262. for _, location := range s.Locations {
  263. if err := location.UnloadVolume(i); err == nil {
  264. glog.V(0).Infof("UnmountVolume %d", i)
  265. s.DeletedVolumesChan <- message
  266. return nil
  267. }
  268. }
  269. return fmt.Errorf("volume %d not found on disk", i)
  270. }
  271. func (s *Store) DeleteVolume(i needle.VolumeId) error {
  272. v := s.findVolume(i)
  273. if v == nil {
  274. return nil
  275. }
  276. message := master_pb.VolumeShortInformationMessage{
  277. Id: uint32(v.Id),
  278. Collection: v.Collection,
  279. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  280. Version: uint32(v.Version()),
  281. Ttl: v.Ttl.ToUint32(),
  282. }
  283. for _, location := range s.Locations {
  284. if error := location.deleteVolumeById(i); error == nil {
  285. glog.V(0).Infof("DeleteVolume %d", i)
  286. s.DeletedVolumesChan <- message
  287. return nil
  288. }
  289. }
  290. return fmt.Errorf("volume %d not found on disk", i)
  291. }
  292. func (s *Store) SetVolumeSizeLimit(x uint64) {
  293. atomic.StoreUint64(&s.volumeSizeLimit, x)
  294. }
  295. func (s *Store) GetVolumeSizeLimit() uint64 {
  296. return atomic.LoadUint64(&s.volumeSizeLimit)
  297. }