You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

471 lines
12 KiB

12 years ago
12 years ago
11 years ago
12 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. SuperBlockSize = 8
  15. )
  16. type SuperBlock struct {
  17. Version Version
  18. ReplicaPlacement *ReplicaPlacement
  19. }
  20. func (s *SuperBlock) Bytes() []byte {
  21. header := make([]byte, SuperBlockSize)
  22. header[0] = byte(s.Version)
  23. header[1] = s.ReplicaPlacement.Byte()
  24. return header
  25. }
  26. type Volume struct {
  27. Id VolumeId
  28. dir string
  29. Collection string
  30. dataFile *os.File
  31. nm NeedleMapper
  32. readOnly bool
  33. SuperBlock
  34. accessLock sync.Mutex
  35. }
  36. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
  37. v = &Volume{dir: dirname, Collection: collection, Id: id}
  38. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
  39. e = v.load(true, true)
  40. return
  41. }
  42. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  43. v = &Volume{dir: dirname, Collection: collection, Id: id}
  44. v.SuperBlock = SuperBlock{}
  45. e = v.load(false, false)
  46. return
  47. }
  48. func (v *Volume) FileName() (fileName string) {
  49. if v.Collection == "" {
  50. fileName = path.Join(v.dir, v.Id.String())
  51. } else {
  52. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  53. }
  54. return
  55. }
  56. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  57. var e error
  58. fileName := v.FileName()
  59. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
  60. if !canRead {
  61. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  62. }
  63. if canWrite {
  64. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  65. } else {
  66. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  67. v.dataFile, e = os.Open(fileName + ".dat")
  68. v.readOnly = true
  69. }
  70. } else {
  71. if createDatIfMissing {
  72. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  73. } else {
  74. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  75. }
  76. }
  77. if e != nil {
  78. if !os.IsPermission(e) {
  79. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  80. }
  81. }
  82. if v.ReplicaPlacement == nil {
  83. e = v.readSuperBlock()
  84. } else {
  85. e = v.maybeWriteSuperBlock()
  86. }
  87. if e == nil && alsoLoadIndex {
  88. if v.readOnly {
  89. if v.ensureConvertIdxToCdb(fileName) {
  90. v.nm, e = OpenCdbMap(fileName + ".cdb")
  91. return e
  92. }
  93. }
  94. var indexFile *os.File
  95. if v.readOnly {
  96. glog.V(1).Infoln("open to read file", fileName+".idx")
  97. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  98. return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error())
  99. }
  100. } else {
  101. glog.V(1).Infoln("open to write file", fileName+".idx")
  102. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  103. return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error())
  104. }
  105. }
  106. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  107. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  108. glog.V(0).Infoln("loading error:", e)
  109. }
  110. }
  111. return e
  112. }
  113. func (v *Volume) Version() Version {
  114. return v.SuperBlock.Version
  115. }
  116. func (v *Volume) Size() int64 {
  117. v.accessLock.Lock()
  118. defer v.accessLock.Unlock()
  119. stat, e := v.dataFile.Stat()
  120. if e == nil {
  121. return stat.Size()
  122. }
  123. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  124. return -1
  125. }
  126. func (v *Volume) Close() {
  127. v.accessLock.Lock()
  128. defer v.accessLock.Unlock()
  129. v.nm.Close()
  130. _ = v.dataFile.Close()
  131. }
  132. func (v *Volume) maybeWriteSuperBlock() error {
  133. stat, e := v.dataFile.Stat()
  134. if e != nil {
  135. glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
  136. return e
  137. }
  138. if stat.Size() == 0 {
  139. v.SuperBlock.Version = CurrentVersion
  140. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  141. if e != nil && os.IsPermission(e) {
  142. //read-only, but zero length - recreate it!
  143. if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
  144. if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
  145. v.readOnly = false
  146. }
  147. }
  148. }
  149. }
  150. return e
  151. }
  152. func (v *Volume) readSuperBlock() (err error) {
  153. if _, err = v.dataFile.Seek(0, 0); err != nil {
  154. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
  155. }
  156. header := make([]byte, SuperBlockSize)
  157. if _, e := v.dataFile.Read(header); e != nil {
  158. return fmt.Errorf("cannot read superblock: %s", e.Error())
  159. }
  160. v.SuperBlock, err = ParseSuperBlock(header)
  161. return err
  162. }
  163. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  164. superBlock.Version = Version(header[0])
  165. if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
  166. err = fmt.Errorf("cannot read replica type: %s", err.Error())
  167. }
  168. return
  169. }
  170. func (v *Volume) NeedToReplicate() bool {
  171. return v.ReplicaPlacement.GetCopyCount() > 1
  172. }
  173. func (v *Volume) isFileUnchanged(n *Needle) bool {
  174. nv, ok := v.nm.Get(n.Id)
  175. if ok && nv.Offset > 0 {
  176. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  177. return false
  178. }
  179. oldNeedle := new(Needle)
  180. oldNeedle.Read(v.dataFile, nv.Size, v.Version())
  181. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  182. n.Size = oldNeedle.Size
  183. return true
  184. }
  185. }
  186. return false
  187. }
  188. func (v *Volume) Destroy() (err error) {
  189. if v.readOnly {
  190. err = fmt.Errorf("%s is read-only", v.dataFile)
  191. return
  192. }
  193. v.Close()
  194. err = os.Remove(v.dataFile.Name())
  195. if err != nil {
  196. return
  197. }
  198. err = v.nm.Destroy()
  199. return
  200. }
  201. func (v *Volume) write(n *Needle) (size uint32, err error) {
  202. if v.readOnly {
  203. err = fmt.Errorf("%s is read-only", v.dataFile)
  204. return
  205. }
  206. v.accessLock.Lock()
  207. defer v.accessLock.Unlock()
  208. if v.isFileUnchanged(n) {
  209. size = n.Size
  210. glog.V(4).Infof("needle is unchanged!")
  211. return
  212. }
  213. var offset int64
  214. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  215. return
  216. }
  217. //ensure file writing starting from aligned positions
  218. if offset%NeedlePaddingSize != 0 {
  219. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  220. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  221. glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
  222. return
  223. }
  224. }
  225. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  226. if e := v.dataFile.Truncate(offset); e != nil {
  227. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error())
  228. }
  229. return
  230. }
  231. nv, ok := v.nm.Get(n.Id)
  232. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  233. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  234. glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
  235. }
  236. }
  237. return
  238. }
  239. func (v *Volume) delete(n *Needle) (uint32, error) {
  240. if v.readOnly {
  241. return 0, fmt.Errorf("%s is read-only", v.dataFile)
  242. }
  243. v.accessLock.Lock()
  244. defer v.accessLock.Unlock()
  245. nv, ok := v.nm.Get(n.Id)
  246. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  247. if ok {
  248. size := nv.Size
  249. if err := v.nm.Delete(n.Id); err != nil {
  250. return size, err
  251. }
  252. if _, err := v.dataFile.Seek(0, 2); err != nil {
  253. return size, err
  254. }
  255. n.Data = make([]byte, 0)
  256. _, err := n.Append(v.dataFile, v.Version())
  257. return size, err
  258. }
  259. return 0, nil
  260. }
  261. func (v *Volume) read(n *Needle) (int, error) {
  262. v.accessLock.Lock()
  263. defer v.accessLock.Unlock()
  264. nv, ok := v.nm.Get(n.Id)
  265. if ok && nv.Offset > 0 {
  266. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  267. return -1, err
  268. }
  269. return n.Read(v.dataFile, nv.Size, v.Version())
  270. }
  271. return -1, errors.New("Not Found")
  272. }
  273. func (v *Volume) garbageLevel() float64 {
  274. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  275. }
  276. func (v *Volume) Compact() error {
  277. v.accessLock.Lock()
  278. defer v.accessLock.Unlock()
  279. filePath := path.Join(v.dir, v.Id.String())
  280. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  281. }
  282. func (v *Volume) commitCompact() error {
  283. v.accessLock.Lock()
  284. defer v.accessLock.Unlock()
  285. _ = v.dataFile.Close()
  286. var e error
  287. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  288. return e
  289. }
  290. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  291. return e
  292. }
  293. if e = v.load(true, false); e != nil {
  294. return e
  295. }
  296. return nil
  297. }
  298. func (v *Volume) freeze() error {
  299. if v.readOnly {
  300. return nil
  301. }
  302. nm, ok := v.nm.(*NeedleMap)
  303. if !ok {
  304. return nil
  305. }
  306. v.accessLock.Lock()
  307. defer v.accessLock.Unlock()
  308. bn, _ := baseFilename(v.dataFile.Name())
  309. cdbFn := bn + ".cdb"
  310. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  311. err := DumpNeedleMapToCdb(cdbFn, nm)
  312. if err != nil {
  313. return err
  314. }
  315. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  316. return err
  317. }
  318. nm.indexFile.Close()
  319. os.Remove(nm.indexFile.Name())
  320. v.readOnly = true
  321. return nil
  322. }
  323. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  324. visitSuperBlock func(SuperBlock) error,
  325. visitNeedle func(n *Needle, offset int64) error) (err error) {
  326. var v *Volume
  327. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  328. return
  329. }
  330. if err = visitSuperBlock(v.SuperBlock); err != nil {
  331. return
  332. }
  333. version := v.Version()
  334. offset := int64(SuperBlockSize)
  335. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  336. if e != nil {
  337. err = fmt.Errorf("cannot read needle header: %s", e)
  338. return
  339. }
  340. for n != nil {
  341. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  342. err = fmt.Errorf("cannot read needle body: %s", err)
  343. return
  344. }
  345. if err = visitNeedle(n, offset); err != nil {
  346. return
  347. }
  348. offset += int64(NeedleHeaderSize + rest)
  349. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  350. if err == io.EOF {
  351. return nil
  352. }
  353. return fmt.Errorf("cannot read needle header: %s", err)
  354. }
  355. }
  356. return
  357. }
  358. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  359. var (
  360. dst, idx *os.File
  361. )
  362. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  363. return
  364. }
  365. defer dst.Close()
  366. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
  367. return
  368. }
  369. defer idx.Close()
  370. nm := NewNeedleMap(idx)
  371. new_offset := int64(SuperBlockSize)
  372. err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
  373. _, err = dst.Write(superBlock.Bytes())
  374. return err
  375. }, func(n *Needle, offset int64) error {
  376. nv, ok := v.nm.Get(n.Id)
  377. //glog.V(0).Infoln("file size is", n.Size, "rest", rest)
  378. if ok && int64(nv.Offset)*NeedlePaddingSize == offset {
  379. if nv.Size > 0 {
  380. if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  381. return fmt.Errorf("cannot put needle: %s", err)
  382. }
  383. if _, err = n.Append(dst, v.Version()); err != nil {
  384. return fmt.Errorf("cannot append needle: %s", err)
  385. }
  386. new_offset += n.DiskSize()
  387. //glog.V(0).Infoln("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  388. }
  389. }
  390. return nil
  391. })
  392. return
  393. }
  394. func (v *Volume) ContentSize() uint64 {
  395. return v.nm.ContentSize()
  396. }
  397. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  398. exists = true
  399. fi, err := os.Stat(filename)
  400. if os.IsNotExist(err) {
  401. exists = false
  402. return
  403. }
  404. if fi.Mode()&0400 != 0 {
  405. canRead = true
  406. }
  407. if fi.Mode()&0200 != 0 {
  408. canWrite = true
  409. }
  410. modTime = fi.ModTime()
  411. return
  412. }
  413. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  414. var indexFile *os.File
  415. var e error
  416. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  417. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  418. if cdbCanRead && cdbModTime.After(idxModeTime) {
  419. return true
  420. }
  421. if !cdbCanWrite {
  422. return false
  423. }
  424. if !idxCanRead {
  425. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  426. return false
  427. }
  428. glog.V(2).Infoln("opening file", fileName+".idx")
  429. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  430. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  431. return false
  432. }
  433. defer indexFile.Close()
  434. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  435. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  436. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  437. return false
  438. }
  439. return true
  440. }