You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

274 lines
6.8 KiB

12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "sync"
  9. )
  10. const (
  11. SuperBlockSize = 8
  12. )
  13. type SuperBlock struct {
  14. Version Version
  15. ReplicaType ReplicationType
  16. }
  17. func (s *SuperBlock) Bytes() []byte {
  18. header := make([]byte, SuperBlockSize)
  19. header[0] = byte(s.Version)
  20. header[1] = s.ReplicaType.Byte()
  21. return header
  22. }
  23. type Volume struct {
  24. Id VolumeId
  25. dir string
  26. dataFile *os.File
  27. nm *NeedleMap
  28. SuperBlock
  29. accessLock sync.Mutex
  30. }
  31. func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  32. v = &Volume{dir: dirname, Id: id}
  33. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  34. e = v.load(true)
  35. return
  36. }
  37. func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
  38. v = &Volume{dir: dirname, Id: id}
  39. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  40. e = v.load(false)
  41. return
  42. }
  43. func (v *Volume) load(alsoLoadIndex bool) error {
  44. var e error
  45. fileName := path.Join(v.dir, v.Id.String())
  46. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  47. if e != nil {
  48. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  49. }
  50. if v.ReplicaType == CopyNil {
  51. e = v.readSuperBlock()
  52. } else {
  53. e = v.maybeWriteSuperBlock()
  54. }
  55. if e == nil && alsoLoadIndex {
  56. indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
  57. if ie != nil {
  58. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  59. }
  60. v.nm, e = LoadNeedleMap(indexFile)
  61. }
  62. return e
  63. }
  64. func (v *Volume) Version() Version {
  65. return v.SuperBlock.Version
  66. }
  67. func (v *Volume) Size() int64 {
  68. v.accessLock.Lock()
  69. defer v.accessLock.Unlock()
  70. stat, e := v.dataFile.Stat()
  71. if e == nil {
  72. return stat.Size()
  73. }
  74. fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
  75. return -1
  76. }
  77. func (v *Volume) Close() {
  78. v.accessLock.Lock()
  79. defer v.accessLock.Unlock()
  80. v.nm.Close()
  81. v.dataFile.Close()
  82. }
  83. func (v *Volume) maybeWriteSuperBlock() error {
  84. stat, e := v.dataFile.Stat()
  85. if e != nil {
  86. fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e)
  87. return e
  88. }
  89. if stat.Size() == 0 {
  90. v.SuperBlock.Version = CurrentVersion
  91. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  92. }
  93. return e
  94. }
  95. func (v *Volume) readSuperBlock() (err error) {
  96. v.dataFile.Seek(0, 0)
  97. header := make([]byte, SuperBlockSize)
  98. if _, e := v.dataFile.Read(header); e != nil {
  99. return fmt.Errorf("cannot read superblock: %s", e)
  100. }
  101. v.SuperBlock, err = ParseSuperBlock(header)
  102. return err
  103. }
  104. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  105. superBlock.Version = Version(header[0])
  106. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  107. err = fmt.Errorf("cannot read replica type: %s", err)
  108. }
  109. return
  110. }
  111. func (v *Volume) NeedToReplicate() bool {
  112. return v.ReplicaType.GetCopyCount() > 1
  113. }
  114. func (v *Volume) write(n *Needle) (size uint32, err error) {
  115. v.accessLock.Lock()
  116. defer v.accessLock.Unlock()
  117. var offset int64
  118. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  119. return
  120. }
  121. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  122. v.dataFile.Truncate(offset)
  123. return
  124. }
  125. nv, ok := v.nm.Get(n.Id)
  126. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  127. _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
  128. }
  129. return
  130. }
  131. func (v *Volume) delete(n *Needle) (uint32, error) {
  132. v.accessLock.Lock()
  133. defer v.accessLock.Unlock()
  134. nv, ok := v.nm.Get(n.Id)
  135. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  136. if ok {
  137. v.nm.Delete(n.Id)
  138. v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
  139. _, err := n.Append(v.dataFile, v.Version())
  140. return nv.Size, err
  141. }
  142. return 0, nil
  143. }
  144. func (v *Volume) read(n *Needle) (int, error) {
  145. v.accessLock.Lock()
  146. defer v.accessLock.Unlock()
  147. nv, ok := v.nm.Get(n.Id)
  148. if ok && nv.Offset > 0 {
  149. v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
  150. return n.Read(v.dataFile, nv.Size, v.Version())
  151. }
  152. return -1, errors.New("Not Found")
  153. }
  154. func (v *Volume) garbageLevel() float64 {
  155. return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
  156. }
  157. func (v *Volume) compact() error {
  158. v.accessLock.Lock()
  159. defer v.accessLock.Unlock()
  160. filePath := path.Join(v.dir, v.Id.String())
  161. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  162. }
  163. func (v *Volume) commitCompact() error {
  164. v.accessLock.Lock()
  165. defer v.accessLock.Unlock()
  166. v.dataFile.Close()
  167. var e error
  168. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  169. return e
  170. }
  171. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  172. return e
  173. }
  174. if e = v.load(true); e != nil {
  175. return e
  176. }
  177. return nil
  178. }
  179. func ScanVolumeFile(dirname string, id VolumeId,
  180. visitSuperBlock func(SuperBlock) error,
  181. visitNeedle func(n *Needle, offset uint32) error) (err error) {
  182. var v *Volume
  183. if v, err = LoadVolumeOnly(dirname, id); err != nil {
  184. return
  185. }
  186. if err = visitSuperBlock(v.SuperBlock); err != nil {
  187. return
  188. }
  189. version := v.Version()
  190. offset := uint32(SuperBlockSize)
  191. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  192. if e != nil {
  193. err = fmt.Errorf("cannot read needle header: %s", e)
  194. return
  195. }
  196. for n != nil {
  197. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  198. err = fmt.Errorf("cannot read needle body: %s", err)
  199. return
  200. }
  201. if err = visitNeedle(n, offset); err != nil {
  202. return
  203. }
  204. offset += NeedleHeaderSize + rest
  205. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  206. if err == io.EOF {
  207. return nil
  208. }
  209. return fmt.Errorf("cannot read needle header: %s", err)
  210. }
  211. }
  212. return
  213. }
  214. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  215. var (
  216. dst, idx *os.File
  217. )
  218. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  219. return
  220. }
  221. defer dst.Close()
  222. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  223. return
  224. }
  225. defer idx.Close()
  226. nm := NewNeedleMap(idx)
  227. new_offset := uint32(SuperBlockSize)
  228. err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
  229. _, err = dst.Write(superBlock.Bytes())
  230. return err
  231. }, func(n *Needle, offset uint32) error {
  232. nv, ok := v.nm.Get(n.Id)
  233. //log.Println("file size is", n.Size, "rest", rest)
  234. if ok && nv.Offset*NeedlePaddingSize == offset {
  235. if nv.Size > 0 {
  236. if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
  237. return fmt.Errorf("cannot put needle: %s", err)
  238. }
  239. if _, err = n.Append(dst, v.Version()); err != nil {
  240. return fmt.Errorf("cannot append needle: %s", err)
  241. }
  242. new_offset += n.DiskSize()
  243. //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  244. }
  245. }
  246. return nil
  247. })
  248. return
  249. }
  250. func (v *Volume) ContentSize() uint64 {
  251. return v.nm.fileByteCounter
  252. }