You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

396 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. needleMapKind NeedleMapType
  20. readOnly bool
  21. SuperBlock
  22. accessLock sync.Mutex
  23. lastModifiedTime uint64 //unix time in seconds
  24. }
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Collection: collection, Id: id}
  37. v.SuperBlock = SuperBlock{}
  38. v.needleMapKind = needleMapKind
  39. e = v.load(false, false, needleMapKind)
  40. return
  41. }
  42. func (v *Volume) FileName() (fileName string) {
  43. if v.Collection == "" {
  44. fileName = path.Join(v.dir, v.Id.String())
  45. } else {
  46. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  47. }
  48. return
  49. }
  50. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  51. var e error
  52. fileName := v.FileName()
  53. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  54. if !canRead {
  55. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  56. }
  57. if canWrite {
  58. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  59. v.lastModifiedTime = uint64(modifiedTime.Unix())
  60. } else {
  61. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  62. v.dataFile, e = os.Open(fileName + ".dat")
  63. v.readOnly = true
  64. }
  65. } else {
  66. if createDatIfMissing {
  67. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  68. } else {
  69. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  70. }
  71. }
  72. if e != nil {
  73. if !os.IsPermission(e) {
  74. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  75. }
  76. }
  77. if v.ReplicaPlacement == nil {
  78. e = v.readSuperBlock()
  79. } else {
  80. e = v.maybeWriteSuperBlock()
  81. }
  82. if e == nil && alsoLoadIndex {
  83. var indexFile *os.File
  84. if v.readOnly {
  85. glog.V(1).Infoln("open to read file", fileName+".idx")
  86. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  87. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  88. }
  89. } else {
  90. glog.V(1).Infoln("open to write file", fileName+".idx")
  91. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  92. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  93. }
  94. }
  95. switch needleMapKind {
  96. case NeedleMapInMemory:
  97. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  98. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  99. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  100. }
  101. case NeedleMapLevelDb:
  102. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  103. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  104. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  105. }
  106. case NeedleMapBoltDb:
  107. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  108. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  109. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  110. }
  111. }
  112. }
  113. return e
  114. }
  115. func (v *Volume) Version() Version {
  116. return v.SuperBlock.Version()
  117. }
  118. func (v *Volume) Size() int64 {
  119. stat, e := v.dataFile.Stat()
  120. if e == nil {
  121. return stat.Size()
  122. }
  123. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  124. return -1
  125. }
  126. // Close cleanly shuts down this volume
  127. func (v *Volume) Close() {
  128. v.accessLock.Lock()
  129. defer v.accessLock.Unlock()
  130. v.nm.Close()
  131. _ = v.dataFile.Close()
  132. }
  133. func (v *Volume) NeedToReplicate() bool {
  134. return v.ReplicaPlacement.GetCopyCount() > 1
  135. }
  136. // isFileUnchanged checks whether this needle to write is same as last one.
  137. // It requires serialized access in the same volume.
  138. func (v *Volume) isFileUnchanged(n *Needle) bool {
  139. nv, ok := v.nm.Get(n.Id)
  140. if ok && nv.Offset > 0 {
  141. oldNeedle := new(Needle)
  142. _, err := oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  143. if err != nil {
  144. glog.V(0).Infof("Failed to check updated file %v", err)
  145. return false
  146. }
  147. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  148. n.DataSize = oldNeedle.DataSize
  149. return true
  150. }
  151. }
  152. return false
  153. }
  154. // Destroy removes everything related to this volume
  155. func (v *Volume) Destroy() (err error) {
  156. if v.readOnly {
  157. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  158. return
  159. }
  160. v.Close()
  161. err = os.Remove(v.dataFile.Name())
  162. if err != nil {
  163. return
  164. }
  165. err = v.nm.Destroy()
  166. return
  167. }
  168. func (v *Volume) write(n *Needle) (size uint32, err error) {
  169. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  170. if v.readOnly {
  171. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  172. return
  173. }
  174. v.accessLock.Lock()
  175. defer v.accessLock.Unlock()
  176. if v.isFileUnchanged(n) {
  177. size = n.DataSize
  178. glog.V(4).Infof("needle is unchanged!")
  179. return
  180. }
  181. var offset int64
  182. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  183. glog.V(0).Infof("failed to seek the end of file: %v", err)
  184. return
  185. }
  186. //ensure file writing starting from aligned positions
  187. if offset%NeedlePaddingSize != 0 {
  188. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  189. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  190. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  191. return
  192. }
  193. }
  194. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  195. if e := v.dataFile.Truncate(offset); e != nil {
  196. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  197. }
  198. return
  199. }
  200. nv, ok := v.nm.Get(n.Id)
  201. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  202. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  203. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  204. }
  205. }
  206. if v.lastModifiedTime < n.LastModified {
  207. v.lastModifiedTime = n.LastModified
  208. }
  209. return
  210. }
  211. func (v *Volume) delete(n *Needle) (uint32, error) {
  212. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  213. if v.readOnly {
  214. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  215. }
  216. v.accessLock.Lock()
  217. defer v.accessLock.Unlock()
  218. nv, ok := v.nm.Get(n.Id)
  219. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  220. if ok {
  221. size := nv.Size
  222. if err := v.nm.Delete(n.Id); err != nil {
  223. return size, err
  224. }
  225. if _, err := v.dataFile.Seek(0, 2); err != nil {
  226. return size, err
  227. }
  228. n.Data = nil
  229. _, err := n.Append(v.dataFile, v.Version())
  230. return size, err
  231. }
  232. return 0, nil
  233. }
  234. func (v *Volume) read(n *Needle) (int, error) {
  235. nv, ok := v.nm.Get(n.Id)
  236. if !ok || nv.Offset == 0 {
  237. return -1, errors.New("Not Found")
  238. }
  239. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  240. if err != nil {
  241. return bytesRead, err
  242. }
  243. if !n.HasTtl() {
  244. return bytesRead, err
  245. }
  246. ttlMinutes := n.Ttl.Minutes()
  247. if ttlMinutes == 0 {
  248. return bytesRead, nil
  249. }
  250. if !n.HasLastModifiedDate() {
  251. return bytesRead, nil
  252. }
  253. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  254. return bytesRead, nil
  255. }
  256. return -1, errors.New("Not Found")
  257. }
  258. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  259. needleMapKind NeedleMapType,
  260. visitSuperBlock func(SuperBlock) error,
  261. readNeedleBody bool,
  262. visitNeedle func(n *Needle, offset int64) error) (err error) {
  263. var v *Volume
  264. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  265. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  266. }
  267. if err = visitSuperBlock(v.SuperBlock); err != nil {
  268. return fmt.Errorf("Failed to read volume %d super block: %v", id, err)
  269. }
  270. version := v.Version()
  271. offset := int64(SuperBlockSize)
  272. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  273. if e != nil {
  274. err = fmt.Errorf("cannot read needle header: %v", e)
  275. return
  276. }
  277. for n != nil {
  278. if readNeedleBody {
  279. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  280. glog.V(0).Infof("cannot read needle body: %v", err)
  281. //err = fmt.Errorf("cannot read needle body: %v", err)
  282. //return
  283. }
  284. if n.DataSize >= n.Size {
  285. // this should come from a bug reported on #87 and #93
  286. // fixed in v0.69
  287. // remove this whole "if" clause later, long after 0.69
  288. oldRest, oldSize := rest, n.Size
  289. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  290. n.Size = 0
  291. rest = n.Size + NeedleChecksumSize + padding
  292. if rest%NeedlePaddingSize != 0 {
  293. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  294. }
  295. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  296. }
  297. }
  298. if err = visitNeedle(n, offset); err != nil {
  299. glog.V(0).Infof("visit needle error: %v", err)
  300. }
  301. offset += int64(NeedleHeaderSize) + int64(rest)
  302. glog.V(4).Infof("==> new entry offset %d", offset)
  303. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  304. if err == io.EOF {
  305. return nil
  306. }
  307. return fmt.Errorf("cannot read needle header: %v", err)
  308. }
  309. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  310. }
  311. return
  312. }
  313. func (v *Volume) ContentSize() uint64 {
  314. return v.nm.ContentSize()
  315. }
  316. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  317. exists = true
  318. fi, err := os.Stat(filename)
  319. if os.IsNotExist(err) {
  320. exists = false
  321. return
  322. }
  323. if fi.Mode()&0400 != 0 {
  324. canRead = true
  325. }
  326. if fi.Mode()&0200 != 0 {
  327. canWrite = true
  328. }
  329. modTime = fi.ModTime()
  330. return
  331. }
  332. // volume is expired if modified time + volume ttl < now
  333. // except when volume is empty
  334. // or when the volume does not have a ttl
  335. // or when volumeSizeLimit is 0 when server just starts
  336. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  337. if volumeSizeLimit == 0 {
  338. //skip if we don't know size limit
  339. return false
  340. }
  341. if v.ContentSize() == 0 {
  342. return false
  343. }
  344. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  345. return false
  346. }
  347. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  348. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  349. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  350. if int64(v.Ttl.Minutes()) < livedMinutes {
  351. return true
  352. }
  353. return false
  354. }
  355. // wait either maxDelayMinutes or 10% of ttl minutes
  356. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  357. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  358. return false
  359. }
  360. removalDelay := v.Ttl.Minutes() / 10
  361. if removalDelay > maxDelayMinutes {
  362. removalDelay = maxDelayMinutes
  363. }
  364. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  365. return true
  366. }
  367. return false
  368. }