You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
2.0 KiB

  1. package storage
  2. import (
  3. "io"
  4. "log"
  5. "os"
  6. "path"
  7. "strconv"
  8. "sync"
  9. )
  // SuperBlockSize is the length, in bytes, of the header that
  // maybeWriteSuperBlock puts at the start of an empty volume data file.
  const SuperBlockSize = 8
  13. type Volume struct {
  14. Id uint32
  15. dir string
  16. dataFile *os.File
  17. nm *NeedleMap
  18. accessLock sync.Mutex
  19. }
  20. func NewVolume(dirname string, id uint32) (v *Volume) {
  21. var e error
  22. v = &Volume{dir: dirname, Id: id}
  23. fileName := strconv.FormatUint(uint64(v.Id), 10)
  24. v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
  25. if e != nil {
  26. log.Fatalf("New Volume [ERROR] %s\n", e)
  27. }
  28. v.maybeWriteSuperBlock()
  29. indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
  30. if ie != nil {
  31. log.Fatalf("Write Volume Index [ERROR] %s\n", ie)
  32. }
  33. v.nm = LoadNeedleMap(indexFile)
  34. return
  35. }
  36. func (v *Volume) Size() int64 {
  37. stat, e := v.dataFile.Stat()
  38. if e == nil {
  39. return stat.Size()
  40. }
  41. return -1
  42. }
  43. func (v *Volume) Close() {
  44. v.nm.Close()
  45. v.dataFile.Close()
  46. }
  47. func (v *Volume) maybeWriteSuperBlock() {
  48. stat, _ := v.dataFile.Stat()
  49. if stat.Size() == 0 {
  50. header := make([]byte, SuperBlockSize)
  51. header[0] = 1
  52. v.dataFile.Write(header)
  53. }
  54. }
  55. func (v *Volume) write(n *Needle) uint32 {
  56. v.accessLock.Lock()
  57. defer v.accessLock.Unlock()
  58. offset, _ := v.dataFile.Seek(0, 2)
  59. ret := n.Append(v.dataFile)
  60. nv, ok := v.nm.Get(n.Key)
  61. if !ok || int64(nv.Offset)*8 < offset {
  62. v.nm.Put(n.Key, uint32(offset/8), n.Size)
  63. }
  64. return ret
  65. }
  66. func (v *Volume) delete(n *Needle) uint32 {
  67. v.accessLock.Lock()
  68. defer v.accessLock.Unlock()
  69. nv, ok := v.nm.Get(n.Key)
  70. //log.Println("key", n.Key, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  71. if ok {
  72. v.nm.Delete(n.Key)
  73. v.dataFile.Seek(int64(nv.Offset*8), 0)
  74. n.Append(v.dataFile)
  75. return nv.Size
  76. }
  77. return 0
  78. }
  79. func (v *Volume) read(n *Needle) (int, error) {
  80. v.accessLock.Lock()
  81. defer v.accessLock.Unlock()
  82. nv, ok := v.nm.Get(n.Key)
  83. if ok && nv.Offset > 0 {
  84. v.dataFile.Seek(int64(nv.Offset)*8, 0)
  85. return n.Read(v.dataFile, nv.Size)
  86. }
  87. return -1, io.EOF
  88. }