You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

487 lines
14 KiB

10 years ago
12 years ago
10 years ago
12 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. type Volume struct {
  15. Id VolumeId
  16. dir string
  17. Collection string
  18. dataFile *os.File
  19. nm NeedleMapper
  20. needleMapKind NeedleMapType
  21. readOnly bool
  22. SuperBlock
  23. dataFileAccessLock sync.Mutex
  24. lastModifiedTime uint64 //unix time in seconds
  25. }
  26. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  27. v = &Volume{dir: dirname, Collection: collection, Id: id}
  28. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  29. v.needleMapKind = needleMapKind
  30. e = v.load(true, true, needleMapKind)
  31. return
  32. }
  33. func (v *Volume) String() string {
  34. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  35. }
  36. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  37. v = &Volume{dir: dirname, Collection: collection, Id: id}
  38. v.SuperBlock = SuperBlock{}
  39. v.needleMapKind = needleMapKind
  40. e = v.load(false, false, needleMapKind)
  41. return
  42. }
  43. func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
  44. if indexSize, err = util.GetFileSize(indexFile); err == nil {
  45. if indexSize%NeedleIndexSize != 0 {
  46. err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
  47. }
  48. }
  49. return
  50. }
  51. func readIndexEntryAtOffset(indexFile *os.File, offset int64, v Version) (bytes []byte, err error) {
  52. if offset < 0 {
  53. err = fmt.Errorf("offset %d for index file is invalid", offset)
  54. return
  55. }
  56. bytes = make([]byte, NeedleIndexSize)
  57. _, err = indexFile.ReadAt(bytes, offset)
  58. return
  59. }
  60. func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key uint64, size uint32) error {
  61. n := new(Needle)
  62. err := n.ReadData(datFile, offset, size, v)
  63. if err != nil {
  64. return err
  65. }
  66. if n.Id != key {
  67. return fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
  68. }
  69. return nil
  70. }
  71. func volumeDataIntegrityChecking(v *Volume, indexFile *os.File) error {
  72. var indexSize int64
  73. var e error
  74. if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
  75. return fmt.Errorf("verifyIndexFileIntegrity failed: %v", e)
  76. }
  77. if indexSize != 0 {
  78. var lastIdxEntry []byte
  79. if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize, v.Version()); e != nil {
  80. return fmt.Errorf("readLastIndexEntry failed: %v", e)
  81. }
  82. key, offset, size := idxFileEntry(lastIdxEntry)
  83. if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil {
  84. return fmt.Errorf("verifyNeedleIntegrity failed: %v", e)
  85. }
  86. } else {
  87. if datSize, err := util.GetFileSize(v.dataFile); err == nil {
  88. if datSize > 0 {
  89. return fmt.Errorf("dat file size is %d, not empty while the index file is empty!", datSize)
  90. }
  91. }
  92. }
  93. return nil
  94. }
  95. func (v *Volume) FileName() (fileName string) {
  96. if v.Collection == "" {
  97. fileName = path.Join(v.dir, v.Id.String())
  98. } else {
  99. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  100. }
  101. return
  102. }
  103. func (v *Volume) DataFile() *os.File {
  104. return v.dataFile
  105. }
  106. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  107. var e error
  108. fileName := v.FileName()
  109. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  110. if !canRead {
  111. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  112. }
  113. if canWrite {
  114. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  115. v.lastModifiedTime = uint64(modifiedTime.Unix())
  116. } else {
  117. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  118. v.dataFile, e = os.Open(fileName + ".dat")
  119. v.readOnly = true
  120. }
  121. } else {
  122. if createDatIfMissing {
  123. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  124. } else {
  125. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  126. }
  127. }
  128. if e != nil {
  129. if !os.IsPermission(e) {
  130. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  131. }
  132. }
  133. if v.ReplicaPlacement == nil {
  134. e = v.readSuperBlock()
  135. } else {
  136. e = v.maybeWriteSuperBlock()
  137. }
  138. if e == nil && alsoLoadIndex {
  139. var indexFile *os.File
  140. if v.readOnly {
  141. glog.V(1).Infoln("open to read file", fileName+".idx")
  142. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  143. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  144. }
  145. } else {
  146. glog.V(1).Infoln("open to write file", fileName+".idx")
  147. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  148. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  149. }
  150. }
  151. if e = volumeDataIntegrityChecking(v, indexFile); e != nil {
  152. v.readOnly = true
  153. glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
  154. }
  155. switch needleMapKind {
  156. case NeedleMapInMemory:
  157. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  158. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  159. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  160. }
  161. case NeedleMapLevelDb:
  162. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  163. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  164. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  165. }
  166. case NeedleMapBoltDb:
  167. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  168. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  169. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  170. }
  171. }
  172. }
  173. return e
  174. }
  175. func (v *Volume) Version() Version {
  176. return v.SuperBlock.Version()
  177. }
  178. func (v *Volume) Size() int64 {
  179. stat, e := v.dataFile.Stat()
  180. if e == nil {
  181. return stat.Size()
  182. }
  183. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  184. return -1
  185. }
  186. // Close cleanly shuts down this volume
  187. func (v *Volume) Close() {
  188. v.dataFileAccessLock.Lock()
  189. defer v.dataFileAccessLock.Unlock()
  190. v.nm.Close()
  191. _ = v.dataFile.Close()
  192. }
  193. func (v *Volume) NeedToReplicate() bool {
  194. return v.ReplicaPlacement.GetCopyCount() > 1
  195. }
  196. // isFileUnchanged checks whether this needle to write is same as last one.
  197. // It requires serialized access in the same volume.
  198. func (v *Volume) isFileUnchanged(n *Needle) bool {
  199. if v.Ttl.String() != "" {
  200. return false
  201. }
  202. nv, ok := v.nm.Get(n.Id)
  203. if ok && nv.Offset > 0 {
  204. oldNeedle := new(Needle)
  205. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  206. if err != nil {
  207. glog.V(0).Infof("Failed to check updated file %v", err)
  208. return false
  209. }
  210. defer oldNeedle.ReleaseMemory()
  211. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  212. n.DataSize = oldNeedle.DataSize
  213. return true
  214. }
  215. }
  216. return false
  217. }
  218. // Destroy removes everything related to this volume
  219. func (v *Volume) Destroy() (err error) {
  220. if v.readOnly {
  221. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  222. return
  223. }
  224. v.Close()
  225. err = os.Remove(v.dataFile.Name())
  226. if err != nil {
  227. return
  228. }
  229. err = v.nm.Destroy()
  230. return
  231. }
  232. // AppendBlob append a blob to end of the data file, used in replication
  233. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  234. if v.readOnly {
  235. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  236. return
  237. }
  238. v.dataFileAccessLock.Lock()
  239. defer v.dataFileAccessLock.Unlock()
  240. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  241. glog.V(0).Infof("failed to seek the end of file: %v", err)
  242. return
  243. }
  244. //ensure file writing starting from aligned positions
  245. if offset%NeedlePaddingSize != 0 {
  246. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  247. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  248. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  249. return
  250. }
  251. }
  252. v.dataFile.Write(b)
  253. return
  254. }
  255. func (v *Volume) write(n *Needle) (size uint32, err error) {
  256. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  257. if v.readOnly {
  258. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  259. return
  260. }
  261. v.dataFileAccessLock.Lock()
  262. defer v.dataFileAccessLock.Unlock()
  263. if v.isFileUnchanged(n) {
  264. size = n.DataSize
  265. glog.V(4).Infof("needle is unchanged!")
  266. return
  267. }
  268. var offset int64
  269. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  270. glog.V(0).Infof("failed to seek the end of file: %v", err)
  271. return
  272. }
  273. //ensure file writing starting from aligned positions
  274. if offset%NeedlePaddingSize != 0 {
  275. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  276. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  277. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  278. return
  279. }
  280. }
  281. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  282. if e := v.dataFile.Truncate(offset); e != nil {
  283. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  284. }
  285. return
  286. }
  287. nv, ok := v.nm.Get(n.Id)
  288. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  289. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  290. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  291. }
  292. }
  293. if v.lastModifiedTime < n.LastModified {
  294. v.lastModifiedTime = n.LastModified
  295. }
  296. return
  297. }
  298. func (v *Volume) delete(n *Needle) (uint32, error) {
  299. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  300. if v.readOnly {
  301. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  302. }
  303. v.dataFileAccessLock.Lock()
  304. defer v.dataFileAccessLock.Unlock()
  305. nv, ok := v.nm.Get(n.Id)
  306. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  307. if ok {
  308. size := nv.Size
  309. if err := v.nm.Delete(n.Id); err != nil {
  310. return size, err
  311. }
  312. if _, err := v.dataFile.Seek(0, 2); err != nil {
  313. return size, err
  314. }
  315. n.Data = nil
  316. _, err := n.Append(v.dataFile, v.Version())
  317. return size, err
  318. }
  319. return 0, nil
  320. }
  321. // read fills in Needle content by looking up n.Id from NeedleMapper
  322. func (v *Volume) readNeedle(n *Needle) (int, error) {
  323. nv, ok := v.nm.Get(n.Id)
  324. if !ok || nv.Offset == 0 {
  325. return -1, errors.New("Not Found")
  326. }
  327. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  328. if err != nil {
  329. return 0, err
  330. }
  331. bytesRead := len(n.Data)
  332. if !n.HasTtl() {
  333. return bytesRead, nil
  334. }
  335. ttlMinutes := n.Ttl.Minutes()
  336. if ttlMinutes == 0 {
  337. return bytesRead, nil
  338. }
  339. if !n.HasLastModifiedDate() {
  340. return bytesRead, nil
  341. }
  342. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  343. return bytesRead, nil
  344. }
  345. n.ReleaseMemory()
  346. return -1, errors.New("Not Found")
  347. }
  348. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  349. needleMapKind NeedleMapType,
  350. visitSuperBlock func(SuperBlock) error,
  351. readNeedleBody bool,
  352. visitNeedle func(n *Needle, offset int64) error) (err error) {
  353. var v *Volume
  354. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  355. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  356. }
  357. if err = visitSuperBlock(v.SuperBlock); err != nil {
  358. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  359. }
  360. version := v.Version()
  361. offset := int64(SuperBlockSize)
  362. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  363. if e != nil {
  364. err = fmt.Errorf("cannot read needle header: %v", e)
  365. return
  366. }
  367. for n != nil {
  368. if readNeedleBody {
  369. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  370. glog.V(0).Infof("cannot read needle body: %v", err)
  371. //err = fmt.Errorf("cannot read needle body: %v", err)
  372. //return
  373. }
  374. if n.DataSize >= n.Size {
  375. // this should come from a bug reported on #87 and #93
  376. // fixed in v0.69
  377. // remove this whole "if" clause later, long after 0.69
  378. oldRest, oldSize := rest, n.Size
  379. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  380. n.Size = 0
  381. rest = n.Size + NeedleChecksumSize + padding
  382. if rest%NeedlePaddingSize != 0 {
  383. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  384. }
  385. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  386. }
  387. }
  388. if err = visitNeedle(n, offset); err != nil {
  389. glog.V(0).Infof("visit needle error: %v", err)
  390. }
  391. offset += int64(NeedleHeaderSize) + int64(rest)
  392. glog.V(4).Infof("==> new entry offset %d", offset)
  393. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  394. if err == io.EOF {
  395. return nil
  396. }
  397. return fmt.Errorf("cannot read needle header: %v", err)
  398. }
  399. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  400. }
  401. return
  402. }
  403. func (v *Volume) ContentSize() uint64 {
  404. return v.nm.ContentSize()
  405. }
  // checkFile stats filename. It reports whether the file exists, whether its
  // owner read (0400) and owner write (0200) permission bits are set, and its
  // modification time. The permission check is only a mode-bit check, not an
  // access(2) check for the current process.
  //
  // A stat failure other than "does not exist" (for example a permission error
  // on a parent directory, or a path component that is not a directory) is
  // reported as exists=true with canRead and canWrite false. Callers then
  // refuse to open the file instead of trying to recreate it.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if os.IsNotExist(err) {
  		return false, false, false, modTime
  	}
  	if err != nil {
  		// fi is nil here; touching it would panic.
  		return true, false, false, modTime
  	}
  	mode := fi.Mode()
  	canRead = mode&0400 != 0
  	canWrite = mode&0200 != 0
  	return true, canRead, canWrite, fi.ModTime()
  }
  422. // volume is expired if modified time + volume ttl < now
  423. // except when volume is empty
  424. // or when the volume does not have a ttl
  425. // or when volumeSizeLimit is 0 when server just starts
  426. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  427. if volumeSizeLimit == 0 {
  428. //skip if we don't know size limit
  429. return false
  430. }
  431. if v.ContentSize() == 0 {
  432. return false
  433. }
  434. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  435. return false
  436. }
  437. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  438. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  439. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  440. if int64(v.Ttl.Minutes()) < livedMinutes {
  441. return true
  442. }
  443. return false
  444. }
  445. // wait either maxDelayMinutes or 10% of ttl minutes
  446. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  447. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  448. return false
  449. }
  450. removalDelay := v.Ttl.Minutes() / 10
  451. if removalDelay > maxDelayMinutes {
  452. removalDelay = maxDelayMinutes
  453. }
  454. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  455. return true
  456. }
  457. return false
  458. }