You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

425 lines
12 KiB

10 years ago
12 years ago
10 years ago
12 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/go/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. needleMapKind NeedleMapType
  20. readOnly bool
  21. SuperBlock
  22. dataFileAccessLock sync.Mutex
  23. lastModifiedTime uint64 //unix time in seconds
  24. }
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Collection: collection, Id: id}
  37. v.SuperBlock = SuperBlock{}
  38. v.needleMapKind = needleMapKind
  39. e = v.load(false, false, needleMapKind)
  40. return
  41. }
  42. func (v *Volume) FileName() (fileName string) {
  43. if v.Collection == "" {
  44. fileName = path.Join(v.dir, v.Id.String())
  45. } else {
  46. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  47. }
  48. return
  49. }
  50. func (v *Volume) DataFile() *os.File {
  51. return v.dataFile
  52. }
  53. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  54. var e error
  55. fileName := v.FileName()
  56. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  57. if !canRead {
  58. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  59. }
  60. if canWrite {
  61. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  62. v.lastModifiedTime = uint64(modifiedTime.Unix())
  63. } else {
  64. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  65. v.dataFile, e = os.Open(fileName + ".dat")
  66. v.readOnly = true
  67. }
  68. } else {
  69. if createDatIfMissing {
  70. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  71. } else {
  72. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  73. }
  74. }
  75. if e != nil {
  76. if !os.IsPermission(e) {
  77. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  78. }
  79. }
  80. if v.ReplicaPlacement == nil {
  81. e = v.readSuperBlock()
  82. } else {
  83. e = v.maybeWriteSuperBlock()
  84. }
  85. if e == nil && alsoLoadIndex {
  86. var indexFile *os.File
  87. if v.readOnly {
  88. glog.V(1).Infoln("open to read file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  90. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  91. }
  92. } else {
  93. glog.V(1).Infoln("open to write file", fileName+".idx")
  94. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  95. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  96. }
  97. }
  98. switch needleMapKind {
  99. case NeedleMapInMemory:
  100. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  101. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  102. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  103. }
  104. case NeedleMapLevelDb:
  105. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  106. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  107. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  108. }
  109. case NeedleMapBoltDb:
  110. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  111. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  112. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  113. }
  114. }
  115. }
  116. return e
  117. }
  118. func (v *Volume) Version() Version {
  119. return v.SuperBlock.Version()
  120. }
  121. func (v *Volume) Size() int64 {
  122. stat, e := v.dataFile.Stat()
  123. if e == nil {
  124. return stat.Size()
  125. }
  126. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  127. return -1
  128. }
  129. // Close cleanly shuts down this volume
  130. func (v *Volume) Close() {
  131. v.dataFileAccessLock.Lock()
  132. defer v.dataFileAccessLock.Unlock()
  133. v.nm.Close()
  134. _ = v.dataFile.Close()
  135. }
  136. func (v *Volume) NeedToReplicate() bool {
  137. return v.ReplicaPlacement.GetCopyCount() > 1
  138. }
  139. // isFileUnchanged checks whether this needle to write is same as last one.
  140. // It requires serialized access in the same volume.
  141. func (v *Volume) isFileUnchanged(n *Needle) bool {
  142. nv, ok := v.nm.Get(n.Id)
  143. if ok && nv.Offset > 0 {
  144. oldNeedle := new(Needle)
  145. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  146. if err != nil {
  147. glog.V(0).Infof("Failed to check updated file %v", err)
  148. return false
  149. }
  150. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  151. n.DataSize = oldNeedle.DataSize
  152. return true
  153. }
  154. }
  155. return false
  156. }
  157. // Destroy removes everything related to this volume
  158. func (v *Volume) Destroy() (err error) {
  159. if v.readOnly {
  160. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  161. return
  162. }
  163. v.Close()
  164. err = os.Remove(v.dataFile.Name())
  165. if err != nil {
  166. return
  167. }
  168. err = v.nm.Destroy()
  169. return
  170. }
  171. // AppendBlob append a blob to end of the data file, used in replication
  172. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  173. if v.readOnly {
  174. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  175. return
  176. }
  177. v.dataFileAccessLock.Lock()
  178. defer v.dataFileAccessLock.Unlock()
  179. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  180. glog.V(0).Infof("failed to seek the end of file: %v", err)
  181. return
  182. }
  183. //ensure file writing starting from aligned positions
  184. if offset%NeedlePaddingSize != 0 {
  185. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  186. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  187. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  188. return
  189. }
  190. }
  191. v.dataFile.Write(b)
  192. return
  193. }
  194. func (v *Volume) write(n *Needle) (size uint32, err error) {
  195. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  196. if v.readOnly {
  197. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  198. return
  199. }
  200. v.dataFileAccessLock.Lock()
  201. defer v.dataFileAccessLock.Unlock()
  202. if v.isFileUnchanged(n) {
  203. size = n.DataSize
  204. glog.V(4).Infof("needle is unchanged!")
  205. return
  206. }
  207. var offset int64
  208. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  209. glog.V(0).Infof("failed to seek the end of file: %v", err)
  210. return
  211. }
  212. //ensure file writing starting from aligned positions
  213. if offset%NeedlePaddingSize != 0 {
  214. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  215. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  216. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  217. return
  218. }
  219. }
  220. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  221. if e := v.dataFile.Truncate(offset); e != nil {
  222. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  223. }
  224. return
  225. }
  226. nv, ok := v.nm.Get(n.Id)
  227. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  228. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  229. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  230. }
  231. }
  232. if v.lastModifiedTime < n.LastModified {
  233. v.lastModifiedTime = n.LastModified
  234. }
  235. return
  236. }
  237. func (v *Volume) delete(n *Needle) (uint32, error) {
  238. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  239. if v.readOnly {
  240. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  241. }
  242. v.dataFileAccessLock.Lock()
  243. defer v.dataFileAccessLock.Unlock()
  244. nv, ok := v.nm.Get(n.Id)
  245. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  246. if ok {
  247. size := nv.Size
  248. if err := v.nm.Delete(n.Id); err != nil {
  249. return size, err
  250. }
  251. if _, err := v.dataFile.Seek(0, 2); err != nil {
  252. return size, err
  253. }
  254. n.Data = nil
  255. _, err := n.Append(v.dataFile, v.Version())
  256. return size, err
  257. }
  258. return 0, nil
  259. }
  260. // read fills in Needle content by looking up n.Id from NeedleMapper
  261. func (v *Volume) readNeedle(n *Needle) (int, error) {
  262. nv, ok := v.nm.Get(n.Id)
  263. if !ok || nv.Offset == 0 {
  264. return -1, errors.New("Not Found")
  265. }
  266. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  267. if err != nil {
  268. return 0, err
  269. }
  270. bytesRead := len(n.Data)
  271. if !n.HasTtl() {
  272. return bytesRead, nil
  273. }
  274. ttlMinutes := n.Ttl.Minutes()
  275. if ttlMinutes == 0 {
  276. return bytesRead, nil
  277. }
  278. if !n.HasLastModifiedDate() {
  279. return bytesRead, nil
  280. }
  281. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  282. return bytesRead, nil
  283. }
  284. return -1, errors.New("Not Found")
  285. }
  286. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  287. needleMapKind NeedleMapType,
  288. visitSuperBlock func(SuperBlock) error,
  289. readNeedleBody bool,
  290. visitNeedle func(n *Needle, offset int64) error) (err error) {
  291. var v *Volume
  292. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  293. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  294. }
  295. if err = visitSuperBlock(v.SuperBlock); err != nil {
  296. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  297. }
  298. version := v.Version()
  299. offset := int64(SuperBlockSize)
  300. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  301. if e != nil {
  302. err = fmt.Errorf("cannot read needle header: %v", e)
  303. return
  304. }
  305. for n != nil {
  306. if readNeedleBody {
  307. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  308. glog.V(0).Infof("cannot read needle body: %v", err)
  309. //err = fmt.Errorf("cannot read needle body: %v", err)
  310. //return
  311. }
  312. if n.DataSize >= n.Size {
  313. // this should come from a bug reported on #87 and #93
  314. // fixed in v0.69
  315. // remove this whole "if" clause later, long after 0.69
  316. oldRest, oldSize := rest, n.Size
  317. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  318. n.Size = 0
  319. rest = n.Size + NeedleChecksumSize + padding
  320. if rest%NeedlePaddingSize != 0 {
  321. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  322. }
  323. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  324. }
  325. }
  326. if err = visitNeedle(n, offset); err != nil {
  327. glog.V(0).Infof("visit needle error: %v", err)
  328. }
  329. offset += int64(NeedleHeaderSize) + int64(rest)
  330. glog.V(4).Infof("==> new entry offset %d", offset)
  331. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  332. if err == io.EOF {
  333. return nil
  334. }
  335. return fmt.Errorf("cannot read needle header: %v", err)
  336. }
  337. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  338. }
  339. return
  340. }
  341. func (v *Volume) ContentSize() uint64 {
  342. return v.nm.ContentSize()
  343. }
  344. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  345. exists = true
  346. fi, err := os.Stat(filename)
  347. if os.IsNotExist(err) {
  348. exists = false
  349. return
  350. }
  351. if fi.Mode()&0400 != 0 {
  352. canRead = true
  353. }
  354. if fi.Mode()&0200 != 0 {
  355. canWrite = true
  356. }
  357. modTime = fi.ModTime()
  358. return
  359. }
  360. // volume is expired if modified time + volume ttl < now
  361. // except when volume is empty
  362. // or when the volume does not have a ttl
  363. // or when volumeSizeLimit is 0 when server just starts
  364. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  365. if volumeSizeLimit == 0 {
  366. //skip if we don't know size limit
  367. return false
  368. }
  369. if v.ContentSize() == 0 {
  370. return false
  371. }
  372. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  373. return false
  374. }
  375. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  376. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  377. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  378. if int64(v.Ttl.Minutes()) < livedMinutes {
  379. return true
  380. }
  381. return false
  382. }
  383. // wait either maxDelayMinutes or 10% of ttl minutes
  384. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  385. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  386. return false
  387. }
  388. removalDelay := v.Ttl.Minutes() / 10
  389. if removalDelay > maxDelayMinutes {
  390. removalDelay = maxDelayMinutes
  391. }
  392. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  393. return true
  394. }
  395. return false
  396. }