You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
5.3 KiB

  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/util"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/tgulacsi/go-cdb"
  8. "os"
  9. "path/filepath"
  10. )
  11. // CDB-backed read-only needle map
  12. type cdbMap struct {
  13. c1, c2 *cdb.Cdb
  14. fn1, fn2 string
  15. mapMetric
  16. }
  // maxCdbRecCount is the number of records DumpNeedleMapToCdb writes into
  // one cdb file before it rolls over to a second file.
  const (
  	maxCdbRecCount = 100 * 1000 * 1000
  )

  // errReadOnly is returned by every mutating operation of cdbMap.
  var (
  	errReadOnly = errors.New("cannot modify a read-only map")
  )
  19. // opens the cdb file(s) (base.cdb OR base.1.cdb AND base.2.cdb)
  20. // in case of two files, the metric (at key 'M') must be in base.2.cdb
  21. func OpenCdbMap(fileName string) (m *cdbMap, err error) {
  22. m = new(cdbMap)
  23. if m.c1, err = cdb.Open(fileName); err == nil {
  24. m.fn1 = fileName
  25. err = getMetric(m.c1, &m.mapMetric)
  26. return
  27. }
  28. if os.IsNotExist(err) {
  29. bn, ext := nakeFilename(fileName)
  30. m.fn1 = bn + ".1" + ext
  31. if m.c1, err = cdb.Open(m.fn1); err != nil {
  32. return nil, err
  33. }
  34. m.fn2 = bn + ".2" + ext
  35. if m.c2, err = cdb.Open(m.fn2); err != nil {
  36. return nil, err
  37. }
  38. err = getMetric(m.c2, &m.mapMetric)
  39. return
  40. }
  41. return nil, err
  42. }
  43. func (m *cdbMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  44. return -1, errReadOnly
  45. }
  46. func (m *cdbMap) Delete(key uint64) error {
  47. return errReadOnly
  48. }
  49. func (m *cdbMap) Close() {
  50. if m.c2 != nil {
  51. m.c2.Close()
  52. m.c2 = nil
  53. }
  54. if m.c1 != nil {
  55. m.c1.Close()
  56. m.c1 = nil
  57. }
  58. }
  59. func (m cdbMap) ContentSize() uint64 {
  60. return m.FileByteCounter
  61. }
  62. func (m cdbMap) DeletedSize() uint64 {
  63. return m.DeletionByteCounter
  64. }
  65. func (m cdbMap) FileCount() int {
  66. return m.FileCounter
  67. }
  68. func (m *cdbMap) DeletedCount() int {
  69. return m.DeletionCounter
  70. }
  71. func getMetric(c *cdb.Cdb, m *mapMetric) error {
  72. data, err := c.Data([]byte{'M'})
  73. if err != nil {
  74. return err
  75. }
  76. return json.Unmarshal(data, m)
  77. }
  78. func (m cdbMap) Get(key uint64) (element *NeedleValue, ok bool) {
  79. var (
  80. data []byte
  81. k []byte = make([]byte, 8)
  82. err error
  83. )
  84. util.Uint64toBytes(k, key)
  85. if data, err = m.c1.Data(k); err != nil || data == nil {
  86. if m.c2 == nil {
  87. return nil, false
  88. }
  89. if data, err = m.c2.Data(k); err != nil || data == nil {
  90. return nil, false
  91. }
  92. }
  93. return &NeedleValue{Key: Key(key), Offset: util.BytesToUint32(data[:4]),
  94. Size: util.BytesToUint32(data[4:])}, true
  95. }
  96. func (m cdbMap) Visit(visit func(NeedleValue) error) (err error) {
  97. fh, err := os.Open(m.fn1)
  98. if err != nil {
  99. return fmt.Errorf("cannot open %s: %s", m.fn1, err)
  100. }
  101. defer fh.Close()
  102. walk := func(elt cdb.Element) error {
  103. if len(elt.Key) != 8 {
  104. return nil
  105. }
  106. return visit(NeedleValue{Key: Key(util.BytesToUint64(elt.Key)),
  107. Offset: util.BytesToUint32(elt.Data[:4]),
  108. Size: util.BytesToUint32(elt.Data[4:8])})
  109. }
  110. if err = cdb.DumpMap(fh, walk); err != nil {
  111. return err
  112. }
  113. if m.c2 == nil {
  114. return nil
  115. }
  116. fh.Close()
  117. if fh, err = os.Open(m.fn2); err != nil {
  118. return fmt.Errorf("cannot open %s: %s", m.fn2, err)
  119. }
  120. return cdb.DumpMap(fh, walk)
  121. }
  122. // converts an .idx index to a cdb
  123. func ConvertIndexToCdb(cdbName string, index *os.File) error {
  124. idx, err := LoadNeedleMap(index)
  125. if err != nil {
  126. return fmt.Errorf("error loading needle map %s: %s", index, err)
  127. }
  128. defer idx.Close()
  129. return DumpNeedleMapToCdb(cdbName, idx)
  130. }
  131. // dumps a NeedleMap into a cdb
  132. func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error {
  133. tempnam := cdbName + "t"
  134. fnames := make([]string, 1, 2)
  135. adder, closer, err := openTempCdb(tempnam)
  136. if err != nil {
  137. return fmt.Errorf("error creating factory: %s", err)
  138. }
  139. fnames[0] = tempnam
  140. elt := cdb.Element{Key: make([]byte, 8), Data: make([]byte, 8)}
  141. fcount := uint64(0)
  142. walk := func(key uint64, offset, size uint32) error {
  143. if fcount >= maxCdbRecCount {
  144. if err = closer(); err != nil {
  145. return err
  146. }
  147. tempnam = cdbName + "t2"
  148. if adder, closer, err = openTempCdb(tempnam); err != nil {
  149. return fmt.Errorf("error creating second factory: %s", err)
  150. }
  151. fnames = append(fnames, tempnam)
  152. fcount = 0
  153. }
  154. util.Uint64toBytes(elt.Key, key)
  155. util.Uint32toBytes(elt.Data[:4], offset)
  156. util.Uint32toBytes(elt.Data[4:], size)
  157. fcount++
  158. return adder(elt)
  159. }
  160. // and write out the cdb from there
  161. err = nm.Visit(func(nv NeedleValue) error {
  162. return walk(uint64(nv.Key), nv.Offset, nv.Size)
  163. })
  164. if err != nil {
  165. closer()
  166. return fmt.Errorf("error walking index %s: %s", nm, err)
  167. }
  168. // store fileBytes
  169. data, e := json.Marshal(nm.mapMetric)
  170. if e != nil {
  171. return fmt.Errorf("error marshaling metric %s: %s", nm.mapMetric, e)
  172. }
  173. if err = adder(cdb.Element{Key: []byte{'M'}, Data: data}); err != nil {
  174. return err
  175. }
  176. if err = closer(); err != nil {
  177. return err
  178. }
  179. os.Remove(cdbName)
  180. if len(fnames) == 1 {
  181. return os.Rename(fnames[0], cdbName)
  182. }
  183. bn, ext := nakeFilename(cdbName)
  184. if err = os.Rename(fnames[0], bn+".1"+ext); err != nil {
  185. return err
  186. }
  187. return os.Rename(fnames[1], bn+".2"+ext)
  188. }
  189. func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) {
  190. fh, err := os.Create(fileName)
  191. if err != nil {
  192. return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err)
  193. }
  194. adder, closer, err := cdb.MakeFactory(fh)
  195. if err != nil {
  196. fh.Close()
  197. return nil, nil, fmt.Errorf("error creating factory: %s", err)
  198. }
  199. return adder, func() error {
  200. if e := closer(); e != nil {
  201. fh.Close()
  202. return e
  203. }
  204. fh.Close()
  205. return nil
  206. }, nil
  207. }
  // nakeFilename splits fileName into the part before its extension and the
  // extension itself (as reported by filepath.Ext, including the leading
  // dot). The extension is empty when fileName has none.
  func nakeFilename(fileName string) (string, string) {
  	suffix := filepath.Ext(fileName)
  	cut := len(fileName) - len(suffix)
  	base := fileName[:cut]
  	return base, suffix
  }