You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
11 KiB

12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  // SuperBlockSize is the number of bytes the super block occupies at the start
  // of a volume data file; needle records begin right after it.
  const SuperBlockSize = 8
  16. type SuperBlock struct {
  17. Version Version
  18. ReplicaType ReplicationType
  19. }
  20. func (s *SuperBlock) Bytes() []byte {
  21. header := make([]byte, SuperBlockSize)
  22. header[0] = byte(s.Version)
  23. header[1] = s.ReplicaType.Byte()
  24. return header
  25. }
  26. type Volume struct {
  27. Id VolumeId
  28. dir string
  29. dataFile *os.File
  30. nm NeedleMapper
  31. readOnly bool
  32. SuperBlock
  33. accessLock sync.Mutex
  34. }
  35. func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Id: id}
  37. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  38. e = v.load(true)
  39. return
  40. }
  41. func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) {
  42. v = &Volume{dir: dirname, Id: id}
  43. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  44. e = v.load(false)
  45. return
  46. }
  47. func (v *Volume) load(alsoLoadIndex bool) error {
  48. var e error
  49. fileName := path.Join(v.dir, v.Id.String())
  50. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead {
  51. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  52. } else if canWrite {
  53. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  54. } else {
  55. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  56. v.dataFile, e = os.Open(fileName + ".dat")
  57. v.readOnly = true
  58. }
  59. if e != nil {
  60. if !os.IsPermission(e) {
  61. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  62. }
  63. }
  64. if v.ReplicaType == CopyNil {
  65. e = v.readSuperBlock()
  66. } else {
  67. e = v.maybeWriteSuperBlock()
  68. }
  69. if e == nil && alsoLoadIndex {
  70. if v.readOnly {
  71. if v.ensureConvertIdxToCdb(fileName) {
  72. v.nm, e = OpenCdbMap(fileName + ".cdb")
  73. return e
  74. }
  75. }
  76. var indexFile *os.File
  77. if v.readOnly {
  78. glog.V(1).Infoln("open to read file", fileName+".idx")
  79. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  80. return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error())
  81. }
  82. } else {
  83. glog.V(1).Infoln("open to write file", fileName+".idx")
  84. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  85. return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error())
  86. }
  87. }
  88. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  89. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  90. glog.V(0).Infoln("loading error:", e)
  91. }
  92. }
  93. return e
  94. }
  95. func (v *Volume) Version() Version {
  96. return v.SuperBlock.Version
  97. }
  98. func (v *Volume) Size() int64 {
  99. v.accessLock.Lock()
  100. defer v.accessLock.Unlock()
  101. stat, e := v.dataFile.Stat()
  102. if e == nil {
  103. return stat.Size()
  104. }
  105. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  106. return -1
  107. }
  108. func (v *Volume) Close() {
  109. v.accessLock.Lock()
  110. defer v.accessLock.Unlock()
  111. v.nm.Close()
  112. _ = v.dataFile.Close()
  113. }
  114. func (v *Volume) maybeWriteSuperBlock() error {
  115. stat, e := v.dataFile.Stat()
  116. if e != nil {
  117. glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
  118. return e
  119. }
  120. if stat.Size() == 0 {
  121. v.SuperBlock.Version = CurrentVersion
  122. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  123. if e != nil && os.IsPermission(e) {
  124. //read-only, but zero length - recreate it!
  125. if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
  126. if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
  127. v.readOnly = false
  128. }
  129. }
  130. }
  131. }
  132. return e
  133. }
  134. func (v *Volume) readSuperBlock() (err error) {
  135. if _, err = v.dataFile.Seek(0, 0); err != nil {
  136. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err.Error())
  137. }
  138. header := make([]byte, SuperBlockSize)
  139. if _, e := v.dataFile.Read(header); e != nil {
  140. return fmt.Errorf("cannot read superblock: %s", e.Error())
  141. }
  142. v.SuperBlock, err = ParseSuperBlock(header)
  143. return err
  144. }
  145. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  146. superBlock.Version = Version(header[0])
  147. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  148. err = fmt.Errorf("cannot read replica type: %s", err.Error())
  149. }
  150. return
  151. }
  152. func (v *Volume) NeedToReplicate() bool {
  153. return v.ReplicaType.GetCopyCount() > 1
  154. }
  155. func (v *Volume) isFileUnchanged(n *Needle) bool {
  156. nv, ok := v.nm.Get(n.Id)
  157. if ok && nv.Offset > 0 {
  158. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  159. return false
  160. }
  161. oldNeedle := new(Needle)
  162. oldNeedle.Read(v.dataFile, nv.Size, v.Version())
  163. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  164. n.Size = oldNeedle.Size
  165. return true
  166. }
  167. }
  168. return false
  169. }
  170. func (v *Volume) write(n *Needle) (size uint32, err error) {
  171. if v.readOnly {
  172. err = fmt.Errorf("%s is read-only", v.dataFile)
  173. return
  174. }
  175. v.accessLock.Lock()
  176. defer v.accessLock.Unlock()
  177. if v.isFileUnchanged(n) {
  178. size = n.Size
  179. return
  180. }
  181. var offset int64
  182. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  183. return
  184. }
  185. //ensure file writing starting from aligned positions
  186. if offset%NeedlePaddingSize != 0 {
  187. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  188. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  189. return
  190. }
  191. }
  192. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  193. if e := v.dataFile.Truncate(offset); e != nil {
  194. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error())
  195. }
  196. return
  197. }
  198. nv, ok := v.nm.Get(n.Id)
  199. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  200. _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
  201. }
  202. return
  203. }
  204. func (v *Volume) delete(n *Needle) (uint32, error) {
  205. if v.readOnly {
  206. return 0, fmt.Errorf("%s is read-only", v.dataFile)
  207. }
  208. v.accessLock.Lock()
  209. defer v.accessLock.Unlock()
  210. nv, ok := v.nm.Get(n.Id)
  211. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  212. if ok {
  213. size := nv.Size
  214. if err := v.nm.Delete(n.Id); err != nil {
  215. return size, err
  216. }
  217. if _, err := v.dataFile.Seek(0, 2); err != nil {
  218. return size, err
  219. }
  220. n.Data = make([]byte, 0)
  221. _, err := n.Append(v.dataFile, v.Version())
  222. return size, err
  223. }
  224. return 0, nil
  225. }
  226. func (v *Volume) read(n *Needle) (int, error) {
  227. v.accessLock.Lock()
  228. defer v.accessLock.Unlock()
  229. nv, ok := v.nm.Get(n.Id)
  230. if ok && nv.Offset > 0 {
  231. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  232. return -1, err
  233. }
  234. return n.Read(v.dataFile, nv.Size, v.Version())
  235. }
  236. return -1, errors.New("Not Found")
  237. }
  238. func (v *Volume) garbageLevel() float64 {
  239. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  240. }
  241. func (v *Volume) compact() error {
  242. v.accessLock.Lock()
  243. defer v.accessLock.Unlock()
  244. filePath := path.Join(v.dir, v.Id.String())
  245. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  246. }
  247. func (v *Volume) commitCompact() error {
  248. v.accessLock.Lock()
  249. defer v.accessLock.Unlock()
  250. _ = v.dataFile.Close()
  251. var e error
  252. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  253. return e
  254. }
  255. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  256. return e
  257. }
  258. if e = v.load(true); e != nil {
  259. return e
  260. }
  261. return nil
  262. }
  263. func (v *Volume) freeze() error {
  264. if v.readOnly {
  265. return nil
  266. }
  267. nm, ok := v.nm.(*NeedleMap)
  268. if !ok {
  269. return nil
  270. }
  271. v.accessLock.Lock()
  272. defer v.accessLock.Unlock()
  273. bn, _ := nakeFilename(v.dataFile.Name())
  274. cdbFn := bn + ".cdb"
  275. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  276. err := DumpNeedleMapToCdb(cdbFn, nm)
  277. if err != nil {
  278. return err
  279. }
  280. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  281. return err
  282. }
  283. nm.indexFile.Close()
  284. os.Remove(nm.indexFile.Name())
  285. v.readOnly = true
  286. return nil
  287. }
  288. func ScanVolumeFile(dirname string, id VolumeId,
  289. visitSuperBlock func(SuperBlock) error,
  290. visitNeedle func(n *Needle, offset uint32) error) (err error) {
  291. var v *Volume
  292. if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
  293. return
  294. }
  295. if err = visitSuperBlock(v.SuperBlock); err != nil {
  296. return
  297. }
  298. version := v.Version()
  299. offset := uint32(SuperBlockSize)
  300. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  301. if e != nil {
  302. err = fmt.Errorf("cannot read needle header: %s", e)
  303. return
  304. }
  305. for n != nil {
  306. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  307. err = fmt.Errorf("cannot read needle body: %s", err)
  308. return
  309. }
  310. if err = visitNeedle(n, offset); err != nil {
  311. return
  312. }
  313. offset += NeedleHeaderSize + rest
  314. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  315. if err == io.EOF {
  316. return nil
  317. }
  318. return fmt.Errorf("cannot read needle header: %s", err)
  319. }
  320. }
  321. return
  322. }
  323. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  324. var (
  325. dst, idx *os.File
  326. )
  327. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  328. return
  329. }
  330. defer dst.Close()
  331. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  332. return
  333. }
  334. defer idx.Close()
  335. nm := NewNeedleMap(idx)
  336. new_offset := uint32(SuperBlockSize)
  337. err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
  338. _, err = dst.Write(superBlock.Bytes())
  339. return err
  340. }, func(n *Needle, offset uint32) error {
  341. nv, ok := v.nm.Get(n.Id)
  342. //glog.V(0).Infoln("file size is", n.Size, "rest", rest)
  343. if ok && nv.Offset*NeedlePaddingSize == offset {
  344. if nv.Size > 0 {
  345. if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
  346. return fmt.Errorf("cannot put needle: %s", err)
  347. }
  348. if _, err = n.Append(dst, v.Version()); err != nil {
  349. return fmt.Errorf("cannot append needle: %s", err)
  350. }
  351. new_offset += n.DiskSize()
  352. //glog.V(0).Infoln("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  353. }
  354. }
  355. return nil
  356. })
  357. return
  358. }
  359. func (v *Volume) ContentSize() uint64 {
  360. return v.nm.ContentSize()
  361. }
  // checkFile stats filename and reports whether it exists, whether its mode
  // has the owner-read (0400) and owner-write (0200) bits set, and its
  // modification time.
  //
  // A file that does not exist yields exists == false. Any other stat failure
  // (for example a path component that is not a directory, or a parent that
  // cannot be searched) yields exists == true with no access and a zero
  // modTime: absence was not proven, but the file is unusable. Previously such
  // a failure dereferenced a nil FileInfo and panicked.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if err != nil {
  		exists = !os.IsNotExist(err)
  		return exists, false, false, modTime
  	}
  	mode := fi.Mode()
  	return true, mode&0400 != 0, mode&0200 != 0, fi.ModTime()
  }
  378. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  379. var indexFile *os.File
  380. var e error
  381. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  382. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  383. if cdbCanRead && cdbModTime.After(idxModeTime) {
  384. return true
  385. }
  386. if !cdbCanWrite {
  387. return false
  388. }
  389. if !idxCanRead {
  390. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  391. return false
  392. }
  393. glog.V(2).Infoln("opening file", fileName+".idx")
  394. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  395. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  396. return false
  397. }
  398. defer indexFile.Close()
  399. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  400. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  401. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  402. return false
  403. }
  404. return true
  405. }