You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

392 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. needleMapKind NeedleMapType
  20. readOnly bool
  21. SuperBlock
  22. accessLock sync.Mutex
  23. lastModifiedTime uint64 //unix time in seconds
  24. }
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  36. v = &Volume{dir: dirname, Collection: collection, Id: id}
  37. v.SuperBlock = SuperBlock{}
  38. v.needleMapKind = needleMapKind
  39. e = v.load(false, false, needleMapKind)
  40. return
  41. }
  42. func (v *Volume) FileName() (fileName string) {
  43. if v.Collection == "" {
  44. fileName = path.Join(v.dir, v.Id.String())
  45. } else {
  46. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  47. }
  48. return
  49. }
  50. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
  51. var e error
  52. fileName := v.FileName()
  53. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  54. if !canRead {
  55. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  56. }
  57. if canWrite {
  58. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  59. v.lastModifiedTime = uint64(modifiedTime.Unix())
  60. } else {
  61. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  62. v.dataFile, e = os.Open(fileName + ".dat")
  63. v.readOnly = true
  64. }
  65. } else {
  66. if createDatIfMissing {
  67. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  68. } else {
  69. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  70. }
  71. }
  72. if e != nil {
  73. if !os.IsPermission(e) {
  74. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  75. }
  76. }
  77. if v.ReplicaPlacement == nil {
  78. e = v.readSuperBlock()
  79. } else {
  80. e = v.maybeWriteSuperBlock()
  81. }
  82. if e == nil && alsoLoadIndex {
  83. var indexFile *os.File
  84. if v.readOnly {
  85. glog.V(1).Infoln("open to read file", fileName+".idx")
  86. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  87. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  88. }
  89. } else {
  90. glog.V(1).Infoln("open to write file", fileName+".idx")
  91. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  92. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  93. }
  94. }
  95. switch needleMapKind {
  96. case NeedleMapInMemory:
  97. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  98. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  99. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  100. }
  101. case NeedleMapLevelDb:
  102. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  103. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  104. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  105. }
  106. case NeedleMapBoltDb:
  107. glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
  108. if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
  109. glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
  110. }
  111. }
  112. }
  113. return e
  114. }
  115. func (v *Volume) Version() Version {
  116. return v.SuperBlock.Version()
  117. }
  118. func (v *Volume) Size() int64 {
  119. stat, e := v.dataFile.Stat()
  120. if e == nil {
  121. return stat.Size()
  122. }
  123. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  124. return -1
  125. }
  126. // Close cleanly shuts down this volume
  127. func (v *Volume) Close() {
  128. v.accessLock.Lock()
  129. defer v.accessLock.Unlock()
  130. v.nm.Close()
  131. _ = v.dataFile.Close()
  132. }
  133. func (v *Volume) NeedToReplicate() bool {
  134. return v.ReplicaPlacement.GetCopyCount() > 1
  135. }
  136. // isFileUnchanged checks whether this needle to write is same as last one.
  137. // It requires serialized access in the same volume.
  138. func (v *Volume) isFileUnchanged(n *Needle) bool {
  139. nv, ok := v.nm.Get(n.Id)
  140. if ok && nv.Offset > 0 {
  141. oldNeedle := new(Needle)
  142. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  143. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  144. n.DataSize = oldNeedle.DataSize
  145. return true
  146. }
  147. }
  148. return false
  149. }
  150. // Destroy removes everything related to this volume
  151. func (v *Volume) Destroy() (err error) {
  152. if v.readOnly {
  153. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  154. return
  155. }
  156. v.Close()
  157. err = os.Remove(v.dataFile.Name())
  158. if err != nil {
  159. return
  160. }
  161. err = v.nm.Destroy()
  162. return
  163. }
  164. func (v *Volume) write(n *Needle) (size uint32, err error) {
  165. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  166. if v.readOnly {
  167. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  168. return
  169. }
  170. v.accessLock.Lock()
  171. defer v.accessLock.Unlock()
  172. if v.isFileUnchanged(n) {
  173. size = n.DataSize
  174. glog.V(4).Infof("needle is unchanged!")
  175. return
  176. }
  177. var offset int64
  178. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  179. glog.V(0).Infof("failed to seek the end of file: %v", err)
  180. return
  181. }
  182. //ensure file writing starting from aligned positions
  183. if offset%NeedlePaddingSize != 0 {
  184. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  185. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  186. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  187. return
  188. }
  189. }
  190. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  191. if e := v.dataFile.Truncate(offset); e != nil {
  192. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  193. }
  194. return
  195. }
  196. nv, ok := v.nm.Get(n.Id)
  197. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  198. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  199. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  200. }
  201. }
  202. if v.lastModifiedTime < n.LastModified {
  203. v.lastModifiedTime = n.LastModified
  204. }
  205. return
  206. }
  207. func (v *Volume) delete(n *Needle) (uint32, error) {
  208. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  209. if v.readOnly {
  210. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  211. }
  212. v.accessLock.Lock()
  213. defer v.accessLock.Unlock()
  214. nv, ok := v.nm.Get(n.Id)
  215. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  216. if ok {
  217. size := nv.Size
  218. if err := v.nm.Delete(n.Id); err != nil {
  219. return size, err
  220. }
  221. if _, err := v.dataFile.Seek(0, 2); err != nil {
  222. return size, err
  223. }
  224. n.Data = nil
  225. _, err := n.Append(v.dataFile, v.Version())
  226. return size, err
  227. }
  228. return 0, nil
  229. }
  230. func (v *Volume) read(n *Needle) (int, error) {
  231. nv, ok := v.nm.Get(n.Id)
  232. if !ok || nv.Offset == 0 {
  233. return -1, errors.New("Not Found")
  234. }
  235. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  236. if err != nil {
  237. return bytesRead, err
  238. }
  239. if !n.HasTtl() {
  240. return bytesRead, err
  241. }
  242. ttlMinutes := n.Ttl.Minutes()
  243. if ttlMinutes == 0 {
  244. return bytesRead, nil
  245. }
  246. if !n.HasLastModifiedDate() {
  247. return bytesRead, nil
  248. }
  249. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  250. return bytesRead, nil
  251. }
  252. return -1, errors.New("Not Found")
  253. }
  254. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  255. needleMapKind NeedleMapType,
  256. visitSuperBlock func(SuperBlock) error,
  257. readNeedleBody bool,
  258. visitNeedle func(n *Needle, offset int64) error) (err error) {
  259. var v *Volume
  260. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  261. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  262. }
  263. if err = visitSuperBlock(v.SuperBlock); err != nil {
  264. return fmt.Errorf("Failed to read volume %d super block: %v", id, err)
  265. }
  266. version := v.Version()
  267. offset := int64(SuperBlockSize)
  268. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  269. if e != nil {
  270. err = fmt.Errorf("cannot read needle header: %v", e)
  271. return
  272. }
  273. for n != nil {
  274. if readNeedleBody {
  275. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  276. glog.V(0).Infof("cannot read needle body: %v", err)
  277. //err = fmt.Errorf("cannot read needle body: %v", err)
  278. //return
  279. }
  280. if n.DataSize >= n.Size {
  281. // this should come from a bug reported on #87 and #93
  282. // fixed in v0.69
  283. // remove this whole "if" clause later, long after 0.69
  284. oldRest, oldSize := rest, n.Size
  285. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  286. n.Size = 0
  287. rest = n.Size + NeedleChecksumSize + padding
  288. if rest%NeedlePaddingSize != 0 {
  289. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  290. }
  291. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  292. }
  293. }
  294. if err = visitNeedle(n, offset); err != nil {
  295. glog.V(0).Infof("visit needle error: %v", err)
  296. }
  297. offset += int64(NeedleHeaderSize) + int64(rest)
  298. glog.V(4).Infof("==> new entry offset %d", offset)
  299. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  300. if err == io.EOF {
  301. return nil
  302. }
  303. return fmt.Errorf("cannot read needle header: %v", err)
  304. }
  305. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  306. }
  307. return
  308. }
  309. func (v *Volume) ContentSize() uint64 {
  310. return v.nm.ContentSize()
  311. }
  // checkFile stats filename and reports whether it exists, whether its
  // owner-read (0400) and owner-write (0200) permission bits are set, and its
  // modification time.
  //
  // A missing file yields exists == false. Any other stat failure (for
  // example a permission error on a parent directory) is reported as an
  // existing file that is neither readable nor writable, with a zero modTime,
  // so callers refuse it instead of trying to create it.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if err != nil {
  		// fi is nil on every error, so it must not be dereferenced here.
  		return !os.IsNotExist(err), false, false, time.Time{}
  	}
  	mode := fi.Mode()
  	return true, mode&0400 != 0, mode&0200 != 0, fi.ModTime()
  }
  328. // volume is expired if modified time + volume ttl < now
  329. // except when volume is empty
  330. // or when the volume does not have a ttl
  331. // or when volumeSizeLimit is 0 when server just starts
  332. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  333. if volumeSizeLimit == 0 {
  334. //skip if we don't know size limit
  335. return false
  336. }
  337. if v.ContentSize() == 0 {
  338. return false
  339. }
  340. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  341. return false
  342. }
  343. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  344. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  345. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  346. if int64(v.Ttl.Minutes()) < livedMinutes {
  347. return true
  348. }
  349. return false
  350. }
  351. // wait either maxDelayMinutes or 10% of ttl minutes
  352. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  353. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  354. return false
  355. }
  356. removalDelay := v.Ttl.Minutes() / 10
  357. if removalDelay > maxDelayMinutes {
  358. removalDelay = maxDelayMinutes
  359. }
  360. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  361. return true
  362. }
  363. return false
  364. }