You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

462 lines
12 KiB

12 years ago
11 years ago
12 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. SuperBlockSize = 8
  15. )
  16. type SuperBlock struct {
  17. Version Version
  18. ReplicaPlacement *ReplicaPlacement
  19. }
  20. func (s *SuperBlock) Bytes() []byte {
  21. header := make([]byte, SuperBlockSize)
  22. header[0] = byte(s.Version)
  23. header[1] = s.ReplicaPlacement.Byte()
  24. return header
  25. }
  26. type Volume struct {
  27. Id VolumeId
  28. dir string
  29. Collection string
  30. dataFile *os.File
  31. nm NeedleMapper
  32. readOnly bool
  33. SuperBlock
  34. accessLock sync.Mutex
  35. }
  36. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
  37. v = &Volume{dir: dirname, Collection: collection, Id: id}
  38. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
  39. e = v.load(true, true)
  40. return
  41. }
  42. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  43. v = &Volume{dir: dirname, Collection: collection, Id: id}
  44. v.SuperBlock = SuperBlock{}
  45. e = v.load(false, false)
  46. return
  47. }
  48. func (v *Volume) FileName() (fileName string) {
  49. if v.Collection == "" {
  50. fileName = path.Join(v.dir, v.Id.String())
  51. } else {
  52. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  53. }
  54. return
  55. }
  56. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  57. var e error
  58. fileName := v.FileName()
  59. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
  60. if !canRead {
  61. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  62. }
  63. if canWrite {
  64. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  65. } else {
  66. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  67. v.dataFile, e = os.Open(fileName + ".dat")
  68. v.readOnly = true
  69. }
  70. } else {
  71. if createDatIfMissing {
  72. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  73. } else {
  74. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  75. }
  76. }
  77. if e != nil {
  78. if !os.IsPermission(e) {
  79. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  80. }
  81. }
  82. if v.ReplicaPlacement == nil {
  83. e = v.readSuperBlock()
  84. } else {
  85. e = v.maybeWriteSuperBlock()
  86. }
  87. if e == nil && alsoLoadIndex {
  88. if v.readOnly {
  89. if v.ensureConvertIdxToCdb(fileName) {
  90. v.nm, e = OpenCdbMap(fileName + ".cdb")
  91. return e
  92. }
  93. }
  94. var indexFile *os.File
  95. if v.readOnly {
  96. glog.V(1).Infoln("open to read file", fileName+".idx")
  97. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  98. return fmt.Errorf("cannot read Volume Index %s.idx: %s", fileName, e.Error())
  99. }
  100. } else {
  101. glog.V(1).Infoln("open to write file", fileName+".idx")
  102. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  103. return fmt.Errorf("cannot write Volume Index %s.idx: %s", fileName, e.Error())
  104. }
  105. }
  106. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  107. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  108. glog.V(0).Infoln("loading error:", e)
  109. }
  110. }
  111. return e
  112. }
  113. func (v *Volume) Version() Version {
  114. return v.SuperBlock.Version
  115. }
  116. func (v *Volume) Size() int64 {
  117. stat, e := v.dataFile.Stat()
  118. if e == nil {
  119. return stat.Size()
  120. }
  121. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  122. return -1
  123. }
  124. func (v *Volume) Close() {
  125. v.accessLock.Lock()
  126. defer v.accessLock.Unlock()
  127. v.nm.Close()
  128. _ = v.dataFile.Close()
  129. }
  130. func (v *Volume) maybeWriteSuperBlock() error {
  131. stat, e := v.dataFile.Stat()
  132. if e != nil {
  133. glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
  134. return e
  135. }
  136. if stat.Size() == 0 {
  137. v.SuperBlock.Version = CurrentVersion
  138. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  139. if e != nil && os.IsPermission(e) {
  140. //read-only, but zero length - recreate it!
  141. if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
  142. if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
  143. v.readOnly = false
  144. }
  145. }
  146. }
  147. }
  148. return e
  149. }
  150. func (v *Volume) readSuperBlock() (err error) {
  151. if _, err = v.dataFile.Seek(0, 0); err != nil {
  152. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
  153. }
  154. header := make([]byte, SuperBlockSize)
  155. if _, e := v.dataFile.Read(header); e != nil {
  156. return fmt.Errorf("cannot read superblock: %s", e.Error())
  157. }
  158. v.SuperBlock, err = ParseSuperBlock(header)
  159. return err
  160. }
  161. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  162. superBlock.Version = Version(header[0])
  163. if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
  164. err = fmt.Errorf("cannot read replica type: %s", err.Error())
  165. }
  166. return
  167. }
  168. func (v *Volume) NeedToReplicate() bool {
  169. return v.ReplicaPlacement.GetCopyCount() > 1
  170. }
  171. func (v *Volume) isFileUnchanged(n *Needle) bool {
  172. nv, ok := v.nm.Get(n.Id)
  173. if ok && nv.Offset > 0 {
  174. oldNeedle := new(Needle)
  175. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  176. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  177. n.Size = oldNeedle.Size
  178. return true
  179. }
  180. }
  181. return false
  182. }
  183. func (v *Volume) Destroy() (err error) {
  184. if v.readOnly {
  185. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  186. return
  187. }
  188. v.Close()
  189. err = os.Remove(v.dataFile.Name())
  190. if err != nil {
  191. return
  192. }
  193. err = v.nm.Destroy()
  194. return
  195. }
  196. func (v *Volume) write(n *Needle) (size uint32, err error) {
  197. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  198. if v.readOnly {
  199. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  200. return
  201. }
  202. v.accessLock.Lock()
  203. defer v.accessLock.Unlock()
  204. if v.isFileUnchanged(n) {
  205. size = n.Size
  206. glog.V(4).Infof("needle is unchanged!")
  207. return
  208. }
  209. var offset int64
  210. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  211. return
  212. }
  213. //ensure file writing starting from aligned positions
  214. if offset%NeedlePaddingSize != 0 {
  215. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  216. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  217. glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
  218. return
  219. }
  220. }
  221. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  222. if e := v.dataFile.Truncate(offset); e != nil {
  223. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile.Name(), e.Error())
  224. }
  225. return
  226. }
  227. nv, ok := v.nm.Get(n.Id)
  228. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  229. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  230. glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
  231. }
  232. }
  233. return
  234. }
  235. func (v *Volume) delete(n *Needle) (uint32, error) {
  236. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  237. if v.readOnly {
  238. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  239. }
  240. v.accessLock.Lock()
  241. defer v.accessLock.Unlock()
  242. nv, ok := v.nm.Get(n.Id)
  243. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  244. if ok {
  245. size := nv.Size
  246. if err := v.nm.Delete(n.Id); err != nil {
  247. return size, err
  248. }
  249. if _, err := v.dataFile.Seek(0, 2); err != nil {
  250. return size, err
  251. }
  252. n.Data = make([]byte, 0)
  253. _, err := n.Append(v.dataFile, v.Version())
  254. return size, err
  255. }
  256. return 0, nil
  257. }
  258. func (v *Volume) read(n *Needle) (int, error) {
  259. nv, ok := v.nm.Get(n.Id)
  260. if ok && nv.Offset > 0 {
  261. return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  262. }
  263. return -1, errors.New("Not Found")
  264. }
  265. func (v *Volume) garbageLevel() float64 {
  266. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  267. }
  268. func (v *Volume) Compact() error {
  269. v.accessLock.Lock()
  270. defer v.accessLock.Unlock()
  271. filePath := v.FileName()
  272. glog.V(3).Infof("creating copies for volume %d ...", v.Id)
  273. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  274. }
  275. func (v *Volume) commitCompact() error {
  276. v.accessLock.Lock()
  277. defer v.accessLock.Unlock()
  278. _ = v.dataFile.Close()
  279. var e error
  280. if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
  281. return e
  282. }
  283. if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
  284. return e
  285. }
  286. if e = v.load(true, false); e != nil {
  287. return e
  288. }
  289. return nil
  290. }
  291. func (v *Volume) freeze() error {
  292. if v.readOnly {
  293. return nil
  294. }
  295. nm, ok := v.nm.(*NeedleMap)
  296. if !ok {
  297. return nil
  298. }
  299. v.accessLock.Lock()
  300. defer v.accessLock.Unlock()
  301. bn, _ := baseFilename(v.dataFile.Name())
  302. cdbFn := bn + ".cdb"
  303. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  304. err := DumpNeedleMapToCdb(cdbFn, nm)
  305. if err != nil {
  306. return err
  307. }
  308. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  309. return err
  310. }
  311. nm.indexFile.Close()
  312. os.Remove(nm.indexFile.Name())
  313. v.readOnly = true
  314. return nil
  315. }
  316. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  317. visitSuperBlock func(SuperBlock) error,
  318. visitNeedle func(n *Needle, offset int64) error) (err error) {
  319. var v *Volume
  320. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  321. return errors.New("Failed to load volume:" + err.Error())
  322. }
  323. if err = visitSuperBlock(v.SuperBlock); err != nil {
  324. return errors.New("Failed to read super block:" + err.Error())
  325. }
  326. version := v.Version()
  327. offset := int64(SuperBlockSize)
  328. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  329. if e != nil {
  330. err = fmt.Errorf("cannot read needle header: %s", e)
  331. return
  332. }
  333. for n != nil {
  334. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  335. err = fmt.Errorf("cannot read needle body: %s", err)
  336. return
  337. }
  338. if err = visitNeedle(n, offset); err != nil {
  339. return
  340. }
  341. offset += int64(NeedleHeaderSize) + int64(rest)
  342. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  343. if err == io.EOF {
  344. return nil
  345. }
  346. return fmt.Errorf("cannot read needle header: %s", err)
  347. }
  348. }
  349. return
  350. }
  351. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  352. var (
  353. dst, idx *os.File
  354. )
  355. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  356. return
  357. }
  358. defer dst.Close()
  359. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  360. return
  361. }
  362. defer idx.Close()
  363. nm := NewNeedleMap(idx)
  364. new_offset := int64(SuperBlockSize)
  365. err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
  366. _, err = dst.Write(superBlock.Bytes())
  367. return err
  368. }, func(n *Needle, offset int64) error {
  369. nv, ok := v.nm.Get(n.Id)
  370. glog.V(3).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
  371. if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
  372. if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  373. return fmt.Errorf("cannot put needle: %s", err)
  374. }
  375. if _, err = n.Append(dst, v.Version()); err != nil {
  376. return fmt.Errorf("cannot append needle: %s", err)
  377. }
  378. new_offset += n.DiskSize()
  379. glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size)
  380. }
  381. return nil
  382. })
  383. return
  384. }
  385. func (v *Volume) ContentSize() uint64 {
  386. return v.nm.ContentSize()
  387. }
  388. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  389. exists = true
  390. fi, err := os.Stat(filename)
  391. if os.IsNotExist(err) {
  392. exists = false
  393. return
  394. }
  395. if fi.Mode()&0400 != 0 {
  396. canRead = true
  397. }
  398. if fi.Mode()&0200 != 0 {
  399. canWrite = true
  400. }
  401. modTime = fi.ModTime()
  402. return
  403. }
  404. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  405. var indexFile *os.File
  406. var e error
  407. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  408. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  409. if cdbCanRead && cdbModTime.After(idxModeTime) {
  410. return true
  411. }
  412. if !cdbCanWrite {
  413. return false
  414. }
  415. if !idxCanRead {
  416. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  417. return false
  418. }
  419. glog.V(2).Infoln("opening file", fileName+".idx")
  420. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  421. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  422. return false
  423. }
  424. defer indexFile.Close()
  425. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  426. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  427. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  428. return false
  429. }
  430. return true
  431. }