You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

237 lines
5.5 KiB

  1. package storage
  2. import (
  3. "github.com/chrislusf/weed-fs/go/util"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/tgulacsi/go-cdb"
  8. "os"
  9. "path/filepath"
  10. )
  11. // CDB-backed read-only needle map
  12. type cdbMap struct {
  13. c1, c2 *cdb.Cdb
  14. fn1, fn2 string
  15. mapMetric
  16. }
  17. const maxCdbRecCount = 100000000
  18. var errReadOnly = errors.New("cannot modify a read-only map")
  19. // opens the cdb file(s) (base.cdb OR base.1.cdb AND base.2.cdb)
  20. // in case of two files, the metric (at key 'M') must be in base.2.cdb
  21. func OpenCdbMap(fileName string) (m *cdbMap, err error) {
  22. m = new(cdbMap)
  23. if m.c1, err = cdb.Open(fileName); err == nil {
  24. m.fn1 = fileName
  25. err = getMetric(m.c1, &m.mapMetric)
  26. return
  27. }
  28. if os.IsNotExist(err) {
  29. bn, ext := baseFilename(fileName)
  30. m.fn1 = bn + ".1" + ext
  31. if m.c1, err = cdb.Open(m.fn1); err != nil {
  32. return nil, err
  33. }
  34. m.fn2 = bn + ".2" + ext
  35. if m.c2, err = cdb.Open(m.fn2); err != nil {
  36. return nil, err
  37. }
  38. err = getMetric(m.c2, &m.mapMetric)
  39. return
  40. }
  41. return nil, err
  42. }
  43. func (m *cdbMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  44. return -1, errReadOnly
  45. }
  46. func (m *cdbMap) Delete(key uint64) error {
  47. return errReadOnly
  48. }
  49. func (m *cdbMap) Close() {
  50. if m.c2 != nil {
  51. m.c2.Close()
  52. m.c2 = nil
  53. }
  54. if m.c1 != nil {
  55. m.c1.Close()
  56. m.c1 = nil
  57. }
  58. }
  59. func (m *cdbMap) Destroy() error {
  60. return errors.New("Can not delete readonly volumes")
  61. }
  62. func (m cdbMap) ContentSize() uint64 {
  63. return m.FileByteCounter
  64. }
  65. func (m cdbMap) DeletedSize() uint64 {
  66. return m.DeletionByteCounter
  67. }
  68. func (m cdbMap) FileCount() int {
  69. return m.FileCounter
  70. }
  71. func (m *cdbMap) DeletedCount() int {
  72. return m.DeletionCounter
  73. }
  74. func (m *cdbMap) MaxFileKey() uint64 {
  75. return m.MaximumFileKey
  76. }
  77. func getMetric(c *cdb.Cdb, m *mapMetric) error {
  78. data, err := c.Data([]byte{'M'})
  79. if err != nil {
  80. return err
  81. }
  82. return json.Unmarshal(data, m)
  83. }
  84. func (m cdbMap) Get(key uint64) (element *NeedleValue, ok bool) {
  85. var (
  86. data []byte
  87. k []byte = make([]byte, 8)
  88. err error
  89. )
  90. util.Uint64toBytes(k, key)
  91. if data, err = m.c1.Data(k); err != nil || data == nil {
  92. if m.c2 == nil {
  93. return nil, false
  94. }
  95. if data, err = m.c2.Data(k); err != nil || data == nil {
  96. return nil, false
  97. }
  98. }
  99. return &NeedleValue{Key: Key(key), Offset: util.BytesToUint32(data[:4]),
  100. Size: util.BytesToUint32(data[4:])}, true
  101. }
  102. func (m cdbMap) Visit(visit func(NeedleValue) error) (err error) {
  103. fh, err := os.Open(m.fn1)
  104. if err != nil {
  105. return fmt.Errorf("cannot open %s: %s", m.fn1, err)
  106. }
  107. defer fh.Close()
  108. walk := func(elt cdb.Element) error {
  109. if len(elt.Key) != 8 {
  110. return nil
  111. }
  112. return visit(NeedleValue{Key: Key(util.BytesToUint64(elt.Key)),
  113. Offset: util.BytesToUint32(elt.Data[:4]),
  114. Size: util.BytesToUint32(elt.Data[4:8])})
  115. }
  116. if err = cdb.DumpMap(fh, walk); err != nil {
  117. return err
  118. }
  119. if m.c2 == nil {
  120. return nil
  121. }
  122. fh.Close()
  123. if fh, err = os.Open(m.fn2); err != nil {
  124. return fmt.Errorf("cannot open %s: %s", m.fn2, err)
  125. }
  126. return cdb.DumpMap(fh, walk)
  127. }
  128. // converts an .idx index to a cdb
  129. func ConvertIndexToCdb(cdbName string, index *os.File) error {
  130. idx, err := LoadNeedleMap(index)
  131. if err != nil {
  132. return fmt.Errorf("error loading needle map %s: %s", index.Name(), err)
  133. }
  134. defer idx.Close()
  135. return DumpNeedleMapToCdb(cdbName, idx)
  136. }
  137. // dumps a NeedleMap into a cdb
  138. func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error {
  139. tempnam := cdbName + "t"
  140. fnames := make([]string, 1, 2)
  141. adder, closer, err := openTempCdb(tempnam)
  142. if err != nil {
  143. return fmt.Errorf("error creating factory: %s", err)
  144. }
  145. fnames[0] = tempnam
  146. elt := cdb.Element{Key: make([]byte, 8), Data: make([]byte, 8)}
  147. fcount := uint64(0)
  148. walk := func(key uint64, offset, size uint32) error {
  149. if fcount >= maxCdbRecCount {
  150. if err = closer(); err != nil {
  151. return err
  152. }
  153. tempnam = cdbName + "t2"
  154. if adder, closer, err = openTempCdb(tempnam); err != nil {
  155. return fmt.Errorf("error creating second factory: %s", err)
  156. }
  157. fnames = append(fnames, tempnam)
  158. fcount = 0
  159. }
  160. util.Uint64toBytes(elt.Key, key)
  161. util.Uint32toBytes(elt.Data[:4], offset)
  162. util.Uint32toBytes(elt.Data[4:], size)
  163. fcount++
  164. return adder(elt)
  165. }
  166. // and write out the cdb from there
  167. err = nm.Visit(func(nv NeedleValue) error {
  168. return walk(uint64(nv.Key), nv.Offset, nv.Size)
  169. })
  170. if err != nil {
  171. closer()
  172. return fmt.Errorf("error walking index %v: %s", nm, err)
  173. }
  174. // store fileBytes
  175. data, e := json.Marshal(nm.mapMetric)
  176. if e != nil {
  177. return fmt.Errorf("error marshaling metric %v: %s", nm.mapMetric, e)
  178. }
  179. if err = adder(cdb.Element{Key: []byte{'M'}, Data: data}); err != nil {
  180. return err
  181. }
  182. if err = closer(); err != nil {
  183. return err
  184. }
  185. os.Remove(cdbName)
  186. if len(fnames) == 1 {
  187. return os.Rename(fnames[0], cdbName)
  188. }
  189. bn, ext := baseFilename(cdbName)
  190. if err = os.Rename(fnames[0], bn+".1"+ext); err != nil {
  191. return err
  192. }
  193. return os.Rename(fnames[1], bn+".2"+ext)
  194. }
  195. func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) {
  196. fh, err := os.Create(fileName)
  197. if err != nil {
  198. return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err.Error())
  199. }
  200. adder, closer, err := cdb.MakeFactory(fh)
  201. if err != nil {
  202. fh.Close()
  203. return nil, nil, fmt.Errorf("error creating factory: %s", err.Error())
  204. }
  205. return adder, func() error {
  206. if e := closer(); e != nil {
  207. fh.Close()
  208. return e
  209. }
  210. fh.Close()
  211. return nil
  212. }, nil
  213. }
  214. // returns filename without extension, and the extension
  215. func baseFilename(fileName string) (string, string) {
  216. ext := filepath.Ext(fileName)
  217. return fileName[:len(fileName)-len(ext)], ext
  218. }