You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

242 lines
7.1 KiB

12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/util"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type Store struct {
  13. volumes map[VolumeId]*Volume
  14. dir string
  15. Port int
  16. Ip string
  17. PublicUrl string
  18. MaxVolumeCount int
  19. masterNode string
  20. dataCenter string //optional informaton, overwriting master setting if exists
  21. rack string //optional information, overwriting master setting if exists
  22. connected bool
  23. volumeSizeLimit uint64 //read from the master
  24. }
  25. func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
  26. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
  27. s.volumes = make(map[VolumeId]*Volume)
  28. s.loadExistingVolumes()
  29. log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
  30. return
  31. }
  32. func (s *Store) AddVolume(volumeListString string, replicationType string) error {
  33. rt, e := NewReplicationTypeFromString(replicationType)
  34. if e != nil {
  35. return e
  36. }
  37. for _, range_string := range strings.Split(volumeListString, ",") {
  38. if strings.Index(range_string, "-") < 0 {
  39. id_string := range_string
  40. id, err := NewVolumeId(id_string)
  41. if err != nil {
  42. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  43. }
  44. e = s.addVolume(VolumeId(id), rt)
  45. } else {
  46. pair := strings.Split(range_string, "-")
  47. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  48. if start_err != nil {
  49. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  50. }
  51. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  52. if end_err != nil {
  53. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1] )
  54. }
  55. for id := start; id <= end; id++ {
  56. if err := s.addVolume(VolumeId(id), rt); err != nil {
  57. e = err
  58. }
  59. }
  60. }
  61. }
  62. return e
  63. }
  64. func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
  65. if s.volumes[vid] != nil {
  66. return fmt.Errorf("Volume Id %s already exists!", vid)
  67. }
  68. log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
  69. s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
  70. return err
  71. }
  72. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  73. vid, err := NewVolumeId(volumeIdString)
  74. if err != nil {
  75. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString), false
  76. }
  77. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  78. if e != nil {
  79. return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
  80. }
  81. return nil, garbageThreshold < s.volumes[vid].garbageLevel()
  82. }
  83. func (s *Store) CompactVolume(volumeIdString string) error {
  84. vid, err := NewVolumeId(volumeIdString)
  85. if err != nil {
  86. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  87. }
  88. return s.volumes[vid].compact()
  89. }
  90. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  91. vid, err := NewVolumeId(volumeIdString)
  92. if err != nil {
  93. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  94. }
  95. return s.volumes[vid].commitCompact()
  96. }
  97. func (s *Store) FreezeVolume(volumeIdString string) error {
  98. vid, err := NewVolumeId(volumeIdString)
  99. if err != nil {
  100. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  101. }
  102. if s.volumes[vid].readOnly {
  103. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  104. }
  105. return s.volumes[vid].freeze()
  106. }
  107. func (s *Store) loadExistingVolumes() {
  108. if dirs, err := ioutil.ReadDir(s.dir); err == nil {
  109. for _, dir := range dirs {
  110. name := dir.Name()
  111. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  112. base := name[:len(name)-len(".dat")]
  113. if vid, err := NewVolumeId(base); err == nil {
  114. if s.volumes[vid] == nil {
  115. if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
  116. s.volumes[vid] = v
  117. log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  118. }
  119. }
  120. }
  121. }
  122. }
  123. }
  124. }
  125. func (s *Store) Status() []*VolumeInfo {
  126. var stats []*VolumeInfo
  127. for k, v := range s.volumes {
  128. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  129. RepType: v.ReplicaType, Version: v.Version(),
  130. FileCount: v.nm.FileCount(),
  131. DeleteCount: v.nm.DeletedCount(),
  132. DeletedByteCount: v.nm.DeletedSize(),
  133. ReadOnly: v.readOnly}
  134. stats = append(stats, s)
  135. }
  136. return stats
  137. }
  // JoinResult is the JSON reply Join decodes from the master's /dir/join
  // endpoint.
  type JoinResult struct {
  	// VolumeSizeLimit is copied into the store's volumeSizeLimit by Join.
  	VolumeSizeLimit uint64
  }
  141. func (s *Store) SetMaster(mserver string) {
  142. s.masterNode = mserver
  143. }
  144. func (s *Store) SetDataCenter(dataCenter string) {
  145. s.dataCenter = dataCenter
  146. }
  147. func (s *Store) SetRack(rack string) {
  148. s.rack = rack
  149. }
  150. func (s *Store) Join() error {
  151. stats := new([]*VolumeInfo)
  152. for k, v := range s.volumes {
  153. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  154. RepType: v.ReplicaType, Version: v.Version(),
  155. FileCount: v.nm.FileCount(),
  156. DeleteCount: v.nm.DeletedCount(),
  157. DeletedByteCount: v.nm.DeletedSize(),
  158. ReadOnly: v.readOnly}
  159. *stats = append(*stats, s)
  160. }
  161. bytes, _ := json.Marshal(stats)
  162. values := make(url.Values)
  163. if !s.connected {
  164. values.Add("init", "true")
  165. }
  166. values.Add("port", strconv.Itoa(s.Port))
  167. values.Add("ip", s.Ip)
  168. values.Add("publicUrl", s.PublicUrl)
  169. values.Add("volumes", string(bytes))
  170. values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
  171. values.Add("dataCenter", s.dataCenter)
  172. values.Add("rack", s.rack)
  173. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  174. if err != nil {
  175. return err
  176. }
  177. var ret JoinResult
  178. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  179. return err
  180. }
  181. s.volumeSizeLimit = ret.VolumeSizeLimit
  182. s.connected = true
  183. return nil
  184. }
  185. func (s *Store) Close() {
  186. for _, v := range s.volumes {
  187. v.Close()
  188. }
  189. }
  190. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  191. if v := s.volumes[i]; v != nil {
  192. if v.readOnly {
  193. err = fmt.Errorf("Volume %s is read only!", i)
  194. return
  195. } else {
  196. if s.volumeSizeLimit >= v.ContentSize()+uint64(size) {
  197. size, err = v.write(n)
  198. } else {
  199. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  200. }
  201. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  202. log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  203. if err = s.Join(); err != nil {
  204. log.Printf("error with Join: %s", err)
  205. }
  206. }
  207. }
  208. return
  209. }
  210. log.Println("volume", i, "not found!")
  211. err = fmt.Errorf("Volume %s not found!", i)
  212. return
  213. }
  214. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  215. if v := s.volumes[i]; v != nil && !v.readOnly {
  216. return v.delete(n)
  217. }
  218. return 0, nil
  219. }
  220. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  221. if v := s.volumes[i]; v != nil {
  222. return v.read(n)
  223. }
  224. return 0, fmt.Errorf("Volume %s not found!", i)
  225. }
  226. func (s *Store) GetVolume(i VolumeId) *Volume {
  227. return s.volumes[i]
  228. }
  229. func (s *Store) HasVolume(i VolumeId) bool {
  230. _, ok := s.volumes[i]
  231. return ok
  232. }