You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

290 lines
8.7 KiB

12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/util"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type DiskLocation struct {
  13. directory string
  14. maxVolumeCount int
  15. volumes map[VolumeId]*Volume
  16. }
  17. type Store struct {
  18. Port int
  19. Ip string
  20. PublicUrl string
  21. locations []*DiskLocation
  22. masterNode string
  23. dataCenter string //optional informaton, overwriting master setting if exists
  24. rack string //optional information, overwriting master setting if exists
  25. connected bool
  26. volumeSizeLimit uint64 //read from the master
  27. }
  28. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
  29. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  30. s.locations = make([]*DiskLocation, 0)
  31. for i := 0; i < len(dirnames); i++ {
  32. location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
  33. location.volumes = make(map[VolumeId]*Volume)
  34. location.loadExistingVolumes()
  35. s.locations = append(s.locations, location)
  36. }
  37. return
  38. }
  39. func (s *Store) AddVolume(volumeListString string, replicationType string) error {
  40. rt, e := NewReplicationTypeFromString(replicationType)
  41. if e != nil {
  42. return e
  43. }
  44. for _, range_string := range strings.Split(volumeListString, ",") {
  45. if strings.Index(range_string, "-") < 0 {
  46. id_string := range_string
  47. id, err := NewVolumeId(id_string)
  48. if err != nil {
  49. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  50. }
  51. e = s.addVolume(VolumeId(id), rt)
  52. } else {
  53. pair := strings.Split(range_string, "-")
  54. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  55. if start_err != nil {
  56. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  57. }
  58. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  59. if end_err != nil {
  60. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  61. }
  62. for id := start; id <= end; id++ {
  63. if err := s.addVolume(VolumeId(id), rt); err != nil {
  64. e = err
  65. }
  66. }
  67. }
  68. }
  69. return e
  70. }
  71. func (s *Store) findVolume(vid VolumeId) *Volume {
  72. for _, location := range s.locations {
  73. if v, found := location.volumes[vid]; found {
  74. return v
  75. }
  76. }
  77. return nil
  78. }
  79. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  80. max := 0
  81. for _, location := range s.locations {
  82. currentFreeCount := location.maxVolumeCount - len(location.volumes)
  83. if currentFreeCount > max {
  84. max = currentFreeCount
  85. ret = location
  86. }
  87. }
  88. return ret
  89. }
  90. func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
  91. if s.findVolume(vid) != nil {
  92. return fmt.Errorf("Volume Id %s already exists!", vid)
  93. }
  94. if location := s.findFreeLocation(); location != nil {
  95. log.Println("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
  96. location.volumes[vid], err = NewVolume(location.directory, vid, replicationType)
  97. return err
  98. }
  99. return fmt.Errorf("No more free space left")
  100. }
  101. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  102. vid, err := NewVolumeId(volumeIdString)
  103. if err != nil {
  104. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString), false
  105. }
  106. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  107. if e != nil {
  108. return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
  109. }
  110. if v := s.findVolume(vid); v != nil {
  111. return nil, garbageThreshold < v.garbageLevel()
  112. }
  113. return fmt.Errorf("volume id %s is not found during check compact!", vid), false
  114. }
  115. func (s *Store) CompactVolume(volumeIdString string) error {
  116. vid, err := NewVolumeId(volumeIdString)
  117. if err != nil {
  118. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  119. }
  120. if v := s.findVolume(vid); v != nil {
  121. return v.compact()
  122. }
  123. return fmt.Errorf("volume id %s is not found during compact!", vid)
  124. }
  125. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  126. vid, err := NewVolumeId(volumeIdString)
  127. if err != nil {
  128. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  129. }
  130. if v := s.findVolume(vid); v != nil {
  131. return v.commitCompact()
  132. }
  133. return fmt.Errorf("volume id %s is not found during commit compact!", vid)
  134. }
  135. func (s *Store) FreezeVolume(volumeIdString string) error {
  136. vid, err := NewVolumeId(volumeIdString)
  137. if err != nil {
  138. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  139. }
  140. if v := s.findVolume(vid); v != nil {
  141. if v.readOnly {
  142. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  143. }
  144. return v.freeze()
  145. } else {
  146. return fmt.Errorf("volume id %s is not found during freeze!", vid)
  147. }
  148. }
  149. func (l *DiskLocation) loadExistingVolumes() {
  150. if dirs, err := ioutil.ReadDir(l.directory); err == nil {
  151. for _, dir := range dirs {
  152. name := dir.Name()
  153. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  154. base := name[:len(name)-len(".dat")]
  155. if vid, err := NewVolumeId(base); err == nil {
  156. if l.volumes[vid] == nil {
  157. if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
  158. l.volumes[vid] = v
  159. log.Println("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  160. }
  161. }
  162. }
  163. }
  164. }
  165. }
  166. log.Println("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
  167. }
  168. func (s *Store) Status() []*VolumeInfo {
  169. var stats []*VolumeInfo
  170. for _, location := range s.locations {
  171. for k, v := range location.volumes {
  172. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  173. RepType: v.ReplicaType, Version: v.Version(),
  174. FileCount: v.nm.FileCount(),
  175. DeleteCount: v.nm.DeletedCount(),
  176. DeletedByteCount: v.nm.DeletedSize(),
  177. ReadOnly: v.readOnly}
  178. stats = append(stats, s)
  179. }
  180. }
  181. return stats
  182. }
  // JoinResult is the JSON document decoded from the master's reply to a join
  // request.
  type JoinResult struct {
  	// VolumeSizeLimit is copied into the store's volumeSizeLimit after a
  	// successful join.
  	VolumeSizeLimit uint64
  }
  186. func (s *Store) SetMaster(mserver string) {
  187. s.masterNode = mserver
  188. }
  189. func (s *Store) SetDataCenter(dataCenter string) {
  190. s.dataCenter = dataCenter
  191. }
  192. func (s *Store) SetRack(rack string) {
  193. s.rack = rack
  194. }
  195. func (s *Store) Join() error {
  196. stats := new([]*VolumeInfo)
  197. maxVolumeCount := 0
  198. for _, location := range s.locations {
  199. maxVolumeCount = maxVolumeCount + location.maxVolumeCount
  200. for k, v := range location.volumes {
  201. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  202. RepType: v.ReplicaType, Version: v.Version(),
  203. FileCount: v.nm.FileCount(),
  204. DeleteCount: v.nm.DeletedCount(),
  205. DeletedByteCount: v.nm.DeletedSize(),
  206. ReadOnly: v.readOnly}
  207. *stats = append(*stats, s)
  208. }
  209. }
  210. bytes, _ := json.Marshal(stats)
  211. values := make(url.Values)
  212. if !s.connected {
  213. values.Add("init", "true")
  214. }
  215. values.Add("port", strconv.Itoa(s.Port))
  216. values.Add("ip", s.Ip)
  217. values.Add("publicUrl", s.PublicUrl)
  218. values.Add("volumes", string(bytes))
  219. values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
  220. values.Add("dataCenter", s.dataCenter)
  221. values.Add("rack", s.rack)
  222. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  223. if err != nil {
  224. return err
  225. }
  226. var ret JoinResult
  227. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  228. return err
  229. }
  230. s.volumeSizeLimit = ret.VolumeSizeLimit
  231. s.connected = true
  232. return nil
  233. }
  234. func (s *Store) Close() {
  235. for _, location := range s.locations {
  236. for _, v := range location.volumes {
  237. v.Close()
  238. }
  239. }
  240. }
  241. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  242. if v := s.findVolume(i); v != nil {
  243. if v.readOnly {
  244. err = fmt.Errorf("Volume %s is read only!", i)
  245. return
  246. } else {
  247. if s.volumeSizeLimit >= v.ContentSize()+uint64(size) {
  248. size, err = v.write(n)
  249. } else {
  250. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  251. }
  252. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  253. log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  254. if err = s.Join(); err != nil {
  255. log.Printf("error with Join: %s", err)
  256. }
  257. }
  258. }
  259. return
  260. }
  261. log.Println("volume", i, "not found!")
  262. err = fmt.Errorf("Volume %s not found!", i)
  263. return
  264. }
  265. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  266. if v := s.findVolume(i); v != nil && !v.readOnly {
  267. return v.delete(n)
  268. }
  269. return 0, nil
  270. }
  271. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  272. if v := s.findVolume(i); v != nil {
  273. return v.read(n)
  274. }
  275. return 0, fmt.Errorf("Volume %s not found!", i)
  276. }
  277. func (s *Store) GetVolume(i VolumeId) *Volume {
  278. return s.findVolume(i)
  279. }
  280. func (s *Store) HasVolume(i VolumeId) bool {
  281. v := s.findVolume(i)
  282. return v != nil
  283. }