You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

430 lines
12 KiB

10 years ago
12 years ago
10 years ago
12 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. needleMapKind NeedleMapType
  20. readOnly bool
  21. SuperBlock
  22. dataFileAccessLock sync.Mutex
  23. lastModifiedTime uint64 //unix time in seconds
  24. }
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Collection: collection, Id: id}
  37. v.SuperBlock = SuperBlock{}
  38. v.needleMapKind = needleMapKind
  39. e = v.load(false, false, needleMapKind)
  40. return
  41. }
  42. func (v *Volume) FileName() (fileName string) {
  43. if v.Collection == "" {
  44. fileName = path.Join(v.dir, v.Id.String())
  45. } else {
  46. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  47. }
  48. return
  49. }
  50. func (v *Volume) DataFile() *os.File {
  51. return v.dataFile
  52. }
  53. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  54. var e error
  55. fileName := v.FileName()
  56. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  57. if !canRead {
  58. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  59. }
  60. if canWrite {
  61. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  62. v.lastModifiedTime = uint64(modifiedTime.Unix())
  63. } else {
  64. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  65. v.dataFile, e = os.Open(fileName + ".dat")
  66. v.readOnly = true
  67. }
  68. } else {
  69. if createDatIfMissing {
  70. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  71. } else {
  72. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  73. }
  74. }
  75. if e != nil {
  76. if !os.IsPermission(e) {
  77. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  78. }
  79. }
  80. if v.ReplicaPlacement == nil {
  81. e = v.readSuperBlock()
  82. } else {
  83. e = v.maybeWriteSuperBlock()
  84. }
  85. if e == nil && alsoLoadIndex {
  86. var indexFile *os.File
  87. if v.readOnly {
  88. glog.V(1).Infoln("open to read file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  90. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  91. }
  92. } else {
  93. glog.V(1).Infoln("open to write file", fileName+".idx")
  94. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  95. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  96. }
  97. }
  98. switch needleMapKind {
  99. case NeedleMapInMemory:
  100. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  101. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  102. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  103. }
  104. case NeedleMapLevelDb:
  105. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  106. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  107. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  108. }
  109. case NeedleMapBoltDb:
  110. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  111. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  112. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  113. }
  114. }
  115. }
  116. return e
  117. }
  118. func (v *Volume) Version() Version {
  119. return v.SuperBlock.Version()
  120. }
  121. func (v *Volume) Size() int64 {
  122. stat, e := v.dataFile.Stat()
  123. if e == nil {
  124. return stat.Size()
  125. }
  126. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  127. return -1
  128. }
  129. // Close cleanly shuts down this volume
  130. func (v *Volume) Close() {
  131. v.dataFileAccessLock.Lock()
  132. defer v.dataFileAccessLock.Unlock()
  133. v.nm.Close()
  134. _ = v.dataFile.Close()
  135. }
  136. func (v *Volume) NeedToReplicate() bool {
  137. return v.ReplicaPlacement.GetCopyCount() > 1
  138. }
  139. // isFileUnchanged checks whether this needle to write is same as last one.
  140. // It requires serialized access in the same volume.
  141. func (v *Volume) isFileUnchanged(n *Needle) bool {
  142. if v.Ttl.String() != "" {
  143. return false
  144. }
  145. nv, ok := v.nm.Get(n.Id)
  146. if ok && nv.Offset > 0 {
  147. oldNeedle := new(Needle)
  148. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  149. if err != nil {
  150. glog.V(0).Infof("Failed to check updated file %v", err)
  151. return false
  152. }
  153. defer oldNeedle.ReleaseMemory()
  154. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  155. n.DataSize = oldNeedle.DataSize
  156. return true
  157. }
  158. }
  159. return false
  160. }
  161. // Destroy removes everything related to this volume
  162. func (v *Volume) Destroy() (err error) {
  163. if v.readOnly {
  164. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  165. return
  166. }
  167. v.Close()
  168. err = os.Remove(v.dataFile.Name())
  169. if err != nil {
  170. return
  171. }
  172. err = v.nm.Destroy()
  173. return
  174. }
  175. // AppendBlob append a blob to end of the data file, used in replication
  176. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  177. if v.readOnly {
  178. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  179. return
  180. }
  181. v.dataFileAccessLock.Lock()
  182. defer v.dataFileAccessLock.Unlock()
  183. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  184. glog.V(0).Infof("failed to seek the end of file: %v", err)
  185. return
  186. }
  187. //ensure file writing starting from aligned positions
  188. if offset%NeedlePaddingSize != 0 {
  189. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  190. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  191. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  192. return
  193. }
  194. }
  195. v.dataFile.Write(b)
  196. return
  197. }
  198. func (v *Volume) write(n *Needle) (size uint32, err error) {
  199. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  200. if v.readOnly {
  201. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  202. return
  203. }
  204. v.dataFileAccessLock.Lock()
  205. defer v.dataFileAccessLock.Unlock()
  206. if v.isFileUnchanged(n) {
  207. size = n.DataSize
  208. glog.V(4).Infof("needle is unchanged!")
  209. return
  210. }
  211. var offset int64
  212. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  213. glog.V(0).Infof("failed to seek the end of file: %v", err)
  214. return
  215. }
  216. //ensure file writing starting from aligned positions
  217. if offset%NeedlePaddingSize != 0 {
  218. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  219. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  220. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  221. return
  222. }
  223. }
  224. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  225. if e := v.dataFile.Truncate(offset); e != nil {
  226. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  227. }
  228. return
  229. }
  230. nv, ok := v.nm.Get(n.Id)
  231. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  232. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  233. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  234. }
  235. }
  236. if v.lastModifiedTime < n.LastModified {
  237. v.lastModifiedTime = n.LastModified
  238. }
  239. return
  240. }
  241. func (v *Volume) delete(n *Needle) (uint32, error) {
  242. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  243. if v.readOnly {
  244. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  245. }
  246. v.dataFileAccessLock.Lock()
  247. defer v.dataFileAccessLock.Unlock()
  248. nv, ok := v.nm.Get(n.Id)
  249. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  250. if ok {
  251. size := nv.Size
  252. if err := v.nm.Delete(n.Id); err != nil {
  253. return size, err
  254. }
  255. if _, err := v.dataFile.Seek(0, 2); err != nil {
  256. return size, err
  257. }
  258. n.Data = nil
  259. _, err := n.Append(v.dataFile, v.Version())
  260. return size, err
  261. }
  262. return 0, nil
  263. }
  264. // read fills in Needle content by looking up n.Id from NeedleMapper
  265. func (v *Volume) readNeedle(n *Needle) (int, error) {
  266. nv, ok := v.nm.Get(n.Id)
  267. if !ok || nv.Offset == 0 {
  268. return -1, errors.New("Not Found")
  269. }
  270. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  271. if err != nil {
  272. return 0, err
  273. }
  274. bytesRead := len(n.Data)
  275. if !n.HasTtl() {
  276. return bytesRead, nil
  277. }
  278. ttlMinutes := n.Ttl.Minutes()
  279. if ttlMinutes == 0 {
  280. return bytesRead, nil
  281. }
  282. if !n.HasLastModifiedDate() {
  283. return bytesRead, nil
  284. }
  285. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  286. return bytesRead, nil
  287. }
  288. n.ReleaseMemory()
  289. return -1, errors.New("Not Found")
  290. }
  291. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  292. needleMapKind NeedleMapType,
  293. visitSuperBlock func(SuperBlock) error,
  294. readNeedleBody bool,
  295. visitNeedle func(n *Needle, offset int64) error) (err error) {
  296. var v *Volume
  297. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  298. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  299. }
  300. if err = visitSuperBlock(v.SuperBlock); err != nil {
  301. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  302. }
  303. version := v.Version()
  304. offset := int64(SuperBlockSize)
  305. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  306. if e != nil {
  307. err = fmt.Errorf("cannot read needle header: %v", e)
  308. return
  309. }
  310. for n != nil {
  311. if readNeedleBody {
  312. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  313. glog.V(0).Infof("cannot read needle body: %v", err)
  314. //err = fmt.Errorf("cannot read needle body: %v", err)
  315. //return
  316. }
  317. if n.DataSize >= n.Size {
  318. // this should come from a bug reported on #87 and #93
  319. // fixed in v0.69
  320. // remove this whole "if" clause later, long after 0.69
  321. oldRest, oldSize := rest, n.Size
  322. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  323. n.Size = 0
  324. rest = n.Size + NeedleChecksumSize + padding
  325. if rest%NeedlePaddingSize != 0 {
  326. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  327. }
  328. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  329. }
  330. }
  331. if err = visitNeedle(n, offset); err != nil {
  332. glog.V(0).Infof("visit needle error: %v", err)
  333. }
  334. offset += int64(NeedleHeaderSize) + int64(rest)
  335. glog.V(4).Infof("==> new entry offset %d", offset)
  336. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  337. if err == io.EOF {
  338. return nil
  339. }
  340. return fmt.Errorf("cannot read needle header: %v", err)
  341. }
  342. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  343. }
  344. return
  345. }
  346. func (v *Volume) ContentSize() uint64 {
  347. return v.nm.ContentSize()
  348. }
  349. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  350. exists = true
  351. fi, err := os.Stat(filename)
  352. if os.IsNotExist(err) {
  353. exists = false
  354. return
  355. }
  356. if fi.Mode()&0400 != 0 {
  357. canRead = true
  358. }
  359. if fi.Mode()&0200 != 0 {
  360. canWrite = true
  361. }
  362. modTime = fi.ModTime()
  363. return
  364. }
  365. // volume is expired if modified time + volume ttl < now
  366. // except when volume is empty
  367. // or when the volume does not have a ttl
  368. // or when volumeSizeLimit is 0 when server just starts
  369. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  370. if volumeSizeLimit == 0 {
  371. //skip if we don't know size limit
  372. return false
  373. }
  374. if v.ContentSize() == 0 {
  375. return false
  376. }
  377. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  378. return false
  379. }
  380. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  381. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  382. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  383. if int64(v.Ttl.Minutes()) < livedMinutes {
  384. return true
  385. }
  386. return false
  387. }
  388. // wait either maxDelayMinutes or 10% of ttl minutes
  389. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  390. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  391. return false
  392. }
  393. removalDelay := v.Ttl.Minutes() / 10
  394. if removalDelay > maxDelayMinutes {
  395. removalDelay = maxDelayMinutes
  396. }
  397. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  398. return true
  399. }
  400. return false
  401. }