You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

435 lines
12 KiB

12 years ago
12 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. readOnly bool
  20. SuperBlock
  21. accessLock sync.Mutex
  22. lastModifiedTime uint64 //unix time in seconds
  23. }
  24. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  25. v = &Volume{dir: dirname, Collection: collection, Id: id}
  26. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  27. e = v.load(true, true)
  28. return
  29. }
  30. func (v *Volume) String() string {
  31. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  32. }
  33. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  34. v = &Volume{dir: dirname, Collection: collection, Id: id}
  35. v.SuperBlock = SuperBlock{}
  36. e = v.load(false, false)
  37. return
  38. }
  39. func (v *Volume) FileName() (fileName string) {
  40. if v.Collection == "" {
  41. fileName = path.Join(v.dir, v.Id.String())
  42. } else {
  43. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  44. }
  45. return
  46. }
  47. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  48. var e error
  49. fileName := v.FileName()
  50. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  51. if !canRead {
  52. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  53. }
  54. if canWrite {
  55. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  56. v.lastModifiedTime = uint64(modifiedTime.Unix())
  57. } else {
  58. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  59. v.dataFile, e = os.Open(fileName + ".dat")
  60. v.readOnly = true
  61. }
  62. } else {
  63. if createDatIfMissing {
  64. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  65. } else {
  66. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  67. }
  68. }
  69. if e != nil {
  70. if !os.IsPermission(e) {
  71. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  72. }
  73. }
  74. if v.ReplicaPlacement == nil {
  75. e = v.readSuperBlock()
  76. } else {
  77. e = v.maybeWriteSuperBlock()
  78. }
  79. if e == nil && alsoLoadIndex {
  80. if v.readOnly {
  81. if v.ensureConvertIdxToCdb(fileName) {
  82. v.nm, e = OpenCdbMap(fileName + ".cdb")
  83. return e
  84. }
  85. }
  86. var indexFile *os.File
  87. if v.readOnly {
  88. glog.V(1).Infoln("open to read file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  90. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  91. }
  92. } else {
  93. glog.V(1).Infoln("open to write file", fileName+".idx")
  94. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  95. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  96. }
  97. }
  98. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  99. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  100. glog.V(0).Infoln("loading error:", e)
  101. }
  102. }
  103. return e
  104. }
  105. func (v *Volume) Version() Version {
  106. return v.SuperBlock.Version()
  107. }
  108. func (v *Volume) Size() int64 {
  109. stat, e := v.dataFile.Stat()
  110. if e == nil {
  111. return stat.Size()
  112. }
  113. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  114. return -1
  115. }
  116. // Close cleanly shuts down this volume
  117. func (v *Volume) Close() {
  118. v.accessLock.Lock()
  119. defer v.accessLock.Unlock()
  120. v.nm.Close()
  121. _ = v.dataFile.Close()
  122. }
  123. func (v *Volume) NeedToReplicate() bool {
  124. return v.ReplicaPlacement.GetCopyCount() > 1
  125. }
  126. // isFileUnchanged checks whether this needle to write is same as last one.
  127. // It requires serialized access in the same volume.
  128. func (v *Volume) isFileUnchanged(n *Needle) bool {
  129. nv, ok := v.nm.Get(n.Id)
  130. if ok && nv.Offset > 0 {
  131. oldNeedle := new(Needle)
  132. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  133. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  134. n.DataSize = oldNeedle.DataSize
  135. return true
  136. }
  137. }
  138. return false
  139. }
  140. // Destroy removes everything related to this volume
  141. func (v *Volume) Destroy() (err error) {
  142. if v.readOnly {
  143. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  144. return
  145. }
  146. v.Close()
  147. err = os.Remove(v.dataFile.Name())
  148. if err != nil {
  149. return
  150. }
  151. err = v.nm.Destroy()
  152. return
  153. }
  154. func (v *Volume) write(n *Needle) (size uint32, err error) {
  155. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  156. if v.readOnly {
  157. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  158. return
  159. }
  160. v.accessLock.Lock()
  161. defer v.accessLock.Unlock()
  162. if v.isFileUnchanged(n) {
  163. size = n.DataSize
  164. glog.V(4).Infof("needle is unchanged!")
  165. return
  166. }
  167. var offset int64
  168. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  169. glog.V(0).Infof("failed to seek the end of file: %v", err)
  170. return
  171. }
  172. //ensure file writing starting from aligned positions
  173. if offset%NeedlePaddingSize != 0 {
  174. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  175. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  176. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  177. return
  178. }
  179. }
  180. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  181. if e := v.dataFile.Truncate(offset); e != nil {
  182. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  183. }
  184. return
  185. }
  186. nv, ok := v.nm.Get(n.Id)
  187. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  188. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  189. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  190. }
  191. }
  192. if v.lastModifiedTime < n.LastModified {
  193. v.lastModifiedTime = n.LastModified
  194. }
  195. return
  196. }
  197. func (v *Volume) delete(n *Needle) (uint32, error) {
  198. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  199. if v.readOnly {
  200. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  201. }
  202. v.accessLock.Lock()
  203. defer v.accessLock.Unlock()
  204. nv, ok := v.nm.Get(n.Id)
  205. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  206. if ok {
  207. size := nv.Size
  208. if err := v.nm.Delete(n.Id); err != nil {
  209. return size, err
  210. }
  211. if _, err := v.dataFile.Seek(0, 2); err != nil {
  212. return size, err
  213. }
  214. n.Data = nil
  215. _, err := n.Append(v.dataFile, v.Version())
  216. return size, err
  217. }
  218. return 0, nil
  219. }
  220. func (v *Volume) read(n *Needle) (int, error) {
  221. nv, ok := v.nm.Get(n.Id)
  222. if !ok || nv.Offset == 0 {
  223. return -1, errors.New("Not Found")
  224. }
  225. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  226. if err != nil {
  227. return bytesRead, err
  228. }
  229. if !n.HasTtl() {
  230. return bytesRead, err
  231. }
  232. ttlMinutes := n.Ttl.Minutes()
  233. if ttlMinutes == 0 {
  234. return bytesRead, nil
  235. }
  236. if !n.HasLastModifiedDate() {
  237. return bytesRead, nil
  238. }
  239. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  240. return bytesRead, nil
  241. }
  242. return -1, errors.New("Not Found")
  243. }
  244. func (v *Volume) freeze() error {
  245. if v.readOnly {
  246. return nil
  247. }
  248. nm, ok := v.nm.(*NeedleMap)
  249. if !ok {
  250. return nil
  251. }
  252. v.accessLock.Lock()
  253. defer v.accessLock.Unlock()
  254. bn, _ := baseFilename(v.dataFile.Name())
  255. cdbFn := bn + ".cdb"
  256. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  257. err := DumpNeedleMapToCdb(cdbFn, nm)
  258. if err != nil {
  259. return err
  260. }
  261. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  262. return err
  263. }
  264. nm.indexFile.Close()
  265. os.Remove(nm.indexFile.Name())
  266. v.readOnly = true
  267. return nil
  268. }
  269. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  270. visitSuperBlock func(SuperBlock) error,
  271. readNeedleBody bool,
  272. visitNeedle func(n *Needle, offset int64) error) (err error) {
  273. var v *Volume
  274. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  275. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  276. }
  277. if err = visitSuperBlock(v.SuperBlock); err != nil {
  278. return fmt.Errorf("Failed to read volume %d super block: %v", id, err)
  279. }
  280. version := v.Version()
  281. offset := int64(SuperBlockSize)
  282. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  283. if e != nil {
  284. err = fmt.Errorf("cannot read needle header: %v", e)
  285. return
  286. }
  287. for n != nil {
  288. if readNeedleBody {
  289. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  290. glog.V(0).Infof("cannot read needle body: %v", err)
  291. //err = fmt.Errorf("cannot read needle body: %v", err)
  292. //return
  293. }
  294. if n.DataSize >= n.Size {
  295. // this should come from a bug reported on #87 and #93
  296. // fixed in v0.69
  297. // remove this whole "if" clause later, long after 0.69
  298. oldRest, oldSize := rest, n.Size
  299. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  300. n.Size = 0
  301. rest = n.Size + NeedleChecksumSize + padding
  302. if rest%NeedlePaddingSize != 0 {
  303. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  304. }
  305. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  306. }
  307. }
  308. if err = visitNeedle(n, offset); err != nil {
  309. glog.V(0).Infof("visit needle error: %v", err)
  310. }
  311. offset += int64(NeedleHeaderSize) + int64(rest)
  312. glog.V(4).Infof("==> new entry offset %d", offset)
  313. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  314. if err == io.EOF {
  315. return nil
  316. }
  317. return fmt.Errorf("cannot read needle header: %v", err)
  318. }
  319. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  320. }
  321. return
  322. }
  323. func (v *Volume) ContentSize() uint64 {
  324. return v.nm.ContentSize()
  325. }
  // checkFile inspects filename and reports whether it exists, whether the
  // owner-read (0400) and owner-write (0200) bits are set in its mode, and
  // its modification time. The permission results come from the mode bits
  // only; they are not an access check for the current process.
  //
  // A file that does not exist yields exists == false. When the file cannot
  // be examined for any other reason, exists stays true but canRead and
  // canWrite are false and modTime is the zero time, so callers see the
  // file as unusable instead of the process panicking on a nil FileInfo.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if os.IsNotExist(err) {
  		return false, false, false, time.Time{}
  	}
  	exists = true
  	if err != nil {
  		// fi is nil here; there is nothing further to inspect
  		return
  	}
  	mode := fi.Mode()
  	canRead = mode&0400 != 0
  	canWrite = mode&0200 != 0
  	modTime = fi.ModTime()
  	return
  }
  342. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  343. var indexFile *os.File
  344. var e error
  345. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  346. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  347. if cdbCanRead && cdbModTime.After(idxModeTime) {
  348. return true
  349. }
  350. if !cdbCanWrite {
  351. return false
  352. }
  353. if !idxCanRead {
  354. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  355. return false
  356. }
  357. glog.V(2).Infoln("opening file", fileName+".idx")
  358. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  359. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  360. return false
  361. }
  362. defer indexFile.Close()
  363. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  364. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  365. glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
  366. return false
  367. }
  368. return true
  369. }
  370. // volume is expired if modified time + volume ttl < now
  371. // except when volume is empty
  372. // or when the volume does not have a ttl
  373. // or when volumeSizeLimit is 0 when server just starts
  374. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  375. if volumeSizeLimit == 0 {
  376. //skip if we don't know size limit
  377. return false
  378. }
  379. if v.ContentSize() == 0 {
  380. return false
  381. }
  382. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  383. return false
  384. }
  385. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  386. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  387. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  388. if int64(v.Ttl.Minutes()) < livedMinutes {
  389. return true
  390. }
  391. return false
  392. }
  393. // wait either maxDelayMinutes or 10% of ttl minutes
  394. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  395. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  396. return false
  397. }
  398. removalDelay := v.Ttl.Minutes() / 10
  399. if removalDelay > maxDelayMinutes {
  400. removalDelay = maxDelayMinutes
  401. }
  402. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  403. return true
  404. }
  405. return false
  406. }