You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

345 lines
8.9 KiB

12 years ago
12 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. readOnly bool
  20. SuperBlock
  21. accessLock sync.Mutex
  22. }
  23. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
  24. v = &Volume{dir: dirname, Collection: collection, Id: id}
  25. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
  26. e = v.load(true, true)
  27. return
  28. }
  29. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  30. v = &Volume{dir: dirname, Collection: collection, Id: id}
  31. v.SuperBlock = SuperBlock{}
  32. e = v.load(false, false)
  33. return
  34. }
  35. func (v *Volume) FileName() (fileName string) {
  36. if v.Collection == "" {
  37. fileName = path.Join(v.dir, v.Id.String())
  38. } else {
  39. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  40. }
  41. return
  42. }
  43. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  44. var e error
  45. fileName := v.FileName()
  46. if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
  47. if !canRead {
  48. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  49. }
  50. if canWrite {
  51. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  52. } else {
  53. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  54. v.dataFile, e = os.Open(fileName + ".dat")
  55. v.readOnly = true
  56. }
  57. } else {
  58. if createDatIfMissing {
  59. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  60. } else {
  61. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  62. }
  63. }
  64. if e != nil {
  65. if !os.IsPermission(e) {
  66. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  67. }
  68. }
  69. if v.ReplicaPlacement == nil {
  70. e = v.readSuperBlock()
  71. } else {
  72. e = v.maybeWriteSuperBlock()
  73. }
  74. if e == nil && alsoLoadIndex {
  75. if v.readOnly {
  76. if v.ensureConvertIdxToCdb(fileName) {
  77. v.nm, e = OpenCdbMap(fileName + ".cdb")
  78. return e
  79. }
  80. }
  81. var indexFile *os.File
  82. if v.readOnly {
  83. glog.V(1).Infoln("open to read file", fileName+".idx")
  84. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  85. return fmt.Errorf("cannot read Volume Index %s.idx: %s", fileName, e.Error())
  86. }
  87. } else {
  88. glog.V(1).Infoln("open to write file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  90. return fmt.Errorf("cannot write Volume Index %s.idx: %s", fileName, e.Error())
  91. }
  92. }
  93. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  94. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  95. glog.V(0).Infoln("loading error:", e)
  96. }
  97. }
  98. return e
  99. }
  100. func (v *Volume) Version() Version {
  101. return v.SuperBlock.Version()
  102. }
  103. func (v *Volume) Size() int64 {
  104. stat, e := v.dataFile.Stat()
  105. if e == nil {
  106. return stat.Size()
  107. }
  108. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  109. return -1
  110. }
  111. func (v *Volume) Close() {
  112. v.accessLock.Lock()
  113. defer v.accessLock.Unlock()
  114. v.nm.Close()
  115. _ = v.dataFile.Close()
  116. }
  117. func (v *Volume) NeedToReplicate() bool {
  118. return v.ReplicaPlacement.GetCopyCount() > 1
  119. }
  120. func (v *Volume) isFileUnchanged(n *Needle) bool {
  121. nv, ok := v.nm.Get(n.Id)
  122. if ok && nv.Offset > 0 {
  123. oldNeedle := new(Needle)
  124. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  125. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  126. n.Size = oldNeedle.Size
  127. return true
  128. }
  129. }
  130. return false
  131. }
  132. func (v *Volume) Destroy() (err error) {
  133. if v.readOnly {
  134. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  135. return
  136. }
  137. v.Close()
  138. err = os.Remove(v.dataFile.Name())
  139. if err != nil {
  140. return
  141. }
  142. err = v.nm.Destroy()
  143. return
  144. }
  145. func (v *Volume) write(n *Needle) (size uint32, err error) {
  146. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  147. if v.readOnly {
  148. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  149. return
  150. }
  151. v.accessLock.Lock()
  152. defer v.accessLock.Unlock()
  153. if v.isFileUnchanged(n) {
  154. size = n.Size
  155. glog.V(4).Infof("needle is unchanged!")
  156. return
  157. }
  158. var offset int64
  159. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  160. return
  161. }
  162. //ensure file writing starting from aligned positions
  163. if offset%NeedlePaddingSize != 0 {
  164. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  165. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  166. glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
  167. return
  168. }
  169. }
  170. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  171. if e := v.dataFile.Truncate(offset); e != nil {
  172. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile.Name(), e.Error())
  173. }
  174. return
  175. }
  176. nv, ok := v.nm.Get(n.Id)
  177. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  178. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  179. glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
  180. }
  181. }
  182. return
  183. }
  184. func (v *Volume) delete(n *Needle) (uint32, error) {
  185. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  186. if v.readOnly {
  187. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  188. }
  189. v.accessLock.Lock()
  190. defer v.accessLock.Unlock()
  191. nv, ok := v.nm.Get(n.Id)
  192. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  193. if ok {
  194. size := nv.Size
  195. if err := v.nm.Delete(n.Id); err != nil {
  196. return size, err
  197. }
  198. if _, err := v.dataFile.Seek(0, 2); err != nil {
  199. return size, err
  200. }
  201. n.Data = make([]byte, 0)
  202. _, err := n.Append(v.dataFile, v.Version())
  203. return size, err
  204. }
  205. return 0, nil
  206. }
  207. func (v *Volume) read(n *Needle) (int, error) {
  208. nv, ok := v.nm.Get(n.Id)
  209. if ok && nv.Offset > 0 {
  210. return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  211. }
  212. return -1, errors.New("Not Found")
  213. }
  214. func (v *Volume) freeze() error {
  215. if v.readOnly {
  216. return nil
  217. }
  218. nm, ok := v.nm.(*NeedleMap)
  219. if !ok {
  220. return nil
  221. }
  222. v.accessLock.Lock()
  223. defer v.accessLock.Unlock()
  224. bn, _ := baseFilename(v.dataFile.Name())
  225. cdbFn := bn + ".cdb"
  226. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  227. err := DumpNeedleMapToCdb(cdbFn, nm)
  228. if err != nil {
  229. return err
  230. }
  231. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  232. return err
  233. }
  234. nm.indexFile.Close()
  235. os.Remove(nm.indexFile.Name())
  236. v.readOnly = true
  237. return nil
  238. }
  239. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  240. visitSuperBlock func(SuperBlock) error,
  241. readNeedleBody bool,
  242. visitNeedle func(n *Needle, offset int64) error) (err error) {
  243. var v *Volume
  244. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  245. return errors.New("Failed to load volume:" + err.Error())
  246. }
  247. if err = visitSuperBlock(v.SuperBlock); err != nil {
  248. return errors.New("Failed to read super block:" + err.Error())
  249. }
  250. version := v.Version()
  251. offset := int64(SuperBlockSize)
  252. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  253. if e != nil {
  254. err = fmt.Errorf("cannot read needle header: %s", e)
  255. return
  256. }
  257. for n != nil {
  258. if readNeedleBody {
  259. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  260. err = fmt.Errorf("cannot read needle body: %s", err)
  261. return
  262. }
  263. }
  264. if err = visitNeedle(n, offset); err != nil {
  265. return
  266. }
  267. offset += int64(NeedleHeaderSize) + int64(rest)
  268. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  269. if err == io.EOF {
  270. return nil
  271. }
  272. return fmt.Errorf("cannot read needle header: %s", err)
  273. }
  274. }
  275. return
  276. }
  277. func (v *Volume) ContentSize() uint64 {
  278. return v.nm.ContentSize()
  279. }
  280. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  281. exists = true
  282. fi, err := os.Stat(filename)
  283. if os.IsNotExist(err) {
  284. exists = false
  285. return
  286. }
  287. if fi.Mode()&0400 != 0 {
  288. canRead = true
  289. }
  290. if fi.Mode()&0200 != 0 {
  291. canWrite = true
  292. }
  293. modTime = fi.ModTime()
  294. return
  295. }
  296. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  297. var indexFile *os.File
  298. var e error
  299. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  300. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  301. if cdbCanRead && cdbModTime.After(idxModeTime) {
  302. return true
  303. }
  304. if !cdbCanWrite {
  305. return false
  306. }
  307. if !idxCanRead {
  308. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  309. return false
  310. }
  311. glog.V(2).Infoln("opening file", fileName+".idx")
  312. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  313. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  314. return false
  315. }
  316. defer indexFile.Close()
  317. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  318. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  319. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  320. return false
  321. }
  322. return true
  323. }