You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
9.1 KiB

12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/util"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  12. type DiskLocation struct {
  13. directory string
  14. maxVolumeCount int
  15. volumes map[VolumeId]*Volume
  16. }
  17. type Store struct {
  18. Port int
  19. Ip string
  20. PublicUrl string
  21. locations []*DiskLocation
  22. masterNode string
  23. dataCenter string //optional informaton, overwriting master setting if exists
  24. rack string //optional information, overwriting master setting if exists
  25. connected bool
  26. volumeSizeLimit uint64 //read from the master
  27. }
  28. func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
  29. s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
  30. s.locations = make([]*DiskLocation, 0)
  31. for i := 0; i < len(dirnames); i++ {
  32. location := &DiskLocation{directory: dirnames[i], maxVolumeCount: maxVolumeCounts[i]}
  33. location.volumes = make(map[VolumeId]*Volume)
  34. location.loadExistingVolumes()
  35. s.locations = append(s.locations, location)
  36. }
  37. return
  38. }
  39. func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error {
  40. rt, e := NewReplicationTypeFromString(replicationType)
  41. if e != nil {
  42. return e
  43. }
  44. for _, range_string := range strings.Split(volumeListString, ",") {
  45. if strings.Index(range_string, "-") < 0 {
  46. id_string := range_string
  47. id, err := NewVolumeId(id_string)
  48. if err != nil {
  49. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
  50. }
  51. e = s.addVolume(VolumeId(id), collection, rt)
  52. } else {
  53. pair := strings.Split(range_string, "-")
  54. start, start_err := strconv.ParseUint(pair[0], 10, 64)
  55. if start_err != nil {
  56. return fmt.Errorf("Volume Start Id %s is not a valid unsigned integer!", pair[0])
  57. }
  58. end, end_err := strconv.ParseUint(pair[1], 10, 64)
  59. if end_err != nil {
  60. return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
  61. }
  62. for id := start; id <= end; id++ {
  63. if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
  64. e = err
  65. }
  66. }
  67. }
  68. }
  69. return e
  70. }
  71. func (s *Store) findVolume(vid VolumeId) *Volume {
  72. for _, location := range s.locations {
  73. if v, found := location.volumes[vid]; found {
  74. return v
  75. }
  76. }
  77. return nil
  78. }
  79. func (s *Store) findFreeLocation() (ret *DiskLocation) {
  80. max := 0
  81. for _, location := range s.locations {
  82. currentFreeCount := location.maxVolumeCount - len(location.volumes)
  83. if currentFreeCount > max {
  84. max = currentFreeCount
  85. ret = location
  86. }
  87. }
  88. return ret
  89. }
  90. func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error {
  91. if s.findVolume(vid) != nil {
  92. return fmt.Errorf("Volume Id %s already exists!", vid)
  93. }
  94. if location := s.findFreeLocation(); location != nil {
  95. glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType)
  96. if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil {
  97. location.volumes[vid] = volume
  98. return nil
  99. } else {
  100. return err
  101. }
  102. }
  103. return fmt.Errorf("No more free space left")
  104. }
  105. func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
  106. vid, err := NewVolumeId(volumeIdString)
  107. if err != nil {
  108. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString), false
  109. }
  110. garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
  111. if e != nil {
  112. return fmt.Errorf("garbageThreshold %s is not a valid float number!", garbageThresholdString), false
  113. }
  114. if v := s.findVolume(vid); v != nil {
  115. return nil, garbageThreshold < v.garbageLevel()
  116. }
  117. return fmt.Errorf("volume id %s is not found during check compact!", vid), false
  118. }
  119. func (s *Store) CompactVolume(volumeIdString string) error {
  120. vid, err := NewVolumeId(volumeIdString)
  121. if err != nil {
  122. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  123. }
  124. if v := s.findVolume(vid); v != nil {
  125. return v.Compact()
  126. }
  127. return fmt.Errorf("volume id %s is not found during compact!", vid)
  128. }
  129. func (s *Store) CommitCompactVolume(volumeIdString string) error {
  130. vid, err := NewVolumeId(volumeIdString)
  131. if err != nil {
  132. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  133. }
  134. if v := s.findVolume(vid); v != nil {
  135. return v.commitCompact()
  136. }
  137. return fmt.Errorf("volume id %s is not found during commit compact!", vid)
  138. }
  139. func (s *Store) FreezeVolume(volumeIdString string) error {
  140. vid, err := NewVolumeId(volumeIdString)
  141. if err != nil {
  142. return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", volumeIdString)
  143. }
  144. if v := s.findVolume(vid); v != nil {
  145. if v.readOnly {
  146. return fmt.Errorf("Volume %s is already read-only", volumeIdString)
  147. }
  148. return v.freeze()
  149. }
  150. return fmt.Errorf("volume id %s is not found during freeze!", vid)
  151. }
  152. func (l *DiskLocation) loadExistingVolumes() {
  153. if dirs, err := ioutil.ReadDir(l.directory); err == nil {
  154. for _, dir := range dirs {
  155. name := dir.Name()
  156. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  157. collection := ""
  158. base := name[:len(name)-len(".dat")]
  159. i := strings.Index(base, "_")
  160. if i > 0 {
  161. collection, base = base[0:i], base[i+1:]
  162. }
  163. if vid, err := NewVolumeId(base); err == nil {
  164. if l.volumes[vid] == nil {
  165. if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil {
  166. l.volumes[vid] = v
  167. glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
  168. }
  169. }
  170. }
  171. }
  172. }
  173. }
  174. glog.V(0).Infoln("Store started on dir:", l.directory, "with", len(l.volumes), "volumes", "max", l.maxVolumeCount)
  175. }
  176. func (s *Store) Status() []*VolumeInfo {
  177. var stats []*VolumeInfo
  178. for _, location := range s.locations {
  179. for k, v := range location.volumes {
  180. s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
  181. Collection: v.Collection,
  182. RepType: v.ReplicaType,
  183. Version: v.Version(),
  184. FileCount: v.nm.FileCount(),
  185. DeleteCount: v.nm.DeletedCount(),
  186. DeletedByteCount: v.nm.DeletedSize(),
  187. ReadOnly: v.readOnly}
  188. stats = append(stats, s)
  189. }
  190. }
  191. return stats
  192. }
  193. type JoinResult struct {
  194. VolumeSizeLimit uint64
  195. }
  196. func (s *Store) SetMaster(mserver string) {
  197. s.masterNode = mserver
  198. }
  199. func (s *Store) SetDataCenter(dataCenter string) {
  200. s.dataCenter = dataCenter
  201. }
  202. func (s *Store) SetRack(rack string) {
  203. s.rack = rack
  204. }
  205. func (s *Store) Join() error {
  206. stats := new([]*VolumeInfo)
  207. maxVolumeCount := 0
  208. for _, location := range s.locations {
  209. maxVolumeCount = maxVolumeCount + location.maxVolumeCount
  210. for k, v := range location.volumes {
  211. s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
  212. Collection: v.Collection,
  213. RepType: v.ReplicaType,
  214. Version: v.Version(),
  215. FileCount: v.nm.FileCount(),
  216. DeleteCount: v.nm.DeletedCount(),
  217. DeletedByteCount: v.nm.DeletedSize(),
  218. ReadOnly: v.readOnly}
  219. *stats = append(*stats, s)
  220. }
  221. }
  222. bytes, _ := json.Marshal(stats)
  223. values := make(url.Values)
  224. if !s.connected {
  225. values.Add("init", "true")
  226. }
  227. values.Add("port", strconv.Itoa(s.Port))
  228. values.Add("ip", s.Ip)
  229. values.Add("publicUrl", s.PublicUrl)
  230. values.Add("volumes", string(bytes))
  231. values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
  232. values.Add("dataCenter", s.dataCenter)
  233. values.Add("rack", s.rack)
  234. jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
  235. if err != nil {
  236. return err
  237. }
  238. var ret JoinResult
  239. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  240. return err
  241. }
  242. s.volumeSizeLimit = ret.VolumeSizeLimit
  243. s.connected = true
  244. return nil
  245. }
  246. func (s *Store) Close() {
  247. for _, location := range s.locations {
  248. for _, v := range location.volumes {
  249. v.Close()
  250. }
  251. }
  252. }
  253. func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
  254. if v := s.findVolume(i); v != nil {
  255. if v.readOnly {
  256. err = fmt.Errorf("Volume %s is read only!", i)
  257. return
  258. } else {
  259. if s.volumeSizeLimit >= v.ContentSize()+uint64(size) {
  260. size, err = v.write(n)
  261. } else {
  262. err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
  263. }
  264. if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
  265. glog.V(0).Infoln("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
  266. if err = s.Join(); err != nil {
  267. glog.V(0).Infoln("error with Join:", err)
  268. }
  269. }
  270. }
  271. return
  272. }
  273. glog.V(0).Infoln("volume", i, "not found!")
  274. err = fmt.Errorf("Volume %s not found!", i)
  275. return
  276. }
  277. func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
  278. if v := s.findVolume(i); v != nil && !v.readOnly {
  279. return v.delete(n)
  280. }
  281. return 0, nil
  282. }
  283. func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
  284. if v := s.findVolume(i); v != nil {
  285. return v.read(n)
  286. }
  287. return 0, fmt.Errorf("Volume %s not found!", i)
  288. }
  289. func (s *Store) GetVolume(i VolumeId) *Volume {
  290. return s.findVolume(i)
  291. }
  292. func (s *Store) HasVolume(i VolumeId) bool {
  293. v := s.findVolume(i)
  294. return v != nil
  295. }