You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
7.2 KiB

12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "sync"
  9. )
  10. const (
  11. SuperBlockSize = 8
  12. )
  13. type SuperBlock struct {
  14. Version Version
  15. ReplicaType ReplicationType
  16. }
  17. func (s *SuperBlock) Bytes() []byte {
  18. header := make([]byte, SuperBlockSize)
  19. header[0] = byte(s.Version)
  20. header[1] = s.ReplicaType.Byte()
  21. return header
  22. }
  23. type Volume struct {
  24. Id VolumeId
  25. dir string
  26. dataFile *os.File
  27. nm *NeedleMap
  28. SuperBlock
  29. accessLock sync.Mutex
  30. }
  31. func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  32. v = &Volume{dir: dirname, Id: id}
  33. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  34. e = v.load(true)
  35. return
  36. }
  37. func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
  38. v = &Volume{dir: dirname, Id: id}
  39. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  40. e = v.load(false)
  41. return
  42. }
  43. func (v *Volume) load(alsoLoadIndex bool) error {
  44. var e error
  45. fileName := path.Join(v.dir, v.Id.String())
  46. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  47. if e != nil {
  48. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  49. }
  50. if v.ReplicaType == CopyNil {
  51. e = v.readSuperBlock()
  52. } else {
  53. e = v.maybeWriteSuperBlock()
  54. }
  55. if e == nil && alsoLoadIndex {
  56. indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
  57. if ie != nil {
  58. return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
  59. }
  60. v.nm, e = LoadNeedleMap(indexFile)
  61. }
  62. return e
  63. }
  64. func (v *Volume) Version() Version {
  65. return v.SuperBlock.Version
  66. }
  67. func (v *Volume) Size() int64 {
  68. v.accessLock.Lock()
  69. defer v.accessLock.Unlock()
  70. stat, e := v.dataFile.Stat()
  71. if e == nil {
  72. return stat.Size()
  73. }
  74. fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
  75. return -1
  76. }
  77. func (v *Volume) Close() {
  78. v.accessLock.Lock()
  79. defer v.accessLock.Unlock()
  80. v.nm.Close()
  81. _ = v.dataFile.Close()
  82. }
  83. func (v *Volume) maybeWriteSuperBlock() error {
  84. stat, e := v.dataFile.Stat()
  85. if e != nil {
  86. fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e)
  87. return e
  88. }
  89. if stat.Size() == 0 {
  90. v.SuperBlock.Version = CurrentVersion
  91. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  92. }
  93. return e
  94. }
  95. func (v *Volume) readSuperBlock() (err error) {
  96. if _, err = v.dataFile.Seek(0, 0); err != nil {
  97. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err)
  98. }
  99. header := make([]byte, SuperBlockSize)
  100. if _, e := v.dataFile.Read(header); e != nil {
  101. return fmt.Errorf("cannot read superblock: %s", e)
  102. }
  103. v.SuperBlock, err = ParseSuperBlock(header)
  104. return err
  105. }
  106. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  107. superBlock.Version = Version(header[0])
  108. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  109. err = fmt.Errorf("cannot read replica type: %s", err)
  110. }
  111. return
  112. }
  113. func (v *Volume) NeedToReplicate() bool {
  114. return v.ReplicaType.GetCopyCount() > 1
  115. }
  116. func (v *Volume) write(n *Needle) (size uint32, err error) {
  117. v.accessLock.Lock()
  118. defer v.accessLock.Unlock()
  119. var offset int64
  120. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  121. return
  122. }
  123. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  124. if e := v.dataFile.Truncate(offset); e != nil {
  125. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e)
  126. }
  127. return
  128. }
  129. nv, ok := v.nm.Get(n.Id)
  130. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  131. _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
  132. }
  133. return
  134. }
  135. func (v *Volume) delete(n *Needle) (uint32, error) {
  136. v.accessLock.Lock()
  137. defer v.accessLock.Unlock()
  138. nv, ok := v.nm.Get(n.Id)
  139. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  140. if ok {
  141. var err error
  142. if err = v.nm.Delete(n.Id); err != nil {
  143. return nv.Size, err
  144. }
  145. if _, err = v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0); err != nil {
  146. return nv.Size, err
  147. }
  148. _, err = n.Append(v.dataFile, v.Version())
  149. return nv.Size, err
  150. }
  151. return 0, nil
  152. }
  153. func (v *Volume) read(n *Needle) (int, error) {
  154. v.accessLock.Lock()
  155. defer v.accessLock.Unlock()
  156. nv, ok := v.nm.Get(n.Id)
  157. if ok && nv.Offset > 0 {
  158. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  159. return -1, err
  160. }
  161. return n.Read(v.dataFile, nv.Size, v.Version())
  162. }
  163. return -1, errors.New("Not Found")
  164. }
  165. func (v *Volume) garbageLevel() float64 {
  166. return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
  167. }
  168. func (v *Volume) compact() error {
  169. v.accessLock.Lock()
  170. defer v.accessLock.Unlock()
  171. filePath := path.Join(v.dir, v.Id.String())
  172. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  173. }
  174. func (v *Volume) commitCompact() error {
  175. v.accessLock.Lock()
  176. defer v.accessLock.Unlock()
  177. _ = v.dataFile.Close()
  178. var e error
  179. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  180. return e
  181. }
  182. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  183. return e
  184. }
  185. if e = v.load(true); e != nil {
  186. return e
  187. }
  188. return nil
  189. }
  190. func ScanVolumeFile(dirname string, id VolumeId,
  191. visitSuperBlock func(SuperBlock) error,
  192. visitNeedle func(n *Needle, offset uint32) error) (err error) {
  193. var v *Volume
  194. if v, err = LoadVolumeOnly(dirname, id); err != nil {
  195. return
  196. }
  197. if err = visitSuperBlock(v.SuperBlock); err != nil {
  198. return
  199. }
  200. version := v.Version()
  201. offset := uint32(SuperBlockSize)
  202. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  203. if e != nil {
  204. err = fmt.Errorf("cannot read needle header: %s", e)
  205. return
  206. }
  207. for n != nil {
  208. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  209. err = fmt.Errorf("cannot read needle body: %s", err)
  210. return
  211. }
  212. if err = visitNeedle(n, offset); err != nil {
  213. return
  214. }
  215. offset += NeedleHeaderSize + rest
  216. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  217. if err == io.EOF {
  218. return nil
  219. }
  220. return fmt.Errorf("cannot read needle header: %s", err)
  221. }
  222. }
  223. return
  224. }
  225. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  226. var (
  227. dst, idx *os.File
  228. )
  229. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  230. return
  231. }
  232. defer dst.Close()
  233. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  234. return
  235. }
  236. defer idx.Close()
  237. nm := NewNeedleMap(idx)
  238. new_offset := uint32(SuperBlockSize)
  239. err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
  240. _, err = dst.Write(superBlock.Bytes())
  241. return err
  242. }, func(n *Needle, offset uint32) error {
  243. nv, ok := v.nm.Get(n.Id)
  244. //log.Println("file size is", n.Size, "rest", rest)
  245. if ok && nv.Offset*NeedlePaddingSize == offset {
  246. if nv.Size > 0 {
  247. if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
  248. return fmt.Errorf("cannot put needle: %s", err)
  249. }
  250. if _, err = n.Append(dst, v.Version()); err != nil {
  251. return fmt.Errorf("cannot append needle: %s", err)
  252. }
  253. new_offset += n.DiskSize()
  254. //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  255. }
  256. }
  257. return nil
  258. })
  259. return
  260. }
  261. func (v *Volume) ContentSize() uint64 {
  262. return v.nm.fileByteCounter
  263. }