You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

428 lines
12 KiB

10 years ago
12 years ago
10 years ago
12 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/go/glog"
  12. )
// Volume is one on-disk storage volume: a ".dat" data file plus, when the
// index is loaded, a needle map that locates needles inside that file.
type Volume struct {
	// Id identifies the volume; it is part of the on-disk file name.
	Id VolumeId
	// dir is the directory that holds the volume's files.
	dir string
	// Collection, when non-empty, is prefixed to the on-disk file name.
	Collection string
	// dataFile is the handle of the ".dat" file, opened by load.
	dataFile *os.File
	// nm maps needle ids to offset/size; it stays nil when the volume is
	// loaded without its index.
	nm NeedleMapper
	// needleMapKind records which needle map implementation was requested.
	needleMapKind NeedleMapType
	// readOnly is set by load when the data file is not writable; mutating
	// operations refuse to run while it is true.
	readOnly bool
	SuperBlock
	// dataFileAccessLock is held by Close, AppendBlob, write and delete
	// while they touch the data file.
	dataFileAccessLock sync.Mutex
	lastModifiedTime uint64 // unix time in seconds
}
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Collection: collection, Id: id}
  37. v.SuperBlock = SuperBlock{}
  38. v.needleMapKind = needleMapKind
  39. e = v.load(false, false, needleMapKind)
  40. return
  41. }
  42. func (v *Volume) FileName() (fileName string) {
  43. if v.Collection == "" {
  44. fileName = path.Join(v.dir, v.Id.String())
  45. } else {
  46. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  47. }
  48. return
  49. }
// DataFile returns the volume's data file handle (the dataFile field as set
// by load).
func (v *Volume) DataFile() *os.File {
	return v.dataFile
}
  53. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  54. var e error
  55. fileName := v.FileName()
  56. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  57. if !canRead {
  58. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  59. }
  60. if canWrite {
  61. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  62. v.lastModifiedTime = uint64(modifiedTime.Unix())
  63. } else {
  64. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  65. v.dataFile, e = os.Open(fileName + ".dat")
  66. v.readOnly = true
  67. }
  68. } else {
  69. if createDatIfMissing {
  70. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  71. } else {
  72. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  73. }
  74. }
  75. if e != nil {
  76. if !os.IsPermission(e) {
  77. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  78. }
  79. }
  80. if v.ReplicaPlacement == nil {
  81. e = v.readSuperBlock()
  82. } else {
  83. e = v.maybeWriteSuperBlock()
  84. }
  85. if e == nil && alsoLoadIndex {
  86. var indexFile *os.File
  87. if v.readOnly {
  88. glog.V(1).Infoln("open to read file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  90. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  91. }
  92. } else {
  93. glog.V(1).Infoln("open to write file", fileName+".idx")
  94. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  95. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  96. }
  97. }
  98. switch needleMapKind {
  99. case NeedleMapInMemory:
  100. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  101. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  102. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  103. }
  104. case NeedleMapLevelDb:
  105. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  106. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  107. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  108. }
  109. case NeedleMapBoltDb:
  110. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  111. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  112. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  113. }
  114. }
  115. }
  116. return e
  117. }
// Version returns the version reported by the volume's embedded SuperBlock.
func (v *Volume) Version() Version {
	return v.SuperBlock.Version()
}
  121. func (v *Volume) Size() int64 {
  122. stat, e := v.dataFile.Stat()
  123. if e == nil {
  124. return stat.Size()
  125. }
  126. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  127. return -1
  128. }
  129. // Close cleanly shuts down this volume
  130. func (v *Volume) Close() {
  131. v.dataFileAccessLock.Lock()
  132. defer v.dataFileAccessLock.Unlock()
  133. v.nm.Close()
  134. _ = v.dataFile.Close()
  135. }
  136. func (v *Volume) NeedToReplicate() bool {
  137. return v.ReplicaPlacement.GetCopyCount() > 1
  138. }
  139. // isFileUnchanged checks whether this needle to write is same as last one.
  140. // It requires serialized access in the same volume.
  141. func (v *Volume) isFileUnchanged(n *Needle) bool {
  142. if v.Ttl.String() != "" {
  143. return false
  144. }
  145. nv, ok := v.nm.Get(n.Id)
  146. if ok && nv.Offset > 0 {
  147. oldNeedle := new(Needle)
  148. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  149. if err != nil {
  150. glog.V(0).Infof("Failed to check updated file %v", err)
  151. return false
  152. }
  153. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  154. n.DataSize = oldNeedle.DataSize
  155. return true
  156. }
  157. }
  158. return false
  159. }
  160. // Destroy removes everything related to this volume
  161. func (v *Volume) Destroy() (err error) {
  162. if v.readOnly {
  163. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  164. return
  165. }
  166. v.Close()
  167. err = os.Remove(v.dataFile.Name())
  168. if err != nil {
  169. return
  170. }
  171. err = v.nm.Destroy()
  172. return
  173. }
  174. // AppendBlob append a blob to end of the data file, used in replication
  175. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  176. if v.readOnly {
  177. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  178. return
  179. }
  180. v.dataFileAccessLock.Lock()
  181. defer v.dataFileAccessLock.Unlock()
  182. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  183. glog.V(0).Infof("failed to seek the end of file: %v", err)
  184. return
  185. }
  186. //ensure file writing starting from aligned positions
  187. if offset%NeedlePaddingSize != 0 {
  188. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  189. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  190. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  191. return
  192. }
  193. }
  194. v.dataFile.Write(b)
  195. return
  196. }
  197. func (v *Volume) write(n *Needle) (size uint32, err error) {
  198. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  199. if v.readOnly {
  200. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  201. return
  202. }
  203. v.dataFileAccessLock.Lock()
  204. defer v.dataFileAccessLock.Unlock()
  205. if v.isFileUnchanged(n) {
  206. size = n.DataSize
  207. glog.V(4).Infof("needle is unchanged!")
  208. return
  209. }
  210. var offset int64
  211. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  212. glog.V(0).Infof("failed to seek the end of file: %v", err)
  213. return
  214. }
  215. //ensure file writing starting from aligned positions
  216. if offset%NeedlePaddingSize != 0 {
  217. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  218. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  219. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  220. return
  221. }
  222. }
  223. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  224. if e := v.dataFile.Truncate(offset); e != nil {
  225. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  226. }
  227. return
  228. }
  229. nv, ok := v.nm.Get(n.Id)
  230. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  231. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  232. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  233. }
  234. }
  235. if v.lastModifiedTime < n.LastModified {
  236. v.lastModifiedTime = n.LastModified
  237. }
  238. return
  239. }
  240. func (v *Volume) delete(n *Needle) (uint32, error) {
  241. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  242. if v.readOnly {
  243. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  244. }
  245. v.dataFileAccessLock.Lock()
  246. defer v.dataFileAccessLock.Unlock()
  247. nv, ok := v.nm.Get(n.Id)
  248. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  249. if ok {
  250. size := nv.Size
  251. if err := v.nm.Delete(n.Id); err != nil {
  252. return size, err
  253. }
  254. if _, err := v.dataFile.Seek(0, 2); err != nil {
  255. return size, err
  256. }
  257. n.Data = nil
  258. _, err := n.Append(v.dataFile, v.Version())
  259. return size, err
  260. }
  261. return 0, nil
  262. }
  263. // read fills in Needle content by looking up n.Id from NeedleMapper
  264. func (v *Volume) readNeedle(n *Needle) (int, error) {
  265. nv, ok := v.nm.Get(n.Id)
  266. if !ok || nv.Offset == 0 {
  267. return -1, errors.New("Not Found")
  268. }
  269. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  270. if err != nil {
  271. return 0, err
  272. }
  273. bytesRead := len(n.Data)
  274. if !n.HasTtl() {
  275. return bytesRead, nil
  276. }
  277. ttlMinutes := n.Ttl.Minutes()
  278. if ttlMinutes == 0 {
  279. return bytesRead, nil
  280. }
  281. if !n.HasLastModifiedDate() {
  282. return bytesRead, nil
  283. }
  284. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  285. return bytesRead, nil
  286. }
  287. return -1, errors.New("Not Found")
  288. }
  289. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  290. needleMapKind NeedleMapType,
  291. visitSuperBlock func(SuperBlock) error,
  292. readNeedleBody bool,
  293. visitNeedle func(n *Needle, offset int64) error) (err error) {
  294. var v *Volume
  295. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  296. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  297. }
  298. if err = visitSuperBlock(v.SuperBlock); err != nil {
  299. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  300. }
  301. version := v.Version()
  302. offset := int64(SuperBlockSize)
  303. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  304. if e != nil {
  305. err = fmt.Errorf("cannot read needle header: %v", e)
  306. return
  307. }
  308. for n != nil {
  309. if readNeedleBody {
  310. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  311. glog.V(0).Infof("cannot read needle body: %v", err)
  312. //err = fmt.Errorf("cannot read needle body: %v", err)
  313. //return
  314. }
  315. if n.DataSize >= n.Size {
  316. // this should come from a bug reported on #87 and #93
  317. // fixed in v0.69
  318. // remove this whole "if" clause later, long after 0.69
  319. oldRest, oldSize := rest, n.Size
  320. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  321. n.Size = 0
  322. rest = n.Size + NeedleChecksumSize + padding
  323. if rest%NeedlePaddingSize != 0 {
  324. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  325. }
  326. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  327. }
  328. }
  329. if err = visitNeedle(n, offset); err != nil {
  330. glog.V(0).Infof("visit needle error: %v", err)
  331. }
  332. offset += int64(NeedleHeaderSize) + int64(rest)
  333. glog.V(4).Infof("==> new entry offset %d", offset)
  334. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  335. if err == io.EOF {
  336. return nil
  337. }
  338. return fmt.Errorf("cannot read needle header: %v", err)
  339. }
  340. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  341. }
  342. return
  343. }
// ContentSize returns the content size reported by the volume's needle map.
func (v *Volume) ContentSize() uint64 {
	return v.nm.ContentSize()
}
  347. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  348. exists = true
  349. fi, err := os.Stat(filename)
  350. if os.IsNotExist(err) {
  351. exists = false
  352. return
  353. }
  354. if fi.Mode()&0400 != 0 {
  355. canRead = true
  356. }
  357. if fi.Mode()&0200 != 0 {
  358. canWrite = true
  359. }
  360. modTime = fi.ModTime()
  361. return
  362. }
  363. // volume is expired if modified time + volume ttl < now
  364. // except when volume is empty
  365. // or when the volume does not have a ttl
  366. // or when volumeSizeLimit is 0 when server just starts
  367. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  368. if volumeSizeLimit == 0 {
  369. //skip if we don't know size limit
  370. return false
  371. }
  372. if v.ContentSize() == 0 {
  373. return false
  374. }
  375. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  376. return false
  377. }
  378. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  379. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  380. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  381. if int64(v.Ttl.Minutes()) < livedMinutes {
  382. return true
  383. }
  384. return false
  385. }
  386. // wait either maxDelayMinutes or 10% of ttl minutes
  387. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  388. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  389. return false
  390. }
  391. removalDelay := v.Ttl.Minutes() / 10
  392. if removalDelay > maxDelayMinutes {
  393. removalDelay = maxDelayMinutes
  394. }
  395. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  396. return true
  397. }
  398. return false
  399. }