You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
11 KiB

10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "strconv"
  9. "strings"
  10. "github.com/chrislusf/seaweedfs/go/glog"
  11. "github.com/chrislusf/seaweedfs/go/operation"
  12. "github.com/chrislusf/seaweedfs/go/security"
  13. "github.com/chrislusf/seaweedfs/go/util"
  14. "github.com/golang/protobuf/proto"
  15. )
  16. const (
  17. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. MaxVolumeCount int
  22. volumes map[VolumeId]*Volume
  23. }
  24. func (mn *DiskLocation) reset() {
  25. }
  26. type MasterNodes struct {
  27. nodes []string
  28. lastNode int
  29. }
  30. func (mn *MasterNodes) String() string {
  31. return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
  32. }
  33. func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  34. mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
  35. return
  36. }
  37. func (mn *MasterNodes) reset() {
  38. if len(mn.nodes) > 1 && mn.lastNode > 0 {
  39. mn.lastNode = -mn.lastNode
  40. }
  41. }
  42. func (mn *MasterNodes) findMaster() (string, error) {
  43. if len(mn.nodes) == 0 {
  44. return "", errors.New("No master node found!")
  45. }
  46. if mn.lastNode < 0 {
  47. for _, m := range mn.nodes {
  48. glog.V(4).Infof("Listing masters on %s", m)
  49. if masters, e := operation.ListMasters(m); e == nil {
  50. if len(masters) == 0 {
  51. continue
  52. }
  53. mn.nodes = masters
  54. mn.lastNode = rand.Intn(len(mn.nodes))
  55. glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
  56. break
  57. } else {
  58. glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
  59. }
  60. }
  61. }
  62. if mn.lastNode < 0 {
  63. return "", errors.New("No master node available!")
  64. }
  65. return mn.nodes[mn.lastNode], nil
  66. }
  67. /*
  68. * A VolumeServer contains one Store
  69. */
  70. type Store struct {
  71. Ip string
  72. Port int
  73. PublicUrl string
  74. Locations []*DiskLocation
  75. dataCenter string //optional informaton, overwriting master setting if exists
  76. rack string //optional information, overwriting master setting if exists
  77. connected bool
  78. volumeSizeLimit uint64 //read from the master
  79. masterNodes *MasterNodes
  80. }
  81. func (s *Store) String() (str string) {
  82. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  83. return
  84. }
  85. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  86. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  87. s.Locations = make([]*DiskLocation, 0)
  88. for i := 0; i < len(dirnames); i++ {
  89. location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
  90. location.volumes = make(map[VolumeId]*Volume)
  91. location.loadExistingVolumes(needleMapKind)
  92. s.Locations = append(s.Locations, location)
  93. }
  94. return
  95. }
  96. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
  97. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  98. if e != nil {
  99. return e
  100. }
  101. ttl, e := ReadTTL(ttlString)
  102. if e != nil {
  103. return e
  104. }
  105. for _, range_string := range strings.Split(volumeListString, ",") {
  106. if strings.Index(range_string, "-") < 0 {
  107. id_string := range_string
  108. id, err := NewVolumeId(id_string)
  109. if err != nil {
  110. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  111. }
  112. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
  113. } else {
  114. pair := strings.Split(range_string, "-")
  115. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  116. if start_err != nil {
  117. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  118. }
  119. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  120. if end_err != nil {
  121. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  122. }
  123. for id := start; id <= end; id++ {
  124. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
  125. e = err
  126. }
  127. }
  128. }
  129. }
  130. return e
  131. }
  132. func (s *Store) DeleteCollection(collection string) (e error) {
  133. for _, location := range s.Locations {
  134. for k, v := range location.volumes {
  135. if v.Collection == collection {
  136. e = v.Destroy()
  137. if e != nil {
  138. return
  139. }
  140. delete(location.volumes, k)
  141. }
  142. }
  143. }
  144. return
  145. }
  146. func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
  147. e = v.Destroy()
  148. if e != nil {
  149. return
  150. }
  151. delete(volumes, v.Id)
  152. return
  153. }
  154. func (s *Store) findVolume(vid VolumeId) *Volume {
  155. for _, location := range s.Locations {
  156. if v, found := location.volumes[vid]; found {
  157. return v
  158. }
  159. }
  160. return nil
  161. }
  162. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  163. max := 0
  164. for _, location := range s.Locations {
  165. currentFreeCount := location.MaxVolumeCount - len(location.volumes)
  166. if currentFreeCount > max {
  167. max = currentFreeCount
  168. ret = location
  169. }
  170. }
  171. return ret
  172. }
  173. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  174. if s.findVolume(vid) != nil {
  175. return fmt.Errorf("Volume Id %d already exists!", vid)
  176. }
  177. if location := s.findFreeLocation(); location != nil {
  178. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  179. location.Directory, vid, collection, replicaPlacement, ttl)
  180. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
  181. location.volumes[vid] = volume
  182. return nil
  183. } else {
  184. return err
  185. }
  186. }
  187. return fmt.Errorf("No more free space left")
  188. }
  189. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  190. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  191. for _, dir := range dirs {
  192. name := dir.Name()
  193. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  194. collection := ""
  195. base := name[:len(name)-len(".dat")]
  196. i := strings.Index(base, "_")
  197. if i > 0 {
  198. collection, base = base[0:i], base[i+1:]
  199. }
  200. if vid, err := NewVolumeId(base); err == nil {
  201. if l.volumes[vid] == nil {
  202. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
  203. l.volumes[vid] = v
  204. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  205. }
  206. }
  207. }
  208. }
  209. }
  210. }
  211. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  212. }
  213. func (s *Store) Status() []*VolumeInfo {
  214. var stats []*VolumeInfo
  215. for _, location := range s.Locations {
  216. for k, v := range location.volumes {
  217. s := &VolumeInfo{
  218. Id: VolumeId(k),
  219. Size: v.ContentSize(),
  220. Collection: v.Collection,
  221. ReplicaPlacement: v.ReplicaPlacement,
  222. Version: v.Version(),
  223. FileCount: v.nm.FileCount(),
  224. DeleteCount: v.nm.DeletedCount(),
  225. DeletedByteCount: v.nm.DeletedSize(),
  226. ReadOnly: v.readOnly,
  227. Ttl: v.Ttl}
  228. stats = append(stats, s)
  229. }
  230. }
  231. sortVolumeInfos(stats)
  232. return stats
  233. }
  234. func (s *Store) SetDataCenter(dataCenter string) {
  235. s.dataCenter = dataCenter
  236. }
  237. func (s *Store) SetRack(rack string) {
  238. s.rack = rack
  239. }
  240. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  241. s.masterNodes = NewMasterNodes(bootstrapMaster)
  242. }
  243. func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
  244. masterNode, e = s.masterNodes.findMaster()
  245. if e != nil {
  246. return
  247. }
  248. var volumeMessages []*operation.VolumeInformationMessage
  249. maxVolumeCount := 0
  250. var maxFileKey uint64
  251. for _, location := range s.Locations {
  252. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  253. for k, v := range location.volumes {
  254. if maxFileKey < v.nm.MaxFileKey() {
  255. maxFileKey = v.nm.MaxFileKey()
  256. }
  257. if !v.expired(s.volumeSizeLimit) {
  258. volumeMessage := &operation.VolumeInformationMessage{
  259. Id: proto.Uint32(uint32(k)),
  260. Size: proto.Uint64(uint64(v.Size())),
  261. Collection: proto.String(v.Collection),
  262. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  263. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  264. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  265. ReadOnly: proto.Bool(v.readOnly),
  266. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  267. Version: proto.Uint32(uint32(v.Version())),
  268. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  269. }
  270. volumeMessages = append(volumeMessages, volumeMessage)
  271. } else {
  272. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  273. s.DeleteVolume(location.volumes, v)
  274. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  275. } else {
  276. glog.V(0).Infoln("volume", v.Id, "is expired.")
  277. }
  278. }
  279. }
  280. }
  281. joinMessage := &operation.JoinMessage{
  282. IsInit: proto.Bool(!s.connected),
  283. Ip: proto.String(s.Ip),
  284. Port: proto.Uint32(uint32(s.Port)),
  285. PublicUrl: proto.String(s.PublicUrl),
  286. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  287. MaxFileKey: proto.Uint64(maxFileKey),
  288. DataCenter: proto.String(s.dataCenter),
  289. Rack: proto.String(s.rack),
  290. Volumes: volumeMessages,
  291. }
  292. data, err := proto.Marshal(joinMessage)
  293. if err != nil {
  294. return "", "", err
  295. }
  296. joinUrl := "http://" + masterNode + "/dir/join"
  297. glog.V(4).Infof("Connecting to %s ...", joinUrl)
  298. jsonBlob, err := util.PostBytes(joinUrl, data)
  299. if err != nil {
  300. s.masterNodes.reset()
  301. return "", "", err
  302. }
  303. var ret operation.JoinResult
  304. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  305. glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
  306. return masterNode, "", err
  307. }
  308. if ret.Error != "" {
  309. return masterNode, "", errors.New(ret.Error)
  310. }
  311. s.volumeSizeLimit = ret.VolumeSizeLimit
  312. secretKey = security.Secret(ret.SecretKey)
  313. s.connected = true
  314. return
  315. }
  316. func (s *Store) Close() {
  317. for _, location := range s.Locations {
  318. for _, v := range location.volumes {
  319. v.Close()
  320. }
  321. }
  322. }
  323. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  324. if v := s.findVolume(i); v != nil {
  325. if v.readOnly {
  326. err = fmt.Errorf("Volume %d is read only", i)
  327. return
  328. }
  329. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  330. size, err = v.write(n)
  331. } else {
  332. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  333. }
  334. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  335. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  336. if _, _, e := s.SendHeartbeatToMaster(); e != nil {
  337. glog.V(0).Infoln("error when reporting size:", e)
  338. }
  339. }
  340. return
  341. }
  342. glog.V(0).Infoln("volume", i, "not found!")
  343. err = fmt.Errorf("Volume %d not found!", i)
  344. return
  345. }
  346. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  347. if v := s.findVolume(i); v != nil && !v.readOnly {
  348. return v.delete(n)
  349. }
  350. return 0, nil
  351. }
  352. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  353. if v := s.findVolume(i); v != nil {
  354. return v.read(n)
  355. }
  356. return 0, fmt.Errorf("Volume %v not found!", i)
  357. }
  358. func (s *Store) GetVolume(i VolumeId) *Volume {
  359. return s.findVolume(i)
  360. }
  361. func (s *Store) HasVolume(i VolumeId) bool {
  362. v := s.findVolume(i)
  363. return v != nil
  364. }