You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

293 lines
8.8 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/util"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type DiskLocation struct {
  13. directory string
  14. maxVolumeCount int
  15. volumes map[VolumeId]*Volume
  16. }
  17. type Store struct {
  18. Port int
  19. Ip string
  20. PublicUrl string
  21. locations []*DiskLocation
  22. masterNode string
  23. dataCenter string //optional informaton, overwriting master setting if exists
  24. rack string //optional information, overwriting master setting if exists
  25. connected bool
  26. volumeSizeLimit uint64 //read from the master
  27. }
  28. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
  29. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  30. s.locations = make([]*DiskLocation, 0)
  31. for i := 0; i < len(dirnames); i++ {
  32. location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
  33. location.volumes = make(map[VolumeId]*Volume)
  34. location.loadExistingVolumes()
  35. s.locations = append(s.locations, location)
  36. }
  37. return
  38. }
  39. func (s *Store) AddVolume(volumeListString string, replicationType string) error {
  40. rt, e := NewReplicationTypeFromString(replicationType)
  41. if e != nil {
  42. return e
  43. }
  44. for _, range_string := range strings.Split(volumeListString, ",") {
  45. if strings.Index(range_string, "-") < 0 {
  46. id_string := range_string
  47. id, err := NewVolumeId(id_string)
  48. if err != nil {
  49. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  50. }
  51. e = s.addVolume(VolumeId(id), rt)
  52. } else {
  53. pair := strings.Split(range_string, "-")
  54. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  55. if start_err != nil {
  56. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  57. }
  58. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  59. if end_err != nil {
  60. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  61. }
  62. for id := start; id <= end; id++ {
  63. if err := s.addVolume(VolumeId(id), rt); err != nil {
  64. e = err
  65. }
  66. }
  67. }
  68. }
  69. return e
  70. }
  71. func (s *Store) findVolume(vid VolumeId) *Volume {
  72. for _, location := range s.locations {
  73. if v, found := location.volumes[vid]; found {
  74. return v
  75. }
  76. }
  77. return nil
  78. }
  79. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  80. max := 0
  81. for _, location := range s.locations {
  82. currentFreeCount := location.maxVolumeCount - len(location.volumes)
  83. if currentFreeCount > max {
  84. max = currentFreeCount
  85. ret = location
  86. }
  87. }
  88. return ret
  89. }
  90. func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
  91. if s.findVolume(vid) != nil {
  92. return fmt.Errorf("Volume Id %s already exists!", vid)
  93. }
  94. if location := s.findFreeLocation(); location != nil {
  95. glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
  96. if volume, err := NewVolume(location.directory, vid, replicationType); err == nil {
  97. location.volumes[vid] = volume
  98. return nil
  99. } else {
  100. return err
  101. }
  102. }
  103. return fmt.Errorf("No more free space left")
  104. }
  105. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  106. vid, err := NewVolumeId(volumeIdString)
  107. if err != nil {
  108. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString), false
  109. }
  110. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  111. if e != nil {
  112. return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
  113. }
  114. if v := s.findVolume(vid); v != nil {
  115. return nil, garbageThreshold < v.garbageLevel()
  116. }
  117. return fmt.Errorf("volume id %s is not found during check compact!", vid), false
  118. }
  119. func (s *Store) CompactVolume(volumeIdString string) error {
  120. vid, err := NewVolumeId(volumeIdString)
  121. if err != nil {
  122. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  123. }
  124. if v := s.findVolume(vid); v != nil {
  125. return v.Compact()
  126. }
  127. return fmt.Errorf("volume id %s is not found during compact!", vid)
  128. }
  129. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  130. vid, err := NewVolumeId(volumeIdString)
  131. if err != nil {
  132. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  133. }
  134. if v := s.findVolume(vid); v != nil {
  135. return v.commitCompact()
  136. }
  137. return fmt.Errorf("volume id %s is not found during commit compact!", vid)
  138. }
  139. func (s *Store) FreezeVolume(volumeIdString string) error {
  140. vid, err := NewVolumeId(volumeIdString)
  141. if err != nil {
  142. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  143. }
  144. if v := s.findVolume(vid); v != nil {
  145. if v.readOnly {
  146. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  147. }
  148. return v.freeze()
  149. }
  150. return fmt.Errorf("volume id %s is not found during freeze!", vid)
  151. }
  152. func (l *DiskLocation) loadExistingVolumes() {
  153. if dirs, err := ioutil.ReadDir(l.directory); err == nil {
  154. for _, dir := range dirs {
  155. name := dir.Name()
  156. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  157. base := name[:len(name)-len(".dat")]
  158. if vid, err := NewVolumeId(base); err == nil {
  159. if l.volumes[vid] == nil {
  160. if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
  161. l.volumes[vid] = v
  162. glog.V(0).Infoln("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  163. }
  164. }
  165. }
  166. }
  167. }
  168. }
  169. glog.V(0).Infoln("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
  170. }
  171. func (s *Store) Status() []*VolumeInfo {
  172. var stats []*VolumeInfo
  173. for _, location := range s.locations {
  174. for k, v := range location.volumes {
  175. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  176. RepType: v.ReplicaType, Version: v.Version(),
  177. FileCount: v.nm.FileCount(),
  178. DeleteCount: v.nm.DeletedCount(),
  179. DeletedByteCount: v.nm.DeletedSize(),
  180. ReadOnly: v.readOnly}
  181. stats = append(stats, s)
  182. }
  183. }
  184. return stats
  185. }
  // JoinResult is the JSON payload decoded from the master's reply to a join
  // request.
  type JoinResult struct {
  	// VolumeSizeLimit is the size limit the store adopts after joining.
  	VolumeSizeLimit uint64
  }
  189. func (s *Store) SetMaster(mserver string) {
  190. s.masterNode = mserver
  191. }
  192. func (s *Store) SetDataCenter(dataCenter string) {
  193. s.dataCenter = dataCenter
  194. }
  195. func (s *Store) SetRack(rack string) {
  196. s.rack = rack
  197. }
  198. func (s *Store) Join() error {
  199. stats := new([]*VolumeInfo)
  200. maxVolumeCount := 0
  201. for _, location := range s.locations {
  202. maxVolumeCount = maxVolumeCount + location.maxVolumeCount
  203. for k, v := range location.volumes {
  204. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  205. RepType: v.ReplicaType, Version: v.Version(),
  206. FileCount: v.nm.FileCount(),
  207. DeleteCount: v.nm.DeletedCount(),
  208. DeletedByteCount: v.nm.DeletedSize(),
  209. ReadOnly: v.readOnly}
  210. *stats = append(*stats, s)
  211. }
  212. }
  213. bytes, _ := json.Marshal(stats)
  214. values := make(url.Values)
  215. if !s.connected {
  216. values.Add("init", "true")
  217. }
  218. values.Add("port", strconv.Itoa(s.Port))
  219. values.Add("ip", s.Ip)
  220. values.Add("publicUrl", s.PublicUrl)
  221. values.Add("volumes", string(bytes))
  222. values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
  223. values.Add("dataCenter", s.dataCenter)
  224. values.Add("rack", s.rack)
  225. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  226. if err != nil {
  227. return err
  228. }
  229. var ret JoinResult
  230. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  231. return err
  232. }
  233. s.volumeSizeLimit = ret.VolumeSizeLimit
  234. s.connected = true
  235. return nil
  236. }
  237. func (s *Store) Close() {
  238. for _, location := range s.locations {
  239. for _, v := range location.volumes {
  240. v.Close()
  241. }
  242. }
  243. }
  244. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  245. if v := s.findVolume(i); v != nil {
  246. if v.readOnly {
  247. err = fmt.Errorf("Volume %s is read only!", i)
  248. return
  249. } else {
  250. if s.volumeSizeLimit >= v.ContentSize()+uint64(size) {
  251. size, err = v.write(n)
  252. } else {
  253. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  254. }
  255. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  256. glog.V(0).Infoln("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  257. if err = s.Join(); err != nil {
  258. glog.V(0).Infoln("error with Join:", err)
  259. }
  260. }
  261. }
  262. return
  263. }
  264. glog.V(0).Infoln("volume", i, "not found!")
  265. err = fmt.Errorf("Volume %s not found!", i)
  266. return
  267. }
  268. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  269. if v := s.findVolume(i); v != nil && !v.readOnly {
  270. return v.delete(n)
  271. }
  272. return 0, nil
  273. }
  274. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  275. if v := s.findVolume(i); v != nil {
  276. return v.read(n)
  277. }
  278. return 0, fmt.Errorf("Volume %s not found!", i)
  279. }
  280. func (s *Store) GetVolume(i VolumeId) *Volume {
  281. return s.findVolume(i)
  282. }
  283. func (s *Store) HasVolume(i VolumeId) bool {
  284. v := s.findVolume(i)
  285. return v != nil
  286. }