You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

293 lines
8.7 KiB

12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/util"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type DiskLocation struct {
  13. directory string
  14. maxVolumeCount int
  15. volumes map[VolumeId]*Volume
  16. }
  17. type Store struct {
  18. Port int
  19. Ip string
  20. PublicUrl string
  21. locations []*DiskLocation
  22. masterNode string
  23. dataCenter string //optional informaton, overwriting master setting if exists
  24. rack string //optional information, overwriting master setting if exists
  25. connected bool
  26. volumeSizeLimit uint64 //read from the master
  27. }
  28. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
  29. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  30. s.locations = make([]*DiskLocation, 0)
  31. for i := 0; i < len(dirnames); i++ {
  32. location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
  33. location.volumes = make(map[VolumeId]*Volume)
  34. location.loadExistingVolumes()
  35. s.locations = append(s.locations, location)
  36. }
  37. return
  38. }
  39. func (s *Store) AddVolume(volumeListString string, replicationType string) error {
  40. rt, e := NewReplicationTypeFromString(replicationType)
  41. if e != nil {
  42. return e
  43. }
  44. for _, range_string := range strings.Split(volumeListString, ",") {
  45. if strings.Index(range_string, "-") < 0 {
  46. id_string := range_string
  47. id, err := NewVolumeId(id_string)
  48. if err != nil {
  49. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  50. }
  51. e = s.addVolume(VolumeId(id), rt)
  52. } else {
  53. pair := strings.Split(range_string, "-")
  54. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  55. if start_err != nil {
  56. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  57. }
  58. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  59. if end_err != nil {
  60. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  61. }
  62. for id := start; id <= end; id++ {
  63. if err := s.addVolume(VolumeId(id), rt); err != nil {
  64. e = err
  65. }
  66. }
  67. }
  68. }
  69. return e
  70. }
  71. func (s *Store) findVolume(vid VolumeId) *Volume {
  72. for _, location := range s.locations {
  73. if v, found := location.volumes[vid]; found {
  74. return v
  75. }
  76. }
  77. return nil
  78. }
  79. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  80. max := 0
  81. for _, location := range s.locations {
  82. currentFreeCount := location.maxVolumeCount - len(location.volumes)
  83. if currentFreeCount > max {
  84. max = currentFreeCount
  85. ret = location
  86. }
  87. }
  88. return ret
  89. }
  90. func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
  91. if s.findVolume(vid) != nil {
  92. return fmt.Errorf("Volume Id %s already exists!", vid)
  93. }
  94. if location := s.findFreeLocation(); location != nil {
  95. log.Println("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType)
  96. if volume, err := NewVolume(location.directory, vid, replicationType); err == nil {
  97. location.volumes[vid] = volume
  98. return nil
  99. } else {
  100. return err
  101. }
  102. }
  103. return fmt.Errorf("No more free space left")
  104. }
  105. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  106. vid, err := NewVolumeId(volumeIdString)
  107. if err != nil {
  108. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString), false
  109. }
  110. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  111. if e != nil {
  112. return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
  113. }
  114. if v := s.findVolume(vid); v != nil {
  115. return nil, garbageThreshold < v.garbageLevel()
  116. }
  117. return fmt.Errorf("volume id %s is not found during check compact!", vid), false
  118. }
  119. func (s *Store) CompactVolume(volumeIdString string) error {
  120. vid, err := NewVolumeId(volumeIdString)
  121. if err != nil {
  122. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  123. }
  124. if v := s.findVolume(vid); v != nil {
  125. return v.compact()
  126. }
  127. return fmt.Errorf("volume id %s is not found during compact!", vid)
  128. }
  129. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  130. vid, err := NewVolumeId(volumeIdString)
  131. if err != nil {
  132. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  133. }
  134. if v := s.findVolume(vid); v != nil {
  135. return v.commitCompact()
  136. }
  137. return fmt.Errorf("volume id %s is not found during commit compact!", vid)
  138. }
  139. func (s *Store) FreezeVolume(volumeIdString string) error {
  140. vid, err := NewVolumeId(volumeIdString)
  141. if err != nil {
  142. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  143. }
  144. if v := s.findVolume(vid); v != nil {
  145. if v.readOnly {
  146. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  147. }
  148. return v.freeze()
  149. }
  150. return fmt.Errorf("volume id %s is not found during freeze!", vid)
  151. }
  152. func (l *DiskLocation) loadExistingVolumes() {
  153. if dirs, err := ioutil.ReadDir(l.directory); err == nil {
  154. for _, dir := range dirs {
  155. name := dir.Name()
  156. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  157. base := name[:len(name)-len(".dat")]
  158. if vid, err := NewVolumeId(base); err == nil {
  159. if l.volumes[vid] == nil {
  160. if v, e := NewVolume(l.directory, vid, CopyNil); e == nil {
  161. l.volumes[vid] = v
  162. log.Println("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  163. }
  164. }
  165. }
  166. }
  167. }
  168. }
  169. log.Println("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
  170. }
  171. func (s *Store) Status() []*VolumeInfo {
  172. var stats []*VolumeInfo
  173. for _, location := range s.locations {
  174. for k, v := range location.volumes {
  175. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  176. RepType: v.ReplicaType, Version: v.Version(),
  177. FileCount: v.nm.FileCount(),
  178. DeleteCount: v.nm.DeletedCount(),
  179. DeletedByteCount: v.nm.DeletedSize(),
  180. ReadOnly: v.readOnly}
  181. stats = append(stats, s)
  182. }
  183. }
  184. return stats
  185. }
  186. type JoinResult struct {
  187. VolumeSizeLimit uint64
  188. }
  189. func (s *Store) SetMaster(mserver string) {
  190. s.masterNode = mserver
  191. }
  192. func (s *Store) SetDataCenter(dataCenter string) {
  193. s.dataCenter = dataCenter
  194. }
  195. func (s *Store) SetRack(rack string) {
  196. s.rack = rack
  197. }
  198. func (s *Store) Join() error {
  199. stats := new([]*VolumeInfo)
  200. maxVolumeCount := 0
  201. for _, location := range s.locations {
  202. maxVolumeCount = maxVolumeCount + location.maxVolumeCount
  203. for k, v := range location.volumes {
  204. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  205. RepType: v.ReplicaType, Version: v.Version(),
  206. FileCount: v.nm.FileCount(),
  207. DeleteCount: v.nm.DeletedCount(),
  208. DeletedByteCount: v.nm.DeletedSize(),
  209. ReadOnly: v.readOnly}
  210. *stats = append(*stats, s)
  211. }
  212. }
  213. bytes, _ := json.Marshal(stats)
  214. values := make(url.Values)
  215. if !s.connected {
  216. values.Add("init", "true")
  217. }
  218. values.Add("port", strconv.Itoa(s.Port))
  219. values.Add("ip", s.Ip)
  220. values.Add("publicUrl", s.PublicUrl)
  221. values.Add("volumes", string(bytes))
  222. values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
  223. values.Add("dataCenter", s.dataCenter)
  224. values.Add("rack", s.rack)
  225. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  226. if err != nil {
  227. return err
  228. }
  229. var ret JoinResult
  230. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  231. return err
  232. }
  233. s.volumeSizeLimit = ret.VolumeSizeLimit
  234. s.connected = true
  235. return nil
  236. }
  237. func (s *Store) Close() {
  238. for _, location := range s.locations {
  239. for _, v := range location.volumes {
  240. v.Close()
  241. }
  242. }
  243. }
  244. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  245. if v := s.findVolume(i); v != nil {
  246. if v.readOnly {
  247. err = fmt.Errorf("Volume %s is read only!", i)
  248. return
  249. } else {
  250. if s.volumeSizeLimit >= v.ContentSize()+uint64(size) {
  251. size, err = v.write(n)
  252. } else {
  253. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  254. }
  255. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  256. log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  257. if err = s.Join(); err != nil {
  258. log.Printf("error with Join: %s", err)
  259. }
  260. }
  261. }
  262. return
  263. }
  264. log.Println("volume", i, "not found!")
  265. err = fmt.Errorf("Volume %s not found!", i)
  266. return
  267. }
  268. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  269. if v := s.findVolume(i); v != nil && !v.readOnly {
  270. return v.delete(n)
  271. }
  272. return 0, nil
  273. }
  274. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  275. if v := s.findVolume(i); v != nil {
  276. return v.read(n)
  277. }
  278. return 0, fmt.Errorf("Volume %s not found!", i)
  279. }
  280. func (s *Store) GetVolume(i VolumeId) *Volume {
  281. return s.findVolume(i)
  282. }
  283. func (s *Store) HasVolume(i VolumeId) bool {
  284. v := s.findVolume(i)
  285. return v != nil
  286. }