You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

444 lines
12 KiB

12 years ago
12 years ago
12 years ago
12 years ago
11 years ago
12 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  // SuperBlockSize is the number of bytes occupied by the super block at the
  // start of a volume data file; needle data begins at this offset.
  const SuperBlockSize = 8
  16. type SuperBlock struct {
  17. Version Version
  18. ReplicaType ReplicationType
  19. }
  20. func (s *SuperBlock) Bytes() []byte {
  21. header := make([]byte, SuperBlockSize)
  22. header[0] = byte(s.Version)
  23. header[1] = s.ReplicaType.Byte()
  24. return header
  25. }
  26. type Volume struct {
  27. Id VolumeId
  28. dir string
  29. Collection string
  30. dataFile *os.File
  31. nm NeedleMapper
  32. readOnly bool
  33. SuperBlock
  34. accessLock sync.Mutex
  35. }
  36. func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
  37. v = &Volume{dir: dirname, Collection: collection, Id: id}
  38. v.SuperBlock = SuperBlock{ReplicaType: replicationType}
  39. e = v.load(true)
  40. return
  41. }
  42. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  43. v = &Volume{dir: dirname, Collection: collection, Id: id}
  44. v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
  45. e = v.load(false)
  46. return
  47. }
  48. func (v *Volume) load(alsoLoadIndex bool) error {
  49. var e error
  50. var fileName string
  51. if v.Collection == "" {
  52. fileName = path.Join(v.dir, v.Id.String())
  53. } else {
  54. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  55. }
  56. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead {
  57. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  58. } else if !exists || canWrite {
  59. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  60. } else if exists && canRead {
  61. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  62. v.dataFile, e = os.Open(fileName + ".dat")
  63. v.readOnly = true
  64. } else {
  65. return fmt.Errorf("Unknown state about Volume Data file %s.dat", fileName)
  66. }
  67. if e != nil {
  68. if !os.IsPermission(e) {
  69. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  70. }
  71. }
  72. if v.ReplicaType == CopyNil {
  73. e = v.readSuperBlock()
  74. } else {
  75. e = v.maybeWriteSuperBlock()
  76. }
  77. if e == nil && alsoLoadIndex {
  78. if v.readOnly {
  79. if v.ensureConvertIdxToCdb(fileName) {
  80. v.nm, e = OpenCdbMap(fileName + ".cdb")
  81. return e
  82. }
  83. }
  84. var indexFile *os.File
  85. if v.readOnly {
  86. glog.V(1).Infoln("open to read file", fileName+".idx")
  87. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  88. return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error())
  89. }
  90. } else {
  91. glog.V(1).Infoln("open to write file", fileName+".idx")
  92. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  93. return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error())
  94. }
  95. }
  96. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  97. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  98. glog.V(0).Infoln("loading error:", e)
  99. }
  100. }
  101. return e
  102. }
  103. func (v *Volume) Version() Version {
  104. return v.SuperBlock.Version
  105. }
  106. func (v *Volume) Size() int64 {
  107. v.accessLock.Lock()
  108. defer v.accessLock.Unlock()
  109. stat, e := v.dataFile.Stat()
  110. if e == nil {
  111. return stat.Size()
  112. }
  113. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  114. return -1
  115. }
  116. func (v *Volume) Close() {
  117. v.accessLock.Lock()
  118. defer v.accessLock.Unlock()
  119. v.nm.Close()
  120. _ = v.dataFile.Close()
  121. }
  122. func (v *Volume) maybeWriteSuperBlock() error {
  123. stat, e := v.dataFile.Stat()
  124. if e != nil {
  125. glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
  126. return e
  127. }
  128. if stat.Size() == 0 {
  129. v.SuperBlock.Version = CurrentVersion
  130. _, e = v.dataFile.Write(v.SuperBlock.Bytes())
  131. if e != nil && os.IsPermission(e) {
  132. //read-only, but zero length - recreate it!
  133. if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
  134. if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
  135. v.readOnly = false
  136. }
  137. }
  138. }
  139. }
  140. return e
  141. }
  142. func (v *Volume) readSuperBlock() (err error) {
  143. if _, err = v.dataFile.Seek(0, 0); err != nil {
  144. return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
  145. }
  146. header := make([]byte, SuperBlockSize)
  147. if _, e := v.dataFile.Read(header); e != nil {
  148. return fmt.Errorf("cannot read superblock: %s", e.Error())
  149. }
  150. v.SuperBlock, err = ParseSuperBlock(header)
  151. return err
  152. }
  153. func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
  154. superBlock.Version = Version(header[0])
  155. if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
  156. err = fmt.Errorf("cannot read replica type: %s", err.Error())
  157. }
  158. return
  159. }
  160. func (v *Volume) NeedToReplicate() bool {
  161. return v.ReplicaType.GetCopyCount() > 1
  162. }
  163. func (v *Volume) isFileUnchanged(n *Needle) bool {
  164. nv, ok := v.nm.Get(n.Id)
  165. if ok && nv.Offset > 0 {
  166. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  167. return false
  168. }
  169. oldNeedle := new(Needle)
  170. oldNeedle.Read(v.dataFile, nv.Size, v.Version())
  171. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  172. n.Size = oldNeedle.Size
  173. return true
  174. }
  175. }
  176. return false
  177. }
  178. func (v *Volume) write(n *Needle) (size uint32, err error) {
  179. if v.readOnly {
  180. err = fmt.Errorf("%s is read-only", v.dataFile)
  181. return
  182. }
  183. v.accessLock.Lock()
  184. defer v.accessLock.Unlock()
  185. if v.isFileUnchanged(n) {
  186. size = n.Size
  187. glog.V(4).Infof("needle is unchanged!")
  188. return
  189. }
  190. var offset int64
  191. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  192. return
  193. }
  194. //ensure file writing starting from aligned positions
  195. if offset%NeedlePaddingSize != 0 {
  196. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  197. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  198. glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
  199. return
  200. }
  201. }
  202. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  203. if e := v.dataFile.Truncate(offset); e != nil {
  204. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error())
  205. }
  206. return
  207. }
  208. nv, ok := v.nm.Get(n.Id)
  209. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  210. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  211. glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
  212. }
  213. }
  214. return
  215. }
  216. func (v *Volume) delete(n *Needle) (uint32, error) {
  217. if v.readOnly {
  218. return 0, fmt.Errorf("%s is read-only", v.dataFile)
  219. }
  220. v.accessLock.Lock()
  221. defer v.accessLock.Unlock()
  222. nv, ok := v.nm.Get(n.Id)
  223. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  224. if ok {
  225. size := nv.Size
  226. if err := v.nm.Delete(n.Id); err != nil {
  227. return size, err
  228. }
  229. if _, err := v.dataFile.Seek(0, 2); err != nil {
  230. return size, err
  231. }
  232. n.Data = make([]byte, 0)
  233. _, err := n.Append(v.dataFile, v.Version())
  234. return size, err
  235. }
  236. return 0, nil
  237. }
  238. func (v *Volume) read(n *Needle) (int, error) {
  239. v.accessLock.Lock()
  240. defer v.accessLock.Unlock()
  241. nv, ok := v.nm.Get(n.Id)
  242. if ok && nv.Offset > 0 {
  243. if _, err := v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0); err != nil {
  244. return -1, err
  245. }
  246. return n.Read(v.dataFile, nv.Size, v.Version())
  247. }
  248. return -1, errors.New("Not Found")
  249. }
  250. func (v *Volume) garbageLevel() float64 {
  251. return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
  252. }
  253. func (v *Volume) Compact() error {
  254. v.accessLock.Lock()
  255. defer v.accessLock.Unlock()
  256. filePath := path.Join(v.dir, v.Id.String())
  257. return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
  258. }
  259. func (v *Volume) commitCompact() error {
  260. v.accessLock.Lock()
  261. defer v.accessLock.Unlock()
  262. _ = v.dataFile.Close()
  263. var e error
  264. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
  265. return e
  266. }
  267. if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
  268. return e
  269. }
  270. if e = v.load(true); e != nil {
  271. return e
  272. }
  273. return nil
  274. }
  275. func (v *Volume) freeze() error {
  276. if v.readOnly {
  277. return nil
  278. }
  279. nm, ok := v.nm.(*NeedleMap)
  280. if !ok {
  281. return nil
  282. }
  283. v.accessLock.Lock()
  284. defer v.accessLock.Unlock()
  285. bn, _ := baseFilename(v.dataFile.Name())
  286. cdbFn := bn + ".cdb"
  287. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  288. err := DumpNeedleMapToCdb(cdbFn, nm)
  289. if err != nil {
  290. return err
  291. }
  292. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  293. return err
  294. }
  295. nm.indexFile.Close()
  296. os.Remove(nm.indexFile.Name())
  297. v.readOnly = true
  298. return nil
  299. }
  300. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  301. visitSuperBlock func(SuperBlock) error,
  302. visitNeedle func(n *Needle, offset int64) error) (err error) {
  303. var v *Volume
  304. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  305. return
  306. }
  307. if err = visitSuperBlock(v.SuperBlock); err != nil {
  308. return
  309. }
  310. version := v.Version()
  311. offset := int64(SuperBlockSize)
  312. n, rest, e := ReadNeedleHeader(v.dataFile, version)
  313. if e != nil {
  314. err = fmt.Errorf("cannot read needle header: %s", e)
  315. return
  316. }
  317. for n != nil {
  318. if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
  319. err = fmt.Errorf("cannot read needle body: %s", err)
  320. return
  321. }
  322. if err = visitNeedle(n, offset); err != nil {
  323. return
  324. }
  325. offset += int64(NeedleHeaderSize + rest)
  326. if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
  327. if err == io.EOF {
  328. return nil
  329. }
  330. return fmt.Errorf("cannot read needle header: %s", err)
  331. }
  332. }
  333. return
  334. }
  335. func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
  336. var (
  337. dst, idx *os.File
  338. )
  339. if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  340. return
  341. }
  342. defer dst.Close()
  343. if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
  344. return
  345. }
  346. defer idx.Close()
  347. nm := NewNeedleMap(idx)
  348. new_offset := int64(SuperBlockSize)
  349. err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
  350. _, err = dst.Write(superBlock.Bytes())
  351. return err
  352. }, func(n *Needle, offset int64) error {
  353. nv, ok := v.nm.Get(n.Id)
  354. //glog.V(0).Infoln("file size is", n.Size, "rest", rest)
  355. if ok && int64(nv.Offset)*NeedlePaddingSize == offset {
  356. if nv.Size > 0 {
  357. if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
  358. return fmt.Errorf("cannot put needle: %s", err)
  359. }
  360. if _, err = n.Append(dst, v.Version()); err != nil {
  361. return fmt.Errorf("cannot append needle: %s", err)
  362. }
  363. new_offset += n.DiskSize()
  364. //glog.V(0).Infoln("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
  365. }
  366. }
  367. return nil
  368. })
  369. return
  370. }
  371. func (v *Volume) ContentSize() uint64 {
  372. return v.nm.ContentSize()
  373. }
  // checkFile stats filename and reports whether it exists, whether its owner
  // read (0400) and write (0200) mode bits are set, and its modification time.
  //
  // Only the mode bits are inspected; the result does not prove that the
  // current process can actually open the file. When the file does not exist
  // every result is zero. When stat fails for any other reason exists is true
  // and the remaining results are zero, so callers treat the file as present
  // but unusable.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if err != nil {
  		// fi is nil on every stat error, not only "not exist".
  		exists = !os.IsNotExist(err)
  		return
  	}
  	exists = true
  	mode := fi.Mode()
  	canRead = mode&0400 != 0
  	canWrite = mode&0200 != 0
  	modTime = fi.ModTime()
  	return
  }
  390. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  391. var indexFile *os.File
  392. var e error
  393. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  394. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  395. if cdbCanRead && cdbModTime.After(idxModeTime) {
  396. return true
  397. }
  398. if !cdbCanWrite {
  399. return false
  400. }
  401. if !idxCanRead {
  402. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  403. return false
  404. }
  405. glog.V(2).Infoln("opening file", fileName+".idx")
  406. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  407. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  408. return false
  409. }
  410. defer indexFile.Close()
  411. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  412. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  413. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  414. return false
  415. }
  416. return true
  417. }