You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

387 lines
11 KiB

10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "strconv"
  9. "strings"
  10. "github.com/chrislusf/seaweedfs/go/glog"
  11. "github.com/chrislusf/seaweedfs/go/operation"
  12. "github.com/chrislusf/seaweedfs/go/security"
  13. "github.com/chrislusf/seaweedfs/go/util"
  14. "github.com/golang/protobuf/proto"
  15. )
  16. const (
  17. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. MaxVolumeCount int
  22. volumes map[VolumeId]*Volume
  23. }
  24. func (mn *DiskLocation) reset() {
  25. }
  26. type MasterNodes struct {
  27. nodes []string
  28. lastNode int
  29. }
  30. func (mn *MasterNodes) String() string {
  31. return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
  32. }
  33. func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  34. mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
  35. return
  36. }
  37. func (mn *MasterNodes) reset() {
  38. glog.V(4).Infof("Resetting master nodes: %v", mn)
  39. if len(mn.nodes) > 1 && mn.lastNode > 0 {
  40. glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
  41. mn.lastNode = -mn.lastNode
  42. }
  43. }
  44. func (mn *MasterNodes) findMaster() (string, error) {
  45. if len(mn.nodes) == 0 {
  46. return "", errors.New("No master node found!")
  47. }
  48. if mn.lastNode < 0 {
  49. for _, m := range mn.nodes {
  50. glog.V(4).Infof("Listing masters on %s", m)
  51. if masters, e := operation.ListMasters(m); e == nil {
  52. if len(masters) == 0 {
  53. continue
  54. }
  55. mn.nodes = append(masters, m)
  56. mn.lastNode = rand.Intn(len(mn.nodes))
  57. glog.V(2).Infof("current master nodes is %v", mn)
  58. break
  59. } else {
  60. glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
  61. }
  62. }
  63. }
  64. if mn.lastNode < 0 {
  65. return "", errors.New("No master node available!")
  66. }
  67. return mn.nodes[mn.lastNode], nil
  68. }
  69. /*
  70. * A VolumeServer contains one Store
  71. */
  72. type Store struct {
  73. Ip string
  74. Port int
  75. PublicUrl string
  76. Locations []*DiskLocation
  77. dataCenter string //optional informaton, overwriting master setting if exists
  78. rack string //optional information, overwriting master setting if exists
  79. connected bool
  80. volumeSizeLimit uint64 //read from the master
  81. masterNodes *MasterNodes
  82. }
  83. func (s *Store) String() (str string) {
  84. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  85. return
  86. }
  87. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  88. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  89. s.Locations = make([]*DiskLocation, 0)
  90. for i := 0; i < len(dirnames); i++ {
  91. location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
  92. location.volumes = make(map[VolumeId]*Volume)
  93. location.loadExistingVolumes(needleMapKind)
  94. s.Locations = append(s.Locations, location)
  95. }
  96. return
  97. }
  98. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
  99. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  100. if e != nil {
  101. return e
  102. }
  103. ttl, e := ReadTTL(ttlString)
  104. if e != nil {
  105. return e
  106. }
  107. for _, range_string := range strings.Split(volumeListString, ",") {
  108. if strings.Index(range_string, "-") < 0 {
  109. id_string := range_string
  110. id, err := NewVolumeId(id_string)
  111. if err != nil {
  112. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  113. }
  114. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
  115. } else {
  116. pair := strings.Split(range_string, "-")
  117. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  118. if start_err != nil {
  119. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  120. }
  121. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  122. if end_err != nil {
  123. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  124. }
  125. for id := start; id <= end; id++ {
  126. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
  127. e = err
  128. }
  129. }
  130. }
  131. }
  132. return e
  133. }
  134. func (s *Store) DeleteCollection(collection string) (e error) {
  135. for _, location := range s.Locations {
  136. for k, v := range location.volumes {
  137. if v.Collection == collection {
  138. e = v.Destroy()
  139. if e != nil {
  140. return
  141. }
  142. delete(location.volumes, k)
  143. }
  144. }
  145. }
  146. return
  147. }
  148. func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
  149. e = v.Destroy()
  150. if e != nil {
  151. return
  152. }
  153. delete(volumes, v.Id)
  154. return
  155. }
  156. func (s *Store) findVolume(vid VolumeId) *Volume {
  157. for _, location := range s.Locations {
  158. if v, found := location.volumes[vid]; found {
  159. return v
  160. }
  161. }
  162. return nil
  163. }
  164. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  165. max := 0
  166. for _, location := range s.Locations {
  167. currentFreeCount := location.MaxVolumeCount - len(location.volumes)
  168. if currentFreeCount > max {
  169. max = currentFreeCount
  170. ret = location
  171. }
  172. }
  173. return ret
  174. }
  175. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  176. if s.findVolume(vid) != nil {
  177. return fmt.Errorf("Volume Id %d already exists!", vid)
  178. }
  179. if location := s.findFreeLocation(); location != nil {
  180. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  181. location.Directory, vid, collection, replicaPlacement, ttl)
  182. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
  183. location.volumes[vid] = volume
  184. return nil
  185. } else {
  186. return err
  187. }
  188. }
  189. return fmt.Errorf("No more free space left")
  190. }
  191. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  192. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  193. for _, dir := range dirs {
  194. name := dir.Name()
  195. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  196. collection := ""
  197. base := name[:len(name)-len(".dat")]
  198. i := strings.Index(base, "_")
  199. if i > 0 {
  200. collection, base = base[0:i], base[i+1:]
  201. }
  202. if vid, err := NewVolumeId(base); err == nil {
  203. if l.volumes[vid] == nil {
  204. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
  205. l.volumes[vid] = v
  206. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  207. }
  208. }
  209. }
  210. }
  211. }
  212. }
  213. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  214. }
  215. func (s *Store) Status() []*VolumeInfo {
  216. var stats []*VolumeInfo
  217. for _, location := range s.Locations {
  218. for k, v := range location.volumes {
  219. s := &VolumeInfo{
  220. Id: VolumeId(k),
  221. Size: v.ContentSize(),
  222. Collection: v.Collection,
  223. ReplicaPlacement: v.ReplicaPlacement,
  224. Version: v.Version(),
  225. FileCount: v.nm.FileCount(),
  226. DeleteCount: v.nm.DeletedCount(),
  227. DeletedByteCount: v.nm.DeletedSize(),
  228. ReadOnly: v.readOnly,
  229. Ttl: v.Ttl}
  230. stats = append(stats, s)
  231. }
  232. }
  233. sortVolumeInfos(stats)
  234. return stats
  235. }
  236. func (s *Store) SetDataCenter(dataCenter string) {
  237. s.dataCenter = dataCenter
  238. }
  239. func (s *Store) SetRack(rack string) {
  240. s.rack = rack
  241. }
  242. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  243. s.masterNodes = NewMasterNodes(bootstrapMaster)
  244. }
  245. func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
  246. masterNode, e = s.masterNodes.findMaster()
  247. if e != nil {
  248. return
  249. }
  250. var volumeMessages []*operation.VolumeInformationMessage
  251. maxVolumeCount := 0
  252. var maxFileKey uint64
  253. for _, location := range s.Locations {
  254. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  255. for k, v := range location.volumes {
  256. if maxFileKey < v.nm.MaxFileKey() {
  257. maxFileKey = v.nm.MaxFileKey()
  258. }
  259. if !v.expired(s.volumeSizeLimit) {
  260. volumeMessage := &operation.VolumeInformationMessage{
  261. Id: proto.Uint32(uint32(k)),
  262. Size: proto.Uint64(uint64(v.Size())),
  263. Collection: proto.String(v.Collection),
  264. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  265. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  266. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  267. ReadOnly: proto.Bool(v.readOnly),
  268. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  269. Version: proto.Uint32(uint32(v.Version())),
  270. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  271. }
  272. volumeMessages = append(volumeMessages, volumeMessage)
  273. } else {
  274. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  275. s.DeleteVolume(location.volumes, v)
  276. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  277. } else {
  278. glog.V(0).Infoln("volume", v.Id, "is expired.")
  279. }
  280. }
  281. }
  282. }
  283. joinMessage := &operation.JoinMessage{
  284. IsInit: proto.Bool(!s.connected),
  285. Ip: proto.String(s.Ip),
  286. Port: proto.Uint32(uint32(s.Port)),
  287. PublicUrl: proto.String(s.PublicUrl),
  288. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  289. MaxFileKey: proto.Uint64(maxFileKey),
  290. DataCenter: proto.String(s.dataCenter),
  291. Rack: proto.String(s.rack),
  292. Volumes: volumeMessages,
  293. }
  294. data, err := proto.Marshal(joinMessage)
  295. if err != nil {
  296. return "", "", err
  297. }
  298. joinUrl := "http://" + masterNode + "/dir/join"
  299. glog.V(4).Infof("Connecting to %s ...", joinUrl)
  300. jsonBlob, err := util.PostBytes(joinUrl, data)
  301. if err != nil {
  302. s.masterNodes.reset()
  303. return "", "", err
  304. }
  305. var ret operation.JoinResult
  306. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  307. glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
  308. s.masterNodes.reset()
  309. return masterNode, "", err
  310. }
  311. if ret.Error != "" {
  312. s.masterNodes.reset()
  313. return masterNode, "", errors.New(ret.Error)
  314. }
  315. s.volumeSizeLimit = ret.VolumeSizeLimit
  316. secretKey = security.Secret(ret.SecretKey)
  317. s.connected = true
  318. return
  319. }
  320. func (s *Store) Close() {
  321. for _, location := range s.Locations {
  322. for _, v := range location.volumes {
  323. v.Close()
  324. }
  325. }
  326. }
  327. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  328. if v := s.findVolume(i); v != nil {
  329. if v.readOnly {
  330. err = fmt.Errorf("Volume %d is read only", i)
  331. return
  332. }
  333. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  334. size, err = v.write(n)
  335. } else {
  336. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  337. }
  338. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  339. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  340. if _, _, e := s.SendHeartbeatToMaster(); e != nil {
  341. glog.V(0).Infoln("error when reporting size:", e)
  342. }
  343. }
  344. return
  345. }
  346. glog.V(0).Infoln("volume", i, "not found!")
  347. err = fmt.Errorf("Volume %d not found!", i)
  348. return
  349. }
  350. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  351. if v := s.findVolume(i); v != nil && !v.readOnly {
  352. return v.delete(n)
  353. }
  354. return 0, nil
  355. }
  356. func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
  357. if v := s.findVolume(i); v != nil {
  358. return v.readNeedle(n)
  359. }
  360. return 0, fmt.Errorf("Volume %v not found!", i)
  361. }
  362. func (s *Store) GetVolume(i VolumeId) *Volume {
  363. return s.findVolume(i)
  364. }
  365. func (s *Store) HasVolume(i VolumeId) bool {
  366. v := s.findVolume(i)
  367. return v != nil
  368. }