You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
11 KiB

10 years ago
10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "strconv"
  9. "strings"
  10. "github.com/chrislusf/weed-fs/go/glog"
  11. "github.com/chrislusf/weed-fs/go/operation"
  12. "github.com/chrislusf/weed-fs/go/security"
  13. "github.com/chrislusf/weed-fs/go/util"
  14. "github.com/golang/protobuf/proto"
  15. )
  16. const (
  17. MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
  18. )
  19. type DiskLocation struct {
  20. Directory string
  21. MaxVolumeCount int
  22. volumes map[VolumeId]*Volume
  23. }
  24. func (mn *DiskLocation) reset() {
  25. }
  26. type MasterNodes struct {
  27. nodes []string
  28. lastNode int
  29. }
  30. func (mn *MasterNodes) String() string {
  31. return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
  32. }
  33. func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
  34. mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
  35. return
  36. }
  37. func (mn *MasterNodes) reset() {
  38. if len(mn.nodes) > 1 && mn.lastNode > 0 {
  39. mn.lastNode = -mn.lastNode
  40. }
  41. }
  42. func (mn *MasterNodes) findMaster() (string, error) {
  43. if len(mn.nodes) == 0 {
  44. return "", errors.New("No master node found!")
  45. }
  46. if mn.lastNode < 0 {
  47. for _, m := range mn.nodes {
  48. if masters, e := operation.ListMasters(m); e == nil {
  49. if len(masters) == 0 {
  50. continue
  51. }
  52. mn.nodes = masters
  53. mn.lastNode = rand.Intn(len(mn.nodes))
  54. glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
  55. break
  56. }
  57. }
  58. }
  59. if mn.lastNode < 0 {
  60. return "", errors.New("No master node available!")
  61. }
  62. return mn.nodes[mn.lastNode], nil
  63. }
  64. /*
  65. * A VolumeServer contains one Store
  66. */
  67. type Store struct {
  68. Ip string
  69. Port int
  70. PublicUrl string
  71. Locations []*DiskLocation
  72. dataCenter string //optional informaton, overwriting master setting if exists
  73. rack string //optional information, overwriting master setting if exists
  74. connected bool
  75. volumeSizeLimit uint64 //read from the master
  76. masterNodes *MasterNodes
  77. }
  78. func (s *Store) String() (str string) {
  79. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  80. return
  81. }
  82. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  83. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  84. s.Locations = make([]*DiskLocation, 0)
  85. for i := 0; i < len(dirnames); i++ {
  86. location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
  87. location.volumes = make(map[VolumeId]*Volume)
  88. location.loadExistingVolumes(needleMapKind)
  89. s.Locations = append(s.Locations, location)
  90. }
  91. return
  92. }
  93. func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
  94. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  95. if e != nil {
  96. return e
  97. }
  98. ttl, e := ReadTTL(ttlString)
  99. if e != nil {
  100. return e
  101. }
  102. for _, range_string := range strings.Split(volumeListString, ",") {
  103. if strings.Index(range_string, "-") < 0 {
  104. id_string := range_string
  105. id, err := NewVolumeId(id_string)
  106. if err != nil {
  107. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  108. }
  109. e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
  110. } else {
  111. pair := strings.Split(range_string, "-")
  112. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  113. if start_err != nil {
  114. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  115. }
  116. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  117. if end_err != nil {
  118. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  119. }
  120. for id := start; id <= end; id++ {
  121. if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
  122. e = err
  123. }
  124. }
  125. }
  126. }
  127. return e
  128. }
  129. func (s *Store) DeleteCollection(collection string) (e error) {
  130. for _, location := range s.Locations {
  131. for k, v := range location.volumes {
  132. if v.Collection == collection {
  133. e = v.Destroy()
  134. if e != nil {
  135. return
  136. }
  137. delete(location.volumes, k)
  138. }
  139. }
  140. }
  141. return
  142. }
  143. func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
  144. e = v.Destroy()
  145. if e != nil {
  146. return
  147. }
  148. delete(volumes, v.Id)
  149. return
  150. }
  151. func (s *Store) findVolume(vid VolumeId) *Volume {
  152. for _, location := range s.Locations {
  153. if v, found := location.volumes[vid]; found {
  154. return v
  155. }
  156. }
  157. return nil
  158. }
  159. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  160. max := 0
  161. for _, location := range s.Locations {
  162. currentFreeCount := location.MaxVolumeCount - len(location.volumes)
  163. if currentFreeCount > max {
  164. max = currentFreeCount
  165. ret = location
  166. }
  167. }
  168. return ret
  169. }
  170. func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  171. if s.findVolume(vid) != nil {
  172. return fmt.Errorf("Volume Id %d already exists!", vid)
  173. }
  174. if location := s.findFreeLocation(); location != nil {
  175. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  176. location.Directory, vid, collection, replicaPlacement, ttl)
  177. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
  178. location.volumes[vid] = volume
  179. return nil
  180. } else {
  181. return err
  182. }
  183. }
  184. return fmt.Errorf("No more free space left")
  185. }
  186. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  187. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  188. for _, dir := range dirs {
  189. name := dir.Name()
  190. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  191. collection := ""
  192. base := name[:len(name)-len(".dat")]
  193. i := strings.Index(base, "_")
  194. if i > 0 {
  195. collection, base = base[0:i], base[i+1:]
  196. }
  197. if vid, err := NewVolumeId(base); err == nil {
  198. if l.volumes[vid] == nil {
  199. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
  200. l.volumes[vid] = v
  201. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  202. }
  203. }
  204. }
  205. }
  206. }
  207. }
  208. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  209. }
  210. func (s *Store) Status() []*VolumeInfo {
  211. var stats []*VolumeInfo
  212. for _, location := range s.Locations {
  213. for k, v := range location.volumes {
  214. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  215. Collection: v.Collection,
  216. ReplicaPlacement: v.ReplicaPlacement,
  217. Version: v.Version(),
  218. FileCount: v.nm.FileCount(),
  219. DeleteCount: v.nm.DeletedCount(),
  220. DeletedByteCount: v.nm.DeletedSize(),
  221. ReadOnly: v.readOnly,
  222. Ttl: v.Ttl}
  223. stats = append(stats, s)
  224. }
  225. }
  226. return stats
  227. }
  228. func (s *Store) SetDataCenter(dataCenter string) {
  229. s.dataCenter = dataCenter
  230. }
  231. func (s *Store) SetRack(rack string) {
  232. s.rack = rack
  233. }
  234. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  235. s.masterNodes = NewMasterNodes(bootstrapMaster)
  236. }
  237. func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
  238. masterNode, e = s.masterNodes.findMaster()
  239. if e != nil {
  240. return
  241. }
  242. var volumeMessages []*operation.VolumeInformationMessage
  243. maxVolumeCount := 0
  244. var maxFileKey uint64
  245. for _, location := range s.Locations {
  246. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  247. for k, v := range location.volumes {
  248. if maxFileKey < v.nm.MaxFileKey() {
  249. maxFileKey = v.nm.MaxFileKey()
  250. }
  251. if !v.expired(s.volumeSizeLimit) {
  252. volumeMessage := &operation.VolumeInformationMessage{
  253. Id: proto.Uint32(uint32(k)),
  254. Size: proto.Uint64(uint64(v.Size())),
  255. Collection: proto.String(v.Collection),
  256. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  257. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  258. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  259. ReadOnly: proto.Bool(v.readOnly),
  260. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  261. Version: proto.Uint32(uint32(v.Version())),
  262. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  263. }
  264. volumeMessages = append(volumeMessages, volumeMessage)
  265. } else {
  266. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  267. s.DeleteVolume(location.volumes, v)
  268. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  269. } else {
  270. glog.V(0).Infoln("volume", v.Id, "is expired.")
  271. }
  272. }
  273. }
  274. }
  275. joinMessage := &operation.JoinMessage{
  276. IsInit: proto.Bool(!s.connected),
  277. Ip: proto.String(s.Ip),
  278. Port: proto.Uint32(uint32(s.Port)),
  279. PublicUrl: proto.String(s.PublicUrl),
  280. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  281. MaxFileKey: proto.Uint64(maxFileKey),
  282. DataCenter: proto.String(s.dataCenter),
  283. Rack: proto.String(s.rack),
  284. Volumes: volumeMessages,
  285. }
  286. data, err := proto.Marshal(joinMessage)
  287. if err != nil {
  288. return "", "", err
  289. }
  290. joinUrl := "http://" + masterNode + "/dir/join"
  291. jsonBlob, err := util.PostBytes(joinUrl, data)
  292. if err != nil {
  293. s.masterNodes.reset()
  294. return "", "", err
  295. }
  296. var ret operation.JoinResult
  297. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  298. glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
  299. return masterNode, "", err
  300. }
  301. if ret.Error != "" {
  302. return masterNode, "", errors.New(ret.Error)
  303. }
  304. s.volumeSizeLimit = ret.VolumeSizeLimit
  305. secretKey = security.Secret(ret.SecretKey)
  306. s.connected = true
  307. return
  308. }
  309. func (s *Store) Close() {
  310. for _, location := range s.Locations {
  311. for _, v := range location.volumes {
  312. v.Close()
  313. }
  314. }
  315. }
  316. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  317. if v := s.findVolume(i); v != nil {
  318. if v.readOnly {
  319. err = fmt.Errorf("Volume %d is read only", i)
  320. return
  321. }
  322. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  323. size, err = v.write(n)
  324. } else {
  325. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  326. }
  327. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  328. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  329. if _, _, e := s.SendHeartbeatToMaster(); e != nil {
  330. glog.V(0).Infoln("error when reporting size:", e)
  331. }
  332. }
  333. return
  334. }
  335. glog.V(0).Infoln("volume", i, "not found!")
  336. err = fmt.Errorf("Volume %d not found!", i)
  337. return
  338. }
  339. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  340. if v := s.findVolume(i); v != nil && !v.readOnly {
  341. return v.delete(n)
  342. }
  343. return 0, nil
  344. }
  345. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  346. if v := s.findVolume(i); v != nil {
  347. return v.read(n)
  348. }
  349. return 0, fmt.Errorf("Volume %v not found!", i)
  350. }
  351. func (s *Store) GetVolume(i VolumeId) *Volume {
  352. return s.findVolume(i)
  353. }
  354. func (s *Store) HasVolume(i VolumeId) bool {
  355. v := s.findVolume(i)
  356. return v != nil
  357. }