You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

274 lines
6.8 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "sync"
  9. )
  10. const (
  11. SuperBlockSize = 8
  12. )
  13. type SuperBlock struct {
  14. Version Version
  15. ReplicaType ReplicationType
  16. }
  17. func (s *SuperBlock) Bytes() []byte {
  18. header := make([]byte, SuperBlockSize)
  19. header[0] = byte(s.Version)
  20. header[1] = s.ReplicaType.Byte()
  21. return header
  22. }
  23. type Volume struct {
  24. Id VolumeId
  25. dir string
  26. dataFile *os.File
  27. nm *NeedleMap
  28. SuperBlock
  29. accessLock sync.Mutex
  30. }
  31. func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  32. v = &Volume{dir: dirname, Id: id}
  33. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  34. e = v.load(true)
  35. return
  36. }
  37. func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
  38. v = &Volume{dir: dirname, Id: id}
  39. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  40. e = v.load(false)
  41. return
  42. }
  43. func (v *Volume) load(alsoLoadIndex bool) error {
  44. var e error
  45. fileName := path.Join(v.dir, v.Id.String())
  46. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  47. if e != nil {
  48. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  49. }
  50. if v.ReplicaType == CopyNil {
  51. if e = v.readSuperBlock(); e != nil {
  52. return e
  53. }
  54. } else {
  55. v.maybeWriteSuperBlock()
  56. }
  57. if alsoLoadIndex {
  58. indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
  59. if ie != nil {
  60. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  61. }
  62. v.nm = LoadNeedleMap(indexFile)
  63. }
  64. return nil
  65. }
  66. func (v *Volume) Version() Version {
  67. return v.SuperBlock.Version
  68. }
  69. func (v *Volume) Size() int64 {
  70. v.accessLock.Lock()
  71. defer v.accessLock.Unlock()
  72. stat, e := v.dataFile.Stat()
  73. if e == nil {
  74. return stat.Size()
  75. }
  76. fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
  77. return -1
  78. }
  79. func (v *Volume) Close() {
  80. v.accessLock.Lock()
  81. defer v.accessLock.Unlock()
  82. v.nm.Close()
  83. v.dataFile.Close()
  84. }
  85. func (v *Volume) maybeWriteSuperBlock() {
  86. stat, e := v.dataFile.Stat()
  87. if e != nil {
  88. fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e)
  89. return
  90. }
  91. if stat.Size() == 0 {
  92. v.SuperBlock.Version = CurrentVersion
  93. v.dataFile.Write(v.SuperBlock.Bytes())
  94. }
  95. }
  96. func (v *Volume) readSuperBlock() (err error) {
  97. v.dataFile.Seek(0, 0)
  98. header := make([]byte, SuperBlockSize)
  99. if _, e := v.dataFile.Read(header); e != nil {
  100. return fmt.Errorf("cannot read superblock: %s", e)
  101. }
  102. v.SuperBlock, err = ParseSuperBlock(header)
  103. return err
  104. }
  105. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  106. superBlock.Version = Version(header[0])
  107. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  108. err = fmt.Errorf("cannot read replica type: %s", err)
  109. }
  110. return
  111. }
  112. func (v *Volume) NeedToReplicate() bool {
  113. return v.ReplicaType.GetCopyCount() > 1
  114. }
  115. func (v *Volume) write(n *Needle) (size uint32, err error) {
  116. v.accessLock.Lock()
  117. defer v.accessLock.Unlock()
  118. var offset int64
  119. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  120. return
  121. }
  122. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  123. return
  124. }
  125. nv, ok := v.nm.Get(n.Id)
  126. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  127. _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
  128. }
  129. return
  130. }
  131. func (v *Volume) delete(n *Needle) (uint32, error) {
  132. v.accessLock.Lock()
  133. defer v.accessLock.Unlock()
  134. nv, ok := v.nm.Get(n.Id)
  135. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  136. if ok {
  137. v.nm.Delete(n.Id)
  138. v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
  139. _, err := n.Append(v.dataFile, v.Version())
  140. return nv.Size, err
  141. }
  142. return 0, nil
  143. }
  144. func (v *Volume) read(n *Needle) (int, error) {
  145. v.accessLock.Lock()
  146. defer v.accessLock.Unlock()
  147. nv, ok := v.nm.Get(n.Id)
  148. if ok && nv.Offset > 0 {
  149. v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
  150. return n.Read(v.dataFile, nv.Size, v.Version())
  151. }
  152. return -1, errors.New("Not Found")
  153. }
  154. func (v *Volume) garbageLevel() float64 {
  155. return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
  156. }
  157. func (v *Volume) compact() error {
  158. v.accessLock.Lock()
  159. defer v.accessLock.Unlock()
  160. filePath := path.Join(v.dir, v.Id.String())
  161. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  162. }
  163. func (v *Volume) commitCompact() error {
  164. v.accessLock.Lock()
  165. defer v.accessLock.Unlock()
  166. v.dataFile.Close()
  167. var e error
  168. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  169. return e
  170. }
  171. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  172. return e
  173. }
  174. if e = v.load(true); e != nil {
  175. return e
  176. }
  177. return nil
  178. }
  179. func ScanVolumeFile(dirname string, id VolumeId,
  180. visitSuperBlock func(SuperBlock) error,
  181. visitNeedle func(n *Needle, offset uint32) error) (err error) {
  182. var v *Volume
  183. if v, err = LoadVolumeOnly(dirname, id); err != nil {
  184. return
  185. }
  186. if err = visitSuperBlock(v.SuperBlock); err != nil {
  187. return
  188. }
  189. version := v.Version()
  190. offset := uint32(SuperBlockSize)
  191. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  192. if e != nil {
  193. err = fmt.Errorf("cannot read needle header: %s", e)
  194. return
  195. }
  196. for n != nil {
  197. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  198. err = fmt.Errorf("cannot read needle body: %s", err)
  199. return
  200. }
  201. if err = visitNeedle(n, offset); err != nil {
  202. return
  203. }
  204. offset += NeedleHeaderSize + rest
  205. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  206. if err == io.EOF {
  207. return nil
  208. }
  209. return fmt.Errorf("cannot read needle header: %s", err)
  210. }
  211. }
  212. return
  213. }
  214. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  215. var (
  216. dst, idx *os.File
  217. )
  218. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  219. return
  220. }
  221. defer dst.Close()
  222. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  223. return
  224. }
  225. defer idx.Close()
  226. nm := NewNeedleMap(idx)
  227. new_offset := uint32(SuperBlockSize)
  228. err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
  229. _, err = dst.Write(superBlock.Bytes())
  230. return err
  231. }, func(n *Needle, offset uint32) error {
  232. nv, ok := v.nm.Get(n.Id)
  233. //log.Println("file size is", n.Size, "rest", rest)
  234. if ok && nv.Offset*NeedlePaddingSize == offset {
  235. if nv.Size > 0 {
  236. if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
  237. return fmt.Errorf("cannot put needle: %s", err)
  238. }
  239. if _, err = n.Append(dst, v.Version()); err != nil {
  240. return fmt.Errorf("cannot append needle: %s", err)
  241. }
  242. new_offset += n.DiskSize()
  243. //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  244. }
  245. }
  246. return nil
  247. })
  248. return
  249. }
  250. func (v *Volume) ContentSize() uint64 {
  251. return v.nm.fileByteCounter
  252. }