You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

133 lines
4.1 KiB

9 years ago
9 years ago
9 years ago
9 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "github.com/syndtr/goleveldb/leveldb/opt"
  5. "os"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. )
  9. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  10. v = &Volume{dir: dirname, Collection: collection, Id: id}
  11. v.SuperBlock = SuperBlock{}
  12. v.needleMapKind = needleMapKind
  13. e = v.load(false, false, needleMapKind, 0)
  14. return
  15. }
  16. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
  17. var e error
  18. fileName := v.FileName()
  19. alreadyHasSuperBlock := false
  20. if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
  21. if !canRead {
  22. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  23. }
  24. if canWrite {
  25. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  26. v.lastModifiedTime = uint64(modifiedTime.Unix())
  27. } else {
  28. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  29. v.dataFile, e = os.Open(fileName + ".dat")
  30. v.readOnly = true
  31. }
  32. if fileSize >= _SuperBlockSize {
  33. alreadyHasSuperBlock = true
  34. }
  35. } else {
  36. if createDatIfMissing {
  37. v.dataFile, e = createVolumeFile(fileName+".dat", preallocate)
  38. } else {
  39. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  40. }
  41. }
  42. if e != nil {
  43. if !os.IsPermission(e) {
  44. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  45. } else {
  46. return fmt.Errorf("load data file %s.dat: %v", fileName, e)
  47. }
  48. }
  49. if alreadyHasSuperBlock {
  50. e = v.readSuperBlock()
  51. } else {
  52. e = v.maybeWriteSuperBlock()
  53. }
  54. if e == nil && alsoLoadIndex {
  55. var indexFile *os.File
  56. if v.readOnly {
  57. glog.V(1).Infoln("open to read file", fileName+".idx")
  58. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  59. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  60. }
  61. } else {
  62. glog.V(1).Infoln("open to write file", fileName+".idx")
  63. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  64. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  65. }
  66. }
  67. if e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
  68. v.readOnly = true
  69. glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
  70. }
  71. switch needleMapKind {
  72. case NeedleMapInMemory:
  73. glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
  74. if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
  75. glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
  76. }
  77. case NeedleMapLevelDb:
  78. glog.V(0).Infoln("loading leveldb", fileName+".ldb")
  79. opts := &opt.Options{
  80. BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
  81. WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
  82. }
  83. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  84. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  85. }
  86. case NeedleMapLevelDbMedium:
  87. glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
  88. opts := &opt.Options{
  89. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  90. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  91. }
  92. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  93. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  94. }
  95. case NeedleMapLevelDbLarge:
  96. glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
  97. opts := &opt.Options{
  98. BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
  99. WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
  100. }
  101. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  102. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  103. }
  104. }
  105. }
  106. return e
  107. }
  108. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
  109. exists = true
  110. fi, err := os.Stat(filename)
  111. if os.IsNotExist(err) {
  112. exists = false
  113. return
  114. }
  115. if fi.Mode()&0400 != 0 {
  116. canRead = true
  117. }
  118. if fi.Mode()&0200 != 0 {
  119. canWrite = true
  120. }
  121. modTime = fi.ModTime()
  122. fileSize = fi.Size()
  123. return
  124. }