You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

340 lines
10 KiB

10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/golang/protobuf/proto"
  14. )
  // MAX_TTL_VOLUME_REMOVAL_DELAY is the grace period, in minutes (10), that is
  // passed to Volume.exiredLongEnough before an expired TTL volume is deleted
  // during a heartbeat.
  const MAX_TTL_VOLUME_REMOVAL_DELAY = 10
  // MasterNodes keeps the list of known master addresses together with the
  // index of the one currently in use.
  type MasterNodes struct {
  	// nodes holds the known master addresses.
  	nodes []string
  	// lastNode is the index into nodes of the selected master. A negative
  	// value means that no master is currently selected.
  	lastNode int
  }

  // String renders the node list and the selected index for logging.
  func (mn *MasterNodes) String() string {
  	return "nodes:" + fmt.Sprint(mn.nodes) + ", lastNode:" + strconv.Itoa(mn.lastNode)
  }

  // NewMasterNodes returns a MasterNodes that knows only bootstrapNode and has
  // no master selected yet (lastNode is -1).
  func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  	return &MasterNodes{
  		nodes:    []string{bootstrapNode},
  		lastNode: -1,
  	}
  }
  29. func (mn *MasterNodes) Reset() {
  30. glog.V(4).Infof("Resetting master nodes: %v", mn)
  31. if len(mn.nodes) > 1 && mn.lastNode >= 0 {
  32. glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
  33. mn.lastNode = -mn.lastNode - 1
  34. }
  35. }
  36. func (mn *MasterNodes) FindMaster() (string, error) {
  37. if len(mn.nodes) == 0 {
  38. return "", errors.New("No master node found!")
  39. }
  40. if mn.lastNode < 0 {
  41. for _, m := range mn.nodes {
  42. glog.V(4).Infof("Listing masters on %s", m)
  43. if masters, e := operation.ListMasters(m); e == nil {
  44. if len(masters) == 0 {
  45. continue
  46. }
  47. mn.nodes = append(masters, m)
  48. mn.lastNode = rand.Intn(len(mn.nodes))
  49. glog.V(2).Infof("current master nodes is %v", mn)
  50. break
  51. } else {
  52. glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
  53. }
  54. }
  55. }
  56. if mn.lastNode < 0 {
  57. return "", errors.New("No master node available!")
  58. }
  59. return mn.nodes[mn.lastNode], nil
  60. }
  61. /*
  62. * A VolumeServer contains one Store
  63. */
  64. type Store struct {
  65. Ip string
  66. Port int
  67. PublicUrl string
  68. Locations []*DiskLocation
  69. dataCenter string //optional informaton, overwriting master setting if exists
  70. rack string //optional information, overwriting master setting if exists
  71. connected bool
  72. volumeSizeLimit uint64 //read from the master
  73. masterNodes *MasterNodes
  74. }
  75. func (s *Store) String() (str string) {
  76. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  77. return
  78. }
  79. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  80. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  81. s.Locations = make([]*DiskLocation, 0)
  82. for i := 0; i < len(dirnames); i++ {
  83. location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
  84. location.loadExistingVolumes(needleMapKind)
  85. s.Locations = append(s.Locations, location)
  86. }
  87. return
  88. }
  89. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
  90. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  91. if e != nil {
  92. return e
  93. }
  94. ttl, e := ReadTTL(ttlString)
  95. if e != nil {
  96. return e
  97. }
  98. for _, range_string := range strings.Split(volumeListString, ",") {
  99. if strings.Index(range_string, "-") < 0 {
  100. id_string := range_string
  101. id, err := NewVolumeId(id_string)
  102. if err != nil {
  103. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  104. }
  105. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
  106. } else {
  107. pair := strings.Split(range_string, "-")
  108. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  109. if start_err != nil {
  110. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  111. }
  112. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  113. if end_err != nil {
  114. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  115. }
  116. for id := start; id <= end; id++ {
  117. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
  118. e = err
  119. }
  120. }
  121. }
  122. }
  123. return e
  124. }
  125. func (s *Store) DeleteCollection(collection string) (e error) {
  126. for _, location := range s.Locations {
  127. e = location.DeleteCollectionFromDiskLocation(collection)
  128. if e != nil {
  129. return
  130. }
  131. }
  132. return
  133. }
  134. func (s *Store) findVolume(vid VolumeId) *Volume {
  135. for _, location := range s.Locations {
  136. if v, found := location.volumes[vid]; found {
  137. return v
  138. }
  139. }
  140. return nil
  141. }
  142. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  143. max := 0
  144. for _, location := range s.Locations {
  145. currentFreeCount := location.MaxVolumeCount - len(location.volumes)
  146. if currentFreeCount > max {
  147. max = currentFreeCount
  148. ret = location
  149. }
  150. }
  151. return ret
  152. }
  153. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  154. if s.findVolume(vid) != nil {
  155. return fmt.Errorf("Volume Id %d already exists!", vid)
  156. }
  157. if location := s.findFreeLocation(); location != nil {
  158. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  159. location.Directory, vid, collection, replicaPlacement, ttl)
  160. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
  161. location.volumes[vid] = volume
  162. return nil
  163. } else {
  164. return err
  165. }
  166. }
  167. return fmt.Errorf("No more free space left")
  168. }
  169. func (s *Store) Status() []*VolumeInfo {
  170. var stats []*VolumeInfo
  171. for _, location := range s.Locations {
  172. for k, v := range location.volumes {
  173. s := &VolumeInfo{
  174. Id: VolumeId(k),
  175. Size: v.ContentSize(),
  176. Collection: v.Collection,
  177. ReplicaPlacement: v.ReplicaPlacement,
  178. Version: v.Version(),
  179. FileCount: v.nm.FileCount(),
  180. DeleteCount: v.nm.DeletedCount(),
  181. DeletedByteCount: v.nm.DeletedSize(),
  182. ReadOnly: v.readOnly,
  183. Ttl: v.Ttl}
  184. stats = append(stats, s)
  185. }
  186. }
  187. sortVolumeInfos(stats)
  188. return stats
  189. }
  190. func (s *Store) SetDataCenter(dataCenter string) {
  191. s.dataCenter = dataCenter
  192. }
  193. func (s *Store) SetRack(rack string) {
  194. s.rack = rack
  195. }
  196. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  197. s.masterNodes = NewMasterNodes(bootstrapMaster)
  198. }
  199. func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
  200. masterNode, e = s.masterNodes.FindMaster()
  201. if e != nil {
  202. return
  203. }
  204. var volumeMessages []*operation.VolumeInformationMessage
  205. maxVolumeCount := 0
  206. var maxFileKey uint64
  207. for _, location := range s.Locations {
  208. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  209. for k, v := range location.volumes {
  210. if maxFileKey < v.nm.MaxFileKey() {
  211. maxFileKey = v.nm.MaxFileKey()
  212. }
  213. if !v.expired(s.volumeSizeLimit) {
  214. volumeMessage := &operation.VolumeInformationMessage{
  215. Id: proto.Uint32(uint32(k)),
  216. Size: proto.Uint64(uint64(v.Size())),
  217. Collection: proto.String(v.Collection),
  218. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  219. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  220. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  221. ReadOnly: proto.Bool(v.readOnly),
  222. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  223. Version: proto.Uint32(uint32(v.Version())),
  224. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  225. }
  226. volumeMessages = append(volumeMessages, volumeMessage)
  227. } else {
  228. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  229. location.deleteVolumeById(v.Id)
  230. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  231. } else {
  232. glog.V(0).Infoln("volume", v.Id, "is expired.")
  233. }
  234. }
  235. }
  236. }
  237. joinMessage := &operation.JoinMessage{
  238. IsInit: proto.Bool(!s.connected),
  239. Ip: proto.String(s.Ip),
  240. Port: proto.Uint32(uint32(s.Port)),
  241. PublicUrl: proto.String(s.PublicUrl),
  242. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  243. MaxFileKey: proto.Uint64(maxFileKey),
  244. DataCenter: proto.String(s.dataCenter),
  245. Rack: proto.String(s.rack),
  246. Volumes: volumeMessages,
  247. }
  248. data, err := proto.Marshal(joinMessage)
  249. if err != nil {
  250. return "", "", err
  251. }
  252. joinUrl := "http://" + masterNode + "/dir/join"
  253. glog.V(4).Infof("Connecting to %s ...", joinUrl)
  254. jsonBlob, err := util.PostBytes(joinUrl, data)
  255. if err != nil {
  256. s.masterNodes.Reset()
  257. return "", "", err
  258. }
  259. var ret operation.JoinResult
  260. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  261. glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
  262. s.masterNodes.Reset()
  263. return masterNode, "", err
  264. }
  265. if ret.Error != "" {
  266. s.masterNodes.Reset()
  267. return masterNode, "", errors.New(ret.Error)
  268. }
  269. s.volumeSizeLimit = ret.VolumeSizeLimit
  270. secretKey = security.Secret(ret.SecretKey)
  271. s.connected = true
  272. return
  273. }
  274. func (s *Store) Close() {
  275. for _, location := range s.Locations {
  276. for _, v := range location.volumes {
  277. v.Close()
  278. }
  279. }
  280. }
  281. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  282. if v := s.findVolume(i); v != nil {
  283. if v.readOnly {
  284. err = fmt.Errorf("Volume %d is read only", i)
  285. return
  286. }
  287. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  288. size, err = v.write(n)
  289. } else {
  290. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  291. }
  292. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  293. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  294. if _, _, e := s.SendHeartbeatToMaster(); e != nil {
  295. glog.V(0).Infoln("error when reporting size:", e)
  296. }
  297. }
  298. return
  299. }
  300. glog.V(0).Infoln("volume", i, "not found!")
  301. err = fmt.Errorf("Volume %d not found!", i)
  302. return
  303. }
  304. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  305. if v := s.findVolume(i); v != nil && !v.readOnly {
  306. return v.delete(n)
  307. }
  308. return 0, nil
  309. }
  310. func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
  311. if v := s.findVolume(i); v != nil {
  312. return v.readNeedle(n)
  313. }
  314. return 0, fmt.Errorf("Volume %v not found!", i)
  315. }
  316. func (s *Store) GetVolume(i VolumeId) *Volume {
  317. return s.findVolume(i)
  318. }
  319. func (s *Store) HasVolume(i VolumeId) bool {
  320. v := s.findVolume(i)
  321. return v != nil
  322. }