You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

388 lines
11 KiB

10 years ago
10 years ago
12 years ago
12 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "math/rand"
  8. "strconv"
  9. "strings"
  10. "github.com/chrislusf/weed-fs/go/glog"
  11. "github.com/chrislusf/weed-fs/go/operation"
  12. "github.com/chrislusf/weed-fs/go/security"
  13. "github.com/chrislusf/weed-fs/go/util"
  14. "github.com/golang/protobuf/proto"
  15. )
  // MAX_TTL_VOLUME_REMOVAL_DELAY is the grace period Join passes to
// Volume.exiredLongEnough before deleting an expired TTL volume.
// The original annotation gives the unit as minutes (10 minutes); confirm
// against exiredLongEnough if that matters.
const MAX_TTL_VOLUME_REMOVAL_DELAY = 10
  19. type DiskLocation struct {
  20. Directory string
  21. MaxVolumeCount int
  22. volumes map[VolumeId]*Volume
  23. }
  24. func (mn *DiskLocation) reset() {
  25. }
  // MasterNodes tracks the known master servers and which one is in use.
// lastNode indexes into nodes; a negative value means no master is currently
// selected, which makes findMaster rediscover the master list.
type MasterNodes struct {
	nodes    []string
	lastNode int
}

// String renders the node list and the selected index, for logging.
func (mn *MasterNodes) String() string {
	return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
}

// NewMasterNodes seeds the list with a single bootstrap master and leaves no
// master selected, so the first findMaster call queries it for the full list.
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
	mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
	return
}

// reset drops the current master selection so the next findMaster call
// rediscovers the masters. As before, it leaves the selection untouched when
// at most one master is known.
func (mn *MasterNodes) reset() {
	// Compare with >= 0: index 0 is a valid selection and must be reset too.
	// (Negating the index, as done previously, leaves 0 selected forever.)
	if len(mn.nodes) > 1 && mn.lastNode >= 0 {
		mn.lastNode = -1
	}
}
  42. func (mn *MasterNodes) findMaster() (string, error) {
  43. if len(mn.nodes) == 0 {
  44. return "", errors.New("No master node found!")
  45. }
  46. if mn.lastNode < 0 {
  47. for _, m := range mn.nodes {
  48. if masters, e := operation.ListMasters(m); e == nil {
  49. if len(masters) == 0 {
  50. continue
  51. }
  52. mn.nodes = masters
  53. mn.lastNode = rand.Intn(len(mn.nodes))
  54. glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
  55. break
  56. }
  57. }
  58. }
  59. if mn.lastNode < 0 {
  60. return "", errors.New("No master node available!")
  61. }
  62. return mn.nodes[mn.lastNode], nil
  63. }
  64. /*
  65. * A VolumeServer contains one Store
  66. */
  67. type Store struct {
  68. Ip string
  69. Port int
  70. AdminPort int
  71. PublicUrl string
  72. Locations []*DiskLocation
  73. dataCenter string //optional informaton, overwriting master setting if exists
  74. rack string //optional information, overwriting master setting if exists
  75. connected bool
  76. volumeSizeLimit uint64 //read from the master
  77. masterNodes *MasterNodes
  78. }
  79. func (s *Store) String() (str string) {
  80. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
  81. return
  82. }
  83. func NewStore(port, adminPort int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
  84. s = &Store{Port: port, AdminPort: adminPort, Ip: ip, PublicUrl: publicUrl}
  85. s.Locations = make([]*DiskLocation, 0)
  86. for i := 0; i < len(dirnames); i++ {
  87. location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
  88. location.volumes = make(map[VolumeId]*Volume)
  89. location.loadExistingVolumes()
  90. s.Locations = append(s.Locations, location)
  91. }
  92. return
  93. }
  94. func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
  95. rt, e := NewReplicaPlacementFromString(replicaPlacement)
  96. if e != nil {
  97. return e
  98. }
  99. ttl, e := ReadTTL(ttlString)
  100. if e != nil {
  101. return e
  102. }
  103. for _, range_string := range strings.Split(volumeListString, ",") {
  104. if strings.Index(range_string, "-") < 0 {
  105. id_string := range_string
  106. id, err := NewVolumeId(id_string)
  107. if err != nil {
  108. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  109. }
  110. e = s.addVolume(VolumeId(id), collection, rt, ttl)
  111. } else {
  112. pair := strings.Split(range_string, "-")
  113. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  114. if start_err != nil {
  115. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  116. }
  117. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  118. if end_err != nil {
  119. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  120. }
  121. for id := start; id <= end; id++ {
  122. if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
  123. e = err
  124. }
  125. }
  126. }
  127. }
  128. return e
  129. }
  130. func (s *Store) DeleteCollection(collection string) (e error) {
  131. for _, location := range s.Locations {
  132. for k, v := range location.volumes {
  133. if v.Collection == collection {
  134. e = v.Destroy()
  135. if e != nil {
  136. return
  137. }
  138. delete(location.volumes, k)
  139. }
  140. }
  141. }
  142. return
  143. }
  144. func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
  145. e = v.Destroy()
  146. if e != nil {
  147. return
  148. }
  149. delete(volumes, v.Id)
  150. return
  151. }
  152. func (s *Store) findVolume(vid VolumeId) *Volume {
  153. for _, location := range s.Locations {
  154. if v, found := location.volumes[vid]; found {
  155. return v
  156. }
  157. }
  158. return nil
  159. }
  160. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  161. max := 0
  162. for _, location := range s.Locations {
  163. currentFreeCount := location.MaxVolumeCount - len(location.volumes)
  164. if currentFreeCount > max {
  165. max = currentFreeCount
  166. ret = location
  167. }
  168. }
  169. return ret
  170. }
  171. func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
  172. if s.findVolume(vid) != nil {
  173. return fmt.Errorf("Volume Id %d already exists!", vid)
  174. }
  175. if location := s.findFreeLocation(); location != nil {
  176. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  177. location.Directory, vid, collection, replicaPlacement, ttl)
  178. if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
  179. location.volumes[vid] = volume
  180. return nil
  181. } else {
  182. return err
  183. }
  184. }
  185. return fmt.Errorf("No more free space left")
  186. }
  187. func (s *Store) FreezeVolume(volumeIdString string) error {
  188. vid, err := NewVolumeId(volumeIdString)
  189. if err != nil {
  190. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  191. }
  192. if v := s.findVolume(vid); v != nil {
  193. if v.readOnly {
  194. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  195. }
  196. return v.freeze()
  197. }
  198. return fmt.Errorf("volume id %d is not found during freeze!", vid)
  199. }
  200. func (l *DiskLocation) loadExistingVolumes() {
  201. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  202. for _, dir := range dirs {
  203. name := dir.Name()
  204. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  205. collection := ""
  206. base := name[:len(name)-len(".dat")]
  207. i := strings.Index(base, "_")
  208. if i > 0 {
  209. collection, base = base[0:i], base[i+1:]
  210. }
  211. if vid, err := NewVolumeId(base); err == nil {
  212. if l.volumes[vid] == nil {
  213. if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
  214. l.volumes[vid] = v
  215. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  216. }
  217. }
  218. }
  219. }
  220. }
  221. }
  222. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  223. }
  224. func (s *Store) Status() []*VolumeInfo {
  225. var stats []*VolumeInfo
  226. for _, location := range s.Locations {
  227. for k, v := range location.volumes {
  228. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  229. Collection: v.Collection,
  230. ReplicaPlacement: v.ReplicaPlacement,
  231. Version: v.Version(),
  232. FileCount: v.nm.FileCount(),
  233. DeleteCount: v.nm.DeletedCount(),
  234. DeletedByteCount: v.nm.DeletedSize(),
  235. ReadOnly: v.readOnly}
  236. stats = append(stats, s)
  237. }
  238. }
  239. return stats
  240. }
  241. func (s *Store) SetDataCenter(dataCenter string) {
  242. s.dataCenter = dataCenter
  243. }
  244. func (s *Store) SetRack(rack string) {
  245. s.rack = rack
  246. }
  247. func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
  248. s.masterNodes = NewMasterNodes(bootstrapMaster)
  249. }
  250. func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
  251. masterNode, e = s.masterNodes.findMaster()
  252. if e != nil {
  253. return
  254. }
  255. var volumeMessages []*operation.VolumeInformationMessage
  256. maxVolumeCount := 0
  257. var maxFileKey uint64
  258. for _, location := range s.Locations {
  259. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  260. for k, v := range location.volumes {
  261. if maxFileKey < v.nm.MaxFileKey() {
  262. maxFileKey = v.nm.MaxFileKey()
  263. }
  264. if !v.expired(s.volumeSizeLimit) {
  265. volumeMessage := &operation.VolumeInformationMessage{
  266. Id: proto.Uint32(uint32(k)),
  267. Size: proto.Uint64(uint64(v.Size())),
  268. Collection: proto.String(v.Collection),
  269. FileCount: proto.Uint64(uint64(v.nm.FileCount())),
  270. DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
  271. DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
  272. ReadOnly: proto.Bool(v.readOnly),
  273. ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
  274. Version: proto.Uint32(uint32(v.Version())),
  275. Ttl: proto.Uint32(v.Ttl.ToUint32()),
  276. }
  277. volumeMessages = append(volumeMessages, volumeMessage)
  278. } else {
  279. if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  280. s.DeleteVolume(location.volumes, v)
  281. glog.V(0).Infoln("volume", v.Id, "is deleted.")
  282. } else {
  283. glog.V(0).Infoln("volume", v.Id, "is expired.")
  284. }
  285. }
  286. }
  287. }
  288. joinMessage := &operation.JoinMessage{
  289. IsInit: proto.Bool(!s.connected),
  290. Ip: proto.String(s.Ip),
  291. Port: proto.Uint32(uint32(s.Port)),
  292. PublicUrl: proto.String(s.PublicUrl),
  293. MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
  294. MaxFileKey: proto.Uint64(maxFileKey),
  295. DataCenter: proto.String(s.dataCenter),
  296. Rack: proto.String(s.rack),
  297. Volumes: volumeMessages,
  298. AdminPort: proto.Uint32(uint32(s.AdminPort)),
  299. }
  300. data, err := proto.Marshal(joinMessage)
  301. if err != nil {
  302. return "", "", err
  303. }
  304. jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
  305. if err != nil {
  306. s.masterNodes.reset()
  307. return "", "", err
  308. }
  309. var ret operation.JoinResult
  310. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  311. return masterNode, "", err
  312. }
  313. if ret.Error != "" {
  314. return masterNode, "", errors.New(ret.Error)
  315. }
  316. s.volumeSizeLimit = ret.VolumeSizeLimit
  317. secretKey = security.Secret(ret.SecretKey)
  318. s.connected = true
  319. return
  320. }
  321. func (s *Store) Close() {
  322. for _, location := range s.Locations {
  323. for _, v := range location.volumes {
  324. v.Close()
  325. }
  326. }
  327. }
  328. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  329. if v := s.findVolume(i); v != nil {
  330. if v.readOnly {
  331. err = fmt.Errorf("Volume %d is read only!", i)
  332. return
  333. } else {
  334. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
  335. size, err = v.write(n)
  336. } else {
  337. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  338. }
  339. if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
  340. glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
  341. if _, _, e := s.Join(); e != nil {
  342. glog.V(0).Infoln("error when reporting size:", e)
  343. }
  344. }
  345. }
  346. return
  347. }
  348. glog.V(0).Infoln("volume", i, "not found!")
  349. err = fmt.Errorf("Volume %d not found!", i)
  350. return
  351. }
  352. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  353. if v := s.findVolume(i); v != nil && !v.readOnly {
  354. return v.delete(n)
  355. }
  356. return 0, nil
  357. }
  358. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  359. if v := s.findVolume(i); v != nil {
  360. return v.read(n)
  361. }
  362. return 0, fmt.Errorf("Volume %v not found!", i)
  363. }
  364. func (s *Store) GetVolume(i VolumeId) *Volume {
  365. return s.findVolume(i)
  366. }
  367. func (s *Store) HasVolume(i VolumeId) bool {
  368. v := s.findVolume(i)
  369. return v != nil
  370. }