You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
6.3 KiB

12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/util"
  4. "encoding/json"
  5. "errors"
  6. "io/ioutil"
  7. "log"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type Store struct {
  13. volumes map[VolumeId]*Volume
  14. dir string
  15. Port int
  16. Ip string
  17. PublicUrl string
  18. MaxVolumeCount int
  19. masterNode string
  20. connected bool
  21. volumeSizeLimit uint64 //read from the master
  22. }
  23. func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
  24. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
  25. s.volumes = make(map[VolumeId]*Volume)
  26. s.loadExistingVolumes()
  27. log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
  28. return
  29. }
  30. func (s *Store) AddVolume(volumeListString string, replicationType string) error {
  31. rt, e := NewReplicationTypeFromString(replicationType)
  32. if e != nil {
  33. return e
  34. }
  35. for _, range_string := range strings.Split(volumeListString, ",") {
  36. if strings.Index(range_string, "-") < 0 {
  37. id_string := range_string
  38. id, err := NewVolumeId(id_string)
  39. if err != nil {
  40. return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
  41. }
  42. e = s.addVolume(VolumeId(id), rt)
  43. } else {
  44. pair := strings.Split(range_string, "-")
  45. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  46. if start_err != nil {
  47. return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!")
  48. }
  49. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  50. if end_err != nil {
  51. return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
  52. }
  53. for id := start; id <= end; id++ {
  54. if err := s.addVolume(VolumeId(id), rt); err != nil {
  55. e = err
  56. }
  57. }
  58. }
  59. }
  60. return e
  61. }
  62. func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
  63. if s.volumes[vid] != nil {
  64. return errors.New("Volume Id " + vid.String() + " already exists!")
  65. }
  66. log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
  67. s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
  68. return err
  69. }
  70. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  71. vid, err := NewVolumeId(volumeIdString)
  72. if err != nil {
  73. return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
  74. }
  75. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  76. if e != nil {
  77. return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
  78. }
  79. return nil, garbageThreshold < s.volumes[vid].garbageLevel()
  80. }
  81. func (s *Store) CompactVolume(volumeIdString string) error {
  82. vid, err := NewVolumeId(volumeIdString)
  83. if err != nil {
  84. return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
  85. }
  86. return s.volumes[vid].compact()
  87. }
  88. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  89. vid, err := NewVolumeId(volumeIdString)
  90. if err != nil {
  91. return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
  92. }
  93. return s.volumes[vid].commitCompact()
  94. }
  95. func (s *Store) loadExistingVolumes() {
  96. if dirs, err := ioutil.ReadDir(s.dir); err == nil {
  97. for _, dir := range dirs {
  98. name := dir.Name()
  99. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  100. base := name[:len(name)-len(".dat")]
  101. if vid, err := NewVolumeId(base); err == nil {
  102. if s.volumes[vid] == nil {
  103. if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
  104. s.volumes[vid] = v
  105. log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  106. }
  107. }
  108. }
  109. }
  110. }
  111. }
  112. }
  113. func (s *Store) Status() []*VolumeInfo {
  114. var stats []*VolumeInfo
  115. for k, v := range s.volumes {
  116. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  117. RepType: v.ReplicaType, Version: v.Version(),
  118. FileCount: v.nm.FileCount(),
  119. DeleteCount: v.nm.DeletedCount(),
  120. DeletedByteCount: v.nm.DeletedSize(),
  121. ReadOnly: v.readOnly}
  122. stats = append(stats, s)
  123. }
  124. return stats
  125. }
  // JoinResult is the JSON document decoded from the master's reply to a
  // /dir/join request.
  type JoinResult struct {
  	// VolumeSizeLimit is copied into the store's volumeSizeLimit by Join.
  	VolumeSizeLimit uint64
  }
  129. func (s *Store) SetMaster(mserver string) {
  130. s.masterNode = mserver
  131. }
  132. func (s *Store) Join() error {
  133. stats := new([]*VolumeInfo)
  134. for k, v := range s.volumes {
  135. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  136. RepType: v.ReplicaType, Version: v.Version(),
  137. FileCount: v.nm.FileCount(),
  138. DeleteCount: v.nm.DeletedCount(),
  139. DeletedByteCount: v.nm.DeletedSize(),
  140. ReadOnly: v.readOnly}
  141. *stats = append(*stats, s)
  142. }
  143. bytes, _ := json.Marshal(stats)
  144. values := make(url.Values)
  145. if !s.connected {
  146. values.Add("init", "true")
  147. }
  148. values.Add("port", strconv.Itoa(s.Port))
  149. values.Add("ip", s.Ip)
  150. values.Add("publicUrl", s.PublicUrl)
  151. values.Add("volumes", string(bytes))
  152. values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
  153. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  154. if err != nil {
  155. return err
  156. }
  157. var ret JoinResult
  158. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  159. return err
  160. }
  161. s.volumeSizeLimit = ret.VolumeSizeLimit
  162. s.connected = true
  163. return nil
  164. }
  165. func (s *Store) Close() {
  166. for _, v := range s.volumes {
  167. v.Close()
  168. }
  169. }
  170. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  171. if v := s.volumes[i]; v != nil {
  172. if v.readOnly {
  173. err = errors.New("Volume " + i.String() + " is read only!")
  174. return
  175. } else {
  176. size, err = v.write(n)
  177. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  178. log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  179. if err = s.Join(); err != nil {
  180. log.Printf("error with Join: %s", err)
  181. }
  182. }
  183. }
  184. return
  185. }
  186. log.Println("volume", i, "not found!")
  187. err = errors.New("Volume " + i.String() + " not found!")
  188. return
  189. }
  190. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  191. if v := s.volumes[i]; v != nil && !v.readOnly {
  192. return v.delete(n)
  193. }
  194. return 0, nil
  195. }
  196. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  197. if v := s.volumes[i]; v != nil {
  198. return v.read(n)
  199. }
  200. return 0, errors.New("Not Found")
  201. }
  202. func (s *Store) GetVolume(i VolumeId) *Volume {
  203. return s.volumes[i]
  204. }
  205. func (s *Store) HasVolume(i VolumeId) bool {
  206. _, ok := s.volumes[i]
  207. return ok
  208. }