You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

434 lines
11 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
const (
	// SuperBlockSize is the length, in bytes, of the header that
	// SuperBlock.Bytes produces and that readSuperBlock reads from the very
	// beginning of a volume data file. Needle data starts right after it.
	SuperBlockSize = 8
)
  16. type SuperBlock struct {
  17. Version Version
  18. ReplicaType ReplicationType
  19. }
  20. func (s *SuperBlock) Bytes() []byte {
  21. header := make([]byte, SuperBlockSize)
  22. header[0] = byte(s.Version)
  23. header[1] = s.ReplicaType.Byte()
  24. return header
  25. }
// Volume is one on-disk volume: a data file holding needles plus a needle
// map that indexes them by needle id.
type Volume struct {
	// Id identifies the volume; its String() form is the base name of the
	// volume's files inside dir.
	Id VolumeId
	// dir is the directory that contains the volume's files.
	dir string
	// dataFile is the open handle on the "<dir>/<id>.dat" data file.
	dataFile *os.File
	// nm maps needle ids to their offset/size in dataFile. It stays nil when
	// the volume is loaded without its index.
	nm NeedleMapper
	// readOnly is set when the data file was opened read-only or after
	// freeze; write and delete refuse to run while it is set.
	readOnly bool
	// SuperBlock is the header (version, replica type) of the data file.
	SuperBlock
	// accessLock serializes the methods that touch dataFile and nm.
	accessLock sync.Mutex
}
  35. func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Id: id}
  37. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  38. e = v.load(true)
  39. return
  40. }
  41. func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) {
  42. v = &Volume{dir: dirname, Id: id}
  43. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  44. e = v.load(false)
  45. return
  46. }
  47. func (v *Volume) load(alsoLoadIndex bool) error {
  48. var e error
  49. fileName := path.Join(v.dir, v.Id.String())
  50. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead {
  51. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  52. } else if !exists || canWrite {
  53. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  54. } else if exists && canRead {
  55. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  56. v.dataFile, e = os.Open(fileName + ".dat")
  57. v.readOnly = true
  58. } else {
  59. return fmt.Errorf("Unknown state about Volume Data file %s.dat", fileName)
  60. }
  61. if e != nil {
  62. if !os.IsPermission(e) {
  63. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  64. }
  65. }
  66. if v.ReplicaType == CopyNil {
  67. e = v.readSuperBlock()
  68. } else {
  69. e = v.maybeWriteSuperBlock()
  70. }
  71. if e == nil && alsoLoadIndex {
  72. if v.readOnly {
  73. if v.ensureConvertIdxToCdb(fileName) {
  74. v.nm, e = OpenCdbMap(fileName + ".cdb")
  75. return e
  76. }
  77. }
  78. var indexFile *os.File
  79. if v.readOnly {
  80. glog.V(1).Infoln("open to read file", fileName+".idx")
  81. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  82. return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error())
  83. }
  84. } else {
  85. glog.V(1).Infoln("open to write file", fileName+".idx")
  86. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  87. return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error())
  88. }
  89. }
  90. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  91. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  92. glog.V(0).Infoln("loading error:", e)
  93. }
  94. }
  95. return e
  96. }
  97. func (v *Volume) Version() Version {
  98. return v.SuperBlock.Version
  99. }
  100. func (v *Volume) Size() int64 {
  101. v.accessLock.Lock()
  102. defer v.accessLock.Unlock()
  103. stat, e := v.dataFile.Stat()
  104. if e == nil {
  105. return stat.Size()
  106. }
  107. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  108. return -1
  109. }
  110. func (v *Volume) Close() {
  111. v.accessLock.Lock()
  112. defer v.accessLock.Unlock()
  113. v.nm.Close()
  114. _ = v.dataFile.Close()
  115. }
  116. func (v *Volume) maybeWriteSuperBlock() error {
  117. stat, e := v.dataFile.Stat()
  118. if e != nil {
  119. glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
  120. return e
  121. }
  122. if stat.Size() == 0 {
  123. v.SuperBlock.Version = CurrentVersion
  124. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  125. if e != nil && os.IsPermission(e) {
  126. //read-only, but zero length - recreate it!
  127. if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
  128. if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
  129. v.readOnly = false
  130. }
  131. }
  132. }
  133. }
  134. return e
  135. }
  136. func (v *Volume) readSuperBlock() (err error) {
  137. if _, err = v.dataFile.Seek(0, 0); err != nil {
  138. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err.Error())
  139. }
  140. header := make([]byte, SuperBlockSize)
  141. if _, e := v.dataFile.Read(header); e != nil {
  142. return fmt.Errorf("cannot read superblock: %s", e.Error())
  143. }
  144. v.SuperBlock, err = ParseSuperBlock(header)
  145. return err
  146. }
  147. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  148. superBlock.Version = Version(header[0])
  149. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  150. err = fmt.Errorf("cannot read replica type: %s", err.Error())
  151. }
  152. return
  153. }
  154. func (v *Volume) NeedToReplicate() bool {
  155. return v.ReplicaType.GetCopyCount() > 1
  156. }
  157. func (v *Volume) isFileUnchanged(n *Needle) bool {
  158. nv, ok := v.nm.Get(n.Id)
  159. if ok && nv.Offset > 0 {
  160. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  161. return false
  162. }
  163. oldNeedle := new(Needle)
  164. oldNeedle.Read(v.dataFile, nv.Size, v.Version())
  165. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  166. n.Size = oldNeedle.Size
  167. return true
  168. }
  169. }
  170. return false
  171. }
  172. func (v *Volume) write(n *Needle) (size uint32, err error) {
  173. if v.readOnly {
  174. err = fmt.Errorf("%s is read-only", v.dataFile)
  175. return
  176. }
  177. v.accessLock.Lock()
  178. defer v.accessLock.Unlock()
  179. if v.isFileUnchanged(n) {
  180. size = n.Size
  181. return
  182. }
  183. var offset int64
  184. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  185. return
  186. }
  187. //ensure file writing starting from aligned positions
  188. if offset%NeedlePaddingSize != 0 {
  189. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  190. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  191. return
  192. }
  193. }
  194. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  195. if e := v.dataFile.Truncate(offset); e != nil {
  196. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error())
  197. }
  198. return
  199. }
  200. nv, ok := v.nm.Get(n.Id)
  201. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  202. _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
  203. }
  204. return
  205. }
  206. func (v *Volume) delete(n *Needle) (uint32, error) {
  207. if v.readOnly {
  208. return 0, fmt.Errorf("%s is read-only", v.dataFile)
  209. }
  210. v.accessLock.Lock()
  211. defer v.accessLock.Unlock()
  212. nv, ok := v.nm.Get(n.Id)
  213. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  214. if ok {
  215. size := nv.Size
  216. if err := v.nm.Delete(n.Id); err != nil {
  217. return size, err
  218. }
  219. if _, err := v.dataFile.Seek(0, 2); err != nil {
  220. return size, err
  221. }
  222. n.Data = make([]byte, 0)
  223. _, err := n.Append(v.dataFile, v.Version())
  224. return size, err
  225. }
  226. return 0, nil
  227. }
  228. func (v *Volume) read(n *Needle) (int, error) {
  229. v.accessLock.Lock()
  230. defer v.accessLock.Unlock()
  231. nv, ok := v.nm.Get(n.Id)
  232. if ok && nv.Offset > 0 {
  233. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  234. return -1, err
  235. }
  236. return n.Read(v.dataFile, nv.Size, v.Version())
  237. }
  238. return -1, errors.New("Not Found")
  239. }
  240. func (v *Volume) garbageLevel() float64 {
  241. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  242. }
  243. func (v *Volume) Compact() error {
  244. v.accessLock.Lock()
  245. defer v.accessLock.Unlock()
  246. filePath := path.Join(v.dir, v.Id.String())
  247. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  248. }
  249. func (v *Volume) commitCompact() error {
  250. v.accessLock.Lock()
  251. defer v.accessLock.Unlock()
  252. _ = v.dataFile.Close()
  253. var e error
  254. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  255. return e
  256. }
  257. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  258. return e
  259. }
  260. if e = v.load(true); e != nil {
  261. return e
  262. }
  263. return nil
  264. }
  265. func (v *Volume) freeze() error {
  266. if v.readOnly {
  267. return nil
  268. }
  269. nm, ok := v.nm.(*NeedleMap)
  270. if !ok {
  271. return nil
  272. }
  273. v.accessLock.Lock()
  274. defer v.accessLock.Unlock()
  275. bn, _ := nakeFilename(v.dataFile.Name())
  276. cdbFn := bn + ".cdb"
  277. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  278. err := DumpNeedleMapToCdb(cdbFn, nm)
  279. if err != nil {
  280. return err
  281. }
  282. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  283. return err
  284. }
  285. nm.indexFile.Close()
  286. os.Remove(nm.indexFile.Name())
  287. v.readOnly = true
  288. return nil
  289. }
  290. func ScanVolumeFile(dirname string, id VolumeId,
  291. visitSuperBlock func(SuperBlock) error,
  292. visitNeedle func(n *Needle, offset int64) error) (err error) {
  293. var v *Volume
  294. if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
  295. return
  296. }
  297. if err = visitSuperBlock(v.SuperBlock); err != nil {
  298. return
  299. }
  300. version := v.Version()
  301. offset := int64(SuperBlockSize)
  302. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  303. if e != nil {
  304. err = fmt.Errorf("cannot read needle header: %s", e)
  305. return
  306. }
  307. for n != nil {
  308. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  309. err = fmt.Errorf("cannot read needle body: %s", err)
  310. return
  311. }
  312. if err = visitNeedle(n, offset); err != nil {
  313. return
  314. }
  315. offset += int64(NeedleHeaderSize + rest)
  316. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  317. if err == io.EOF {
  318. return nil
  319. }
  320. return fmt.Errorf("cannot read needle header: %s", err)
  321. }
  322. }
  323. return
  324. }
  325. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  326. var (
  327. dst, idx *os.File
  328. )
  329. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  330. return
  331. }
  332. defer dst.Close()
  333. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  334. return
  335. }
  336. defer idx.Close()
  337. nm := NewNeedleMap(idx)
  338. new_offset := int64(SuperBlockSize)
  339. err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
  340. _, err = dst.Write(superBlock.Bytes())
  341. return err
  342. }, func(n *Needle, offset int64) error {
  343. nv, ok := v.nm.Get(n.Id)
  344. //glog.V(0).Infoln("file size is", n.Size, "rest", rest)
  345. if ok && int64(nv.Offset)*NeedlePaddingSize == offset {
  346. if nv.Size > 0 {
  347. if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  348. return fmt.Errorf("cannot put needle: %s", err)
  349. }
  350. if _, err = n.Append(dst, v.Version()); err != nil {
  351. return fmt.Errorf("cannot append needle: %s", err)
  352. }
  353. new_offset += n.DiskSize()
  354. //glog.V(0).Infoln("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  355. }
  356. }
  357. return nil
  358. })
  359. return
  360. }
  361. func (v *Volume) ContentSize() uint64 {
  362. return v.nm.ContentSize()
  363. }
  364. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  365. exists = true
  366. fi, err := os.Stat(filename)
  367. if os.IsNotExist(err) {
  368. exists = false
  369. return
  370. }
  371. if fi.Mode()&0400 != 0 {
  372. canRead = true
  373. }
  374. if fi.Mode()&0200 != 0 {
  375. canWrite = true
  376. }
  377. modTime = fi.ModTime()
  378. return
  379. }
  380. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  381. var indexFile *os.File
  382. var e error
  383. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  384. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  385. if cdbCanRead && cdbModTime.After(idxModeTime) {
  386. return true
  387. }
  388. if !cdbCanWrite {
  389. return false
  390. }
  391. if !idxCanRead {
  392. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  393. return false
  394. }
  395. glog.V(2).Infoln("opening file", fileName+".idx")
  396. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  397. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  398. return false
  399. }
  400. defer indexFile.Close()
  401. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  402. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  403. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  404. return false
  405. }
  406. return true
  407. }