You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

495 lines
14 KiB

10 years ago
12 years ago
10 years ago
13 years ago
10 years ago
11 years ago
10 years ago
12 years ago
10 years ago
12 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. type Volume struct {
  15. Id VolumeId
  16. dir string
  17. Collection string
  18. dataFile *os.File
  19. nm NeedleMapper
  20. needleMapKind NeedleMapType
  21. readOnly bool
  22. SuperBlock
  23. dataFileAccessLock sync.Mutex
  24. lastModifiedTime uint64 //unix time in seconds
  25. }
  26. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  27. v = &Volume{dir: dirname, Collection: collection, Id: id}
  28. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  29. v.needleMapKind = needleMapKind
  30. e = v.load(true, true, needleMapKind)
  31. return
  32. }
  33. func (v *Volume) String() string {
  34. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  35. }
  36. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  37. v = &Volume{dir: dirname, Collection: collection, Id: id}
  38. v.SuperBlock = SuperBlock{}
  39. v.needleMapKind = needleMapKind
  40. e = v.load(false, false, needleMapKind)
  41. return
  42. }
  43. func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
  44. if indexSize, err = util.GetFileSize(indexFile); err == nil {
  45. if indexSize%NeedleIndexSize != 0 {
  46. err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
  47. }
  48. }
  49. return
  50. }
  51. func readIndexEntryAtOffset(indexFile *os.File, offset int64, v Version) (bytes []byte, err error) {
  52. if offset < 0 {
  53. err = fmt.Errorf("offset %d for index file is invalid", offset)
  54. return
  55. }
  56. bytes = make([]byte, NeedleIndexSize)
  57. _, err = indexFile.ReadAt(bytes, offset)
  58. return
  59. }
  60. func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key uint64) error {
  61. if n, bodyLength, err := ReadNeedleHeader(datFile, v, offset); err != nil {
  62. return fmt.Errorf("can not read needle header: %v", err)
  63. } else {
  64. if n.Id != key {
  65. return fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
  66. } else {
  67. if bytes, err := n.ReadNeedleBody(datFile, v, offset+int64(NeedleHeaderSize), bodyLength); err != nil {
  68. return fmt.Errorf("dat file's body reading failed: %v", err)
  69. } else {
  70. checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize])
  71. if n.Checksum.Value() != checksum {
  72. return fmt.Errorf("CRC check failed")
  73. }
  74. }
  75. }
  76. }
  77. return nil
  78. }
  79. func volumeDataIntegrityChecking(v *Volume, indexFile *os.File) error {
  80. var indexSize int64
  81. var e error
  82. if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
  83. return fmt.Errorf("verifyIndexFileIntegrity failed: %v", e)
  84. }
  85. if indexSize != 0 {
  86. var lastIdxEntry []byte
  87. if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize, v.Version()); e != nil {
  88. return fmt.Errorf("readLastIndexEntry failed: %v", e)
  89. }
  90. key, offset, _ := idxFileEntry(lastIdxEntry)
  91. if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key); e != nil {
  92. return fmt.Errorf("verifyNeedleIntegrity failed: %v", e)
  93. }
  94. } else {
  95. if datSize, err := util.GetFileSize(v.dataFile); err == nil {
  96. if datSize > 0 {
  97. return fmt.Errorf("dat file size is %d, not empty while the index file is empty!", datSize)
  98. }
  99. }
  100. }
  101. return nil
  102. }
  103. func (v *Volume) FileName() (fileName string) {
  104. if v.Collection == "" {
  105. fileName = path.Join(v.dir, v.Id.String())
  106. } else {
  107. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  108. }
  109. return
  110. }
  111. func (v *Volume) DataFile() *os.File {
  112. return v.dataFile
  113. }
  114. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  115. var e error
  116. fileName := v.FileName()
  117. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  118. if !canRead {
  119. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  120. }
  121. if canWrite {
  122. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  123. v.lastModifiedTime = uint64(modifiedTime.Unix())
  124. } else {
  125. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  126. v.dataFile, e = os.Open(fileName + ".dat")
  127. v.readOnly = true
  128. }
  129. } else {
  130. if createDatIfMissing {
  131. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  132. } else {
  133. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  134. }
  135. }
  136. if e != nil {
  137. if !os.IsPermission(e) {
  138. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  139. }
  140. }
  141. if v.ReplicaPlacement == nil {
  142. e = v.readSuperBlock()
  143. } else {
  144. e = v.maybeWriteSuperBlock()
  145. }
  146. if e == nil && alsoLoadIndex {
  147. var indexFile *os.File
  148. if v.readOnly {
  149. glog.V(1).Infoln("open to read file", fileName+".idx")
  150. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  151. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  152. }
  153. } else {
  154. glog.V(1).Infoln("open to write file", fileName+".idx")
  155. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  156. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  157. }
  158. }
  159. if e = volumeDataIntegrityChecking(v, indexFile); e != nil {
  160. v.readOnly = true
  161. glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
  162. }
  163. switch needleMapKind {
  164. case NeedleMapInMemory:
  165. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  166. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  167. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  168. }
  169. case NeedleMapLevelDb:
  170. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  171. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  172. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  173. }
  174. case NeedleMapBoltDb:
  175. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  176. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  177. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  178. }
  179. }
  180. }
  181. return e
  182. }
  183. func (v *Volume) Version() Version {
  184. return v.SuperBlock.Version()
  185. }
  186. func (v *Volume) Size() int64 {
  187. stat, e := v.dataFile.Stat()
  188. if e == nil {
  189. return stat.Size()
  190. }
  191. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  192. return -1
  193. }
  194. // Close cleanly shuts down this volume
  195. func (v *Volume) Close() {
  196. v.dataFileAccessLock.Lock()
  197. defer v.dataFileAccessLock.Unlock()
  198. v.nm.Close()
  199. _ = v.dataFile.Close()
  200. }
  201. func (v *Volume) NeedToReplicate() bool {
  202. return v.ReplicaPlacement.GetCopyCount() > 1
  203. }
  204. // isFileUnchanged checks whether this needle to write is same as last one.
  205. // It requires serialized access in the same volume.
  206. func (v *Volume) isFileUnchanged(n *Needle) bool {
  207. if v.Ttl.String() != "" {
  208. return false
  209. }
  210. nv, ok := v.nm.Get(n.Id)
  211. if ok && nv.Offset > 0 {
  212. oldNeedle := new(Needle)
  213. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  214. if err != nil {
  215. glog.V(0).Infof("Failed to check updated file %v", err)
  216. return false
  217. }
  218. defer oldNeedle.ReleaseMemory()
  219. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  220. n.DataSize = oldNeedle.DataSize
  221. return true
  222. }
  223. }
  224. return false
  225. }
  226. // Destroy removes everything related to this volume
  227. func (v *Volume) Destroy() (err error) {
  228. if v.readOnly {
  229. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  230. return
  231. }
  232. v.Close()
  233. err = os.Remove(v.dataFile.Name())
  234. if err != nil {
  235. return
  236. }
  237. err = v.nm.Destroy()
  238. return
  239. }
  240. // AppendBlob append a blob to end of the data file, used in replication
  241. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  242. if v.readOnly {
  243. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  244. return
  245. }
  246. v.dataFileAccessLock.Lock()
  247. defer v.dataFileAccessLock.Unlock()
  248. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  249. glog.V(0).Infof("failed to seek the end of file: %v", err)
  250. return
  251. }
  252. //ensure file writing starting from aligned positions
  253. if offset%NeedlePaddingSize != 0 {
  254. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  255. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  256. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  257. return
  258. }
  259. }
  260. v.dataFile.Write(b)
  261. return
  262. }
  263. func (v *Volume) write(n *Needle) (size uint32, err error) {
  264. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  265. if v.readOnly {
  266. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  267. return
  268. }
  269. v.dataFileAccessLock.Lock()
  270. defer v.dataFileAccessLock.Unlock()
  271. if v.isFileUnchanged(n) {
  272. size = n.DataSize
  273. glog.V(4).Infof("needle is unchanged!")
  274. return
  275. }
  276. var offset int64
  277. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  278. glog.V(0).Infof("failed to seek the end of file: %v", err)
  279. return
  280. }
  281. //ensure file writing starting from aligned positions
  282. if offset%NeedlePaddingSize != 0 {
  283. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  284. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  285. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  286. return
  287. }
  288. }
  289. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  290. if e := v.dataFile.Truncate(offset); e != nil {
  291. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  292. }
  293. return
  294. }
  295. nv, ok := v.nm.Get(n.Id)
  296. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  297. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  298. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  299. }
  300. }
  301. if v.lastModifiedTime < n.LastModified {
  302. v.lastModifiedTime = n.LastModified
  303. }
  304. return
  305. }
  306. func (v *Volume) delete(n *Needle) (uint32, error) {
  307. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  308. if v.readOnly {
  309. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  310. }
  311. v.dataFileAccessLock.Lock()
  312. defer v.dataFileAccessLock.Unlock()
  313. nv, ok := v.nm.Get(n.Id)
  314. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  315. if ok {
  316. size := nv.Size
  317. if err := v.nm.Delete(n.Id); err != nil {
  318. return size, err
  319. }
  320. if _, err := v.dataFile.Seek(0, 2); err != nil {
  321. return size, err
  322. }
  323. n.Data = nil
  324. _, err := n.Append(v.dataFile, v.Version())
  325. return size, err
  326. }
  327. return 0, nil
  328. }
  329. // read fills in Needle content by looking up n.Id from NeedleMapper
  330. func (v *Volume) readNeedle(n *Needle) (int, error) {
  331. nv, ok := v.nm.Get(n.Id)
  332. if !ok || nv.Offset == 0 {
  333. return -1, errors.New("Not Found")
  334. }
  335. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  336. if err != nil {
  337. return 0, err
  338. }
  339. bytesRead := len(n.Data)
  340. if !n.HasTtl() {
  341. return bytesRead, nil
  342. }
  343. ttlMinutes := n.Ttl.Minutes()
  344. if ttlMinutes == 0 {
  345. return bytesRead, nil
  346. }
  347. if !n.HasLastModifiedDate() {
  348. return bytesRead, nil
  349. }
  350. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  351. return bytesRead, nil
  352. }
  353. n.ReleaseMemory()
  354. return -1, errors.New("Not Found")
  355. }
  356. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  357. needleMapKind NeedleMapType,
  358. visitSuperBlock func(SuperBlock) error,
  359. readNeedleBody bool,
  360. visitNeedle func(n *Needle, offset int64) error) (err error) {
  361. var v *Volume
  362. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  363. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  364. }
  365. if err = visitSuperBlock(v.SuperBlock); err != nil {
  366. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  367. }
  368. version := v.Version()
  369. offset := int64(SuperBlockSize)
  370. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  371. if e != nil {
  372. err = fmt.Errorf("cannot read needle header: %v", e)
  373. return
  374. }
  375. for n != nil {
  376. if readNeedleBody {
  377. if _, err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  378. glog.V(0).Infof("cannot read needle body: %v", err)
  379. //err = fmt.Errorf("cannot read needle body: %v", err)
  380. //return
  381. }
  382. if n.DataSize >= n.Size {
  383. // this should come from a bug reported on #87 and #93
  384. // fixed in v0.69
  385. // remove this whole "if" clause later, long after 0.69
  386. oldRest, oldSize := rest, n.Size
  387. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  388. n.Size = 0
  389. rest = n.Size + NeedleChecksumSize + padding
  390. if rest%NeedlePaddingSize != 0 {
  391. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  392. }
  393. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  394. }
  395. }
  396. if err = visitNeedle(n, offset); err != nil {
  397. glog.V(0).Infof("visit needle error: %v", err)
  398. }
  399. offset += int64(NeedleHeaderSize) + int64(rest)
  400. glog.V(4).Infof("==> new entry offset %d", offset)
  401. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  402. if err == io.EOF {
  403. return nil
  404. }
  405. return fmt.Errorf("cannot read needle header: %v", err)
  406. }
  407. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  408. }
  409. return
  410. }
  411. func (v *Volume) ContentSize() uint64 {
  412. return v.nm.ContentSize()
  413. }
  414. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  415. exists = true
  416. fi, err := os.Stat(filename)
  417. if os.IsNotExist(err) {
  418. exists = false
  419. return
  420. }
  421. if fi.Mode()&0400 != 0 {
  422. canRead = true
  423. }
  424. if fi.Mode()&0200 != 0 {
  425. canWrite = true
  426. }
  427. modTime = fi.ModTime()
  428. return
  429. }
  430. // volume is expired if modified time + volume ttl < now
  431. // except when volume is empty
  432. // or when the volume does not have a ttl
  433. // or when volumeSizeLimit is 0 when server just starts
  434. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  435. if volumeSizeLimit == 0 {
  436. //skip if we don't know size limit
  437. return false
  438. }
  439. if v.ContentSize() == 0 {
  440. return false
  441. }
  442. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  443. return false
  444. }
  445. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  446. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  447. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  448. if int64(v.Ttl.Minutes()) < livedMinutes {
  449. return true
  450. }
  451. return false
  452. }
  453. // wait either maxDelayMinutes or 10% of ttl minutes
  454. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  455. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  456. return false
  457. }
  458. removalDelay := v.Ttl.Minutes() / 10
  459. if removalDelay > maxDelayMinutes {
  460. removalDelay = maxDelayMinutes
  461. }
  462. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  463. return true
  464. }
  465. return false
  466. }