You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
8.9 KiB

10 years ago
12 years ago
12 years ago
10 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. )
  11. const (
  12. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  13. )
  14. type MasterNodes struct {
  15. nodes []string
  16. leader string
  17. possibleLeader string
  18. }
  19. func (mn *MasterNodes) String() string {
  20. return fmt.Sprintf("nodes:%v, leader:%s", mn.nodes, mn.leader)
  21. }
  22. func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  23. mn = &MasterNodes{nodes: []string{bootstrapNode}, leader: ""}
  24. return
  25. }
  26. func (mn *MasterNodes) Reset() {
  27. if mn.leader != "" {
  28. mn.leader = ""
  29. glog.V(0).Infof("Resetting master nodes: %v", mn)
  30. }
  31. }
  32. func (mn *MasterNodes) SetPossibleLeader(possibleLeader string) {
  33. // TODO try to check this leader first
  34. mn.possibleLeader = possibleLeader
  35. }
  36. func (mn *MasterNodes) FindMaster() (leader string, err error) {
  37. if len(mn.nodes) == 0 {
  38. return "", errors.New("No master node found!")
  39. }
  40. if mn.leader == "" {
  41. for _, m := range mn.nodes {
  42. glog.V(4).Infof("Listing masters on %s", m)
  43. if leader, masters, e := operation.ListMasters(m); e == nil {
  44. if leader != "" {
  45. mn.nodes = append(masters, m)
  46. mn.leader = leader
  47. glog.V(2).Infof("current master nodes is %v", mn)
  48. break
  49. }
  50. } else {
  51. glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
  52. }
  53. }
  54. }
  55. if mn.leader == "" {
  56. return "", errors.New("No master node available!")
  57. }
  58. return mn.leader, nil
  59. }
  60. /*
  61. * A VolumeServer contains one Store
  62. */
  63. type Store struct {
  64. Ip string
  65. Port int
  66. PublicUrl string
  67. Locations []*DiskLocation
  68. dataCenter string //optional informaton, overwriting master setting if exists
  69. rack string //optional information, overwriting master setting if exists
  70. connected bool
  71. VolumeSizeLimit uint64 //read from the master
  72. Client pb.Seaweed_SendHeartbeatClient
  73. }
  74. func (s *Store) String() (str string) {
  75. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
  76. return
  77. }
  78. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  79. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  80. s.Locations = make([]*DiskLocation, 0)
  81. for i := 0; i < len(dirnames); i++ {
  82. location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
  83. location.loadExistingVolumes(needleMapKind)
  84. s.Locations = append(s.Locations, location)
  85. }
  86. return
  87. }
  88. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
  89. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  90. if e != nil {
  91. return e
  92. }
  93. ttl, e := ReadTTL(ttlString)
  94. if e != nil {
  95. return e
  96. }
  97. for _, range_string := range strings.Split(volumeListString, ",") {
  98. if strings.Index(range_string, "-") < 0 {
  99. id_string := range_string
  100. id, err := NewVolumeId(id_string)
  101. if err != nil {
  102. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  103. }
  104. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate)
  105. } else {
  106. pair := strings.Split(range_string, "-")
  107. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  108. if start_err != nil {
  109. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  110. }
  111. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  112. if end_err != nil {
  113. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  114. }
  115. for id := start; id <= end; id++ {
  116. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate); err != nil {
  117. e = err
  118. }
  119. }
  120. }
  121. }
  122. return e
  123. }
  124. func (s *Store) DeleteCollection(collection string) (e error) {
  125. for _, location := range s.Locations {
  126. e = location.DeleteCollectionFromDiskLocation(collection)
  127. if e != nil {
  128. return
  129. }
  130. }
  131. return
  132. }
  133. func (s *Store) findVolume(vid VolumeId) *Volume {
  134. for _, location := range s.Locations {
  135. if v, found := location.FindVolume(vid); found {
  136. return v
  137. }
  138. }
  139. return nil
  140. }
  141. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  142. max := 0
  143. for _, location := range s.Locations {
  144. currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
  145. if currentFreeCount > max {
  146. max = currentFreeCount
  147. ret = location
  148. }
  149. }
  150. return ret
  151. }
  152. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) error {
  153. if s.findVolume(vid) != nil {
  154. return fmt.Errorf("Volume Id %d already exists!", vid)
  155. }
  156. if location := s.findFreeLocation(); location != nil {
  157. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  158. location.Directory, vid, collection, replicaPlacement, ttl)
  159. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
  160. location.SetVolume(vid, volume)
  161. return nil
  162. } else {
  163. return err
  164. }
  165. }
  166. return fmt.Errorf("No more free space left")
  167. }
  168. func (s *Store) Status() []*VolumeInfo {
  169. var stats []*VolumeInfo
  170. for _, location := range s.Locations {
  171. location.RLock()
  172. for k, v := range location.volumes {
  173. s := &VolumeInfo{
  174. Id: VolumeId(k),
  175. Size: v.ContentSize(),
  176. Collection: v.Collection,
  177. ReplicaPlacement: v.ReplicaPlacement,
  178. Version: v.Version(),
  179. FileCount: v.nm.FileCount(),
  180. DeleteCount: v.nm.DeletedCount(),
  181. DeletedByteCount: v.nm.DeletedSize(),
  182. ReadOnly: v.readOnly,
  183. Ttl: v.Ttl}
  184. stats = append(stats, s)
  185. }
  186. location.RUnlock()
  187. }
  188. sortVolumeInfos(stats)
  189. return stats
  190. }
  191. func (s *Store) SetDataCenter(dataCenter string) {
  192. s.dataCenter = dataCenter
  193. }
  194. func (s *Store) SetRack(rack string) {
  195. s.rack = rack
  196. }
  197. func (s *Store) CollectHeartbeat() *pb.Heartbeat {
  198. var volumeMessages []*pb.VolumeInformationMessage
  199. maxVolumeCount := 0
  200. var maxFileKey uint64
  201. for _, location := range s.Locations {
  202. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  203. location.Lock()
  204. for k, v := range location.volumes {
  205. if maxFileKey < v.nm.MaxFileKey() {
  206. maxFileKey = v.nm.MaxFileKey()
  207. }
  208. if !v.expired(s.VolumeSizeLimit) {
  209. volumeMessage := &pb.VolumeInformationMessage{
  210. Id: uint32(k),
  211. Size: uint64(v.Size()),
  212. Collection: v.Collection,
  213. FileCount: uint64(v.nm.FileCount()),
  214. DeleteCount: uint64(v.nm.DeletedCount()),
  215. DeletedByteCount: v.nm.DeletedSize(),
  216. ReadOnly: v.readOnly,
  217. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  218. Version: uint32(v.Version()),
  219. Ttl: v.Ttl.ToUint32(),
  220. }
  221. volumeMessages = append(volumeMessages, volumeMessage)
  222. } else {
  223. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  224. location.deleteVolumeById(v.Id)
  225. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  226. } else {
  227. glog.V(0).Infoln("volume", v.Id, "is expired.")
  228. }
  229. }
  230. }
  231. location.Unlock()
  232. }
  233. return &pb.Heartbeat{
  234. Ip: s.Ip,
  235. Port: uint32(s.Port),
  236. PublicUrl: s.PublicUrl,
  237. MaxVolumeCount: uint32(maxVolumeCount),
  238. MaxFileKey: maxFileKey,
  239. DataCenter: s.dataCenter,
  240. Rack: s.rack,
  241. Volumes: volumeMessages,
  242. }
  243. }
  244. func (s *Store) Close() {
  245. for _, location := range s.Locations {
  246. location.Close()
  247. }
  248. }
  249. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  250. if v := s.findVolume(i); v != nil {
  251. if v.readOnly {
  252. err = fmt.Errorf("Volume %d is read only", i)
  253. return
  254. }
  255. // TODO: count needle size ahead
  256. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  257. size, err = v.writeNeedle(n)
  258. } else {
  259. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
  260. }
  261. if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) {
  262. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit)
  263. if s.Client != nil {
  264. if e := s.Client.Send(s.CollectHeartbeat()); e != nil {
  265. glog.V(0).Infoln("error when reporting size:", e)
  266. }
  267. }
  268. }
  269. return
  270. }
  271. glog.V(0).Infoln("volume", i, "not found!")
  272. err = fmt.Errorf("Volume %d not found!", i)
  273. return
  274. }
  275. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  276. if v := s.findVolume(i); v != nil && !v.readOnly {
  277. return v.deleteNeedle(n)
  278. }
  279. return 0, nil
  280. }
  281. func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
  282. if v := s.findVolume(i); v != nil {
  283. return v.readNeedle(n)
  284. }
  285. return 0, fmt.Errorf("Volume %v not found!", i)
  286. }
  287. func (s *Store) GetVolume(i VolumeId) *Volume {
  288. return s.findVolume(i)
  289. }
  290. func (s *Store) HasVolume(i VolumeId) bool {
  291. v := s.findVolume(i)
  292. return v != nil
  293. }