You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

408 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/weed-fs/go/glog"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. readOnly bool
  20. SuperBlock
  21. accessLock sync.Mutex
  22. lastModifiedTime uint64 //unix time in seconds
  23. }
  24. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  25. v = &Volume{dir: dirname, Collection: collection, Id: id}
  26. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  27. e = v.load(true, true)
  28. return
  29. }
  30. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  31. v = &Volume{dir: dirname, Collection: collection, Id: id}
  32. v.SuperBlock = SuperBlock{}
  33. e = v.load(false, false)
  34. return
  35. }
  36. func (v *Volume) FileName() (fileName string) {
  37. if v.Collection == "" {
  38. fileName = path.Join(v.dir, v.Id.String())
  39. } else {
  40. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  41. }
  42. return
  43. }
  44. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  45. var e error
  46. fileName := v.FileName()
  47. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  48. if !canRead {
  49. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  50. }
  51. if canWrite {
  52. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  53. v.lastModifiedTime = uint64(modifiedTime.Unix())
  54. } else {
  55. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  56. v.dataFile, e = os.Open(fileName + ".dat")
  57. v.readOnly = true
  58. }
  59. } else {
  60. if createDatIfMissing {
  61. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  62. } else {
  63. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  64. }
  65. }
  66. if e != nil {
  67. if !os.IsPermission(e) {
  68. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  69. }
  70. }
  71. if v.ReplicaPlacement == nil {
  72. e = v.readSuperBlock()
  73. } else {
  74. e = v.maybeWriteSuperBlock()
  75. }
  76. if e == nil && alsoLoadIndex {
  77. if v.readOnly {
  78. if v.ensureConvertIdxToCdb(fileName) {
  79. v.nm, e = OpenCdbMap(fileName + ".cdb")
  80. return e
  81. }
  82. }
  83. var indexFile *os.File
  84. if v.readOnly {
  85. glog.V(1).Infoln("open to read file", fileName+".idx")
  86. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  87. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  88. }
  89. } else {
  90. glog.V(1).Infoln("open to write file", fileName+".idx")
  91. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  92. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  93. }
  94. }
  95. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  96. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  97. glog.V(0).Infoln("loading error:", e)
  98. }
  99. }
  100. return e
  101. }
  102. func (v *Volume) Version() Version {
  103. return v.SuperBlock.Version()
  104. }
  105. func (v *Volume) Size() int64 {
  106. stat, e := v.dataFile.Stat()
  107. if e == nil {
  108. return stat.Size()
  109. }
  110. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  111. return -1
  112. }
  113. func (v *Volume) Close() {
  114. v.accessLock.Lock()
  115. defer v.accessLock.Unlock()
  116. v.nm.Close()
  117. _ = v.dataFile.Close()
  118. }
  119. func (v *Volume) NeedToReplicate() bool {
  120. return v.ReplicaPlacement.GetCopyCount() > 1
  121. }
  122. func (v *Volume) isFileUnchanged(n *Needle) bool {
  123. nv, ok := v.nm.Get(n.Id)
  124. if ok && nv.Offset > 0 {
  125. oldNeedle := new(Needle)
  126. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  127. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  128. n.Size = oldNeedle.Size
  129. return true
  130. }
  131. }
  132. return false
  133. }
  134. func (v *Volume) Destroy() (err error) {
  135. if v.readOnly {
  136. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  137. return
  138. }
  139. v.Close()
  140. err = os.Remove(v.dataFile.Name())
  141. if err != nil {
  142. return
  143. }
  144. err = v.nm.Destroy()
  145. return
  146. }
  147. func (v *Volume) write(n *Needle) (size uint32, err error) {
  148. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  149. if v.readOnly {
  150. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  151. return
  152. }
  153. v.accessLock.Lock()
  154. defer v.accessLock.Unlock()
  155. if v.isFileUnchanged(n) {
  156. size = n.Size
  157. glog.V(4).Infof("needle is unchanged!")
  158. return
  159. }
  160. var offset int64
  161. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  162. glog.V(0).Infof("faile to seek the end of file: %v", err)
  163. return
  164. }
  165. //ensure file writing starting from aligned positions
  166. if offset%NeedlePaddingSize != 0 {
  167. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  168. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  169. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  170. return
  171. }
  172. }
  173. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  174. if e := v.dataFile.Truncate(offset); e != nil {
  175. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  176. }
  177. return
  178. }
  179. nv, ok := v.nm.Get(n.Id)
  180. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  181. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  182. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  183. }
  184. }
  185. if v.lastModifiedTime < n.LastModified {
  186. v.lastModifiedTime = n.LastModified
  187. }
  188. return
  189. }
  190. func (v *Volume) delete(n *Needle) (uint32, error) {
  191. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  192. if v.readOnly {
  193. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  194. }
  195. v.accessLock.Lock()
  196. defer v.accessLock.Unlock()
  197. nv, ok := v.nm.Get(n.Id)
  198. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  199. if ok {
  200. size := nv.Size
  201. if err := v.nm.Delete(n.Id); err != nil {
  202. return size, err
  203. }
  204. if _, err := v.dataFile.Seek(0, 2); err != nil {
  205. return size, err
  206. }
  207. n.Data = make([]byte, 0)
  208. _, err := n.Append(v.dataFile, v.Version())
  209. return size, err
  210. }
  211. return 0, nil
  212. }
  213. func (v *Volume) read(n *Needle) (int, error) {
  214. nv, ok := v.nm.Get(n.Id)
  215. if !ok || nv.Offset == 0 {
  216. return -1, errors.New("Not Found")
  217. }
  218. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  219. if err != nil {
  220. return bytesRead, err
  221. }
  222. if !n.HasTtl() {
  223. return bytesRead, err
  224. }
  225. ttlMinutes := n.Ttl.Minutes()
  226. if ttlMinutes == 0 {
  227. return bytesRead, nil
  228. }
  229. if !n.HasLastModifiedDate() {
  230. return bytesRead, nil
  231. }
  232. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  233. return bytesRead, nil
  234. }
  235. return -1, errors.New("Not Found")
  236. }
  237. func (v *Volume) freeze() error {
  238. if v.readOnly {
  239. return nil
  240. }
  241. nm, ok := v.nm.(*NeedleMap)
  242. if !ok {
  243. return nil
  244. }
  245. v.accessLock.Lock()
  246. defer v.accessLock.Unlock()
  247. bn, _ := baseFilename(v.dataFile.Name())
  248. cdbFn := bn + ".cdb"
  249. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  250. err := DumpNeedleMapToCdb(cdbFn, nm)
  251. if err != nil {
  252. return err
  253. }
  254. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  255. return err
  256. }
  257. nm.indexFile.Close()
  258. os.Remove(nm.indexFile.Name())
  259. v.readOnly = true
  260. return nil
  261. }
  262. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  263. visitSuperBlock func(SuperBlock) error,
  264. readNeedleBody bool,
  265. visitNeedle func(n *Needle, offset int64) error) (err error) {
  266. var v *Volume
  267. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  268. return errors.New("Failed to load volume:" + err.Error())
  269. }
  270. if err = visitSuperBlock(v.SuperBlock); err != nil {
  271. return errors.New("Failed to read super block:" + err.Error())
  272. }
  273. version := v.Version()
  274. offset := int64(SuperBlockSize)
  275. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  276. if e != nil {
  277. err = fmt.Errorf("cannot read needle header: %v", e)
  278. return
  279. }
  280. for n != nil {
  281. if readNeedleBody {
  282. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  283. err = fmt.Errorf("cannot read needle body: %v", err)
  284. return
  285. }
  286. }
  287. if err = visitNeedle(n, offset); err != nil {
  288. return
  289. }
  290. offset += int64(NeedleHeaderSize) + int64(rest)
  291. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  292. if err == io.EOF {
  293. return nil
  294. }
  295. return fmt.Errorf("cannot read needle header: %v", err)
  296. }
  297. }
  298. return
  299. }
  300. func (v *Volume) ContentSize() uint64 {
  301. return v.nm.ContentSize()
  302. }
  // checkFile stats filename and reports whether it exists, whether its owner
  // permission bits allow reading (0400) and writing (0200), and its
  // modification time.
  //
  // A missing file yields exists == false. Any other stat failure (for example
  // an invalid path or an inaccessible parent directory) yields exists == true
  // with canRead and canWrite false and a zero modTime: the file may be there
  // but cannot be used. Previously such a failure dereferenced a nil FileInfo
  // and panicked.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if err != nil {
  		exists = !os.IsNotExist(err)
  		return
  	}
  	exists = true
  	mode := fi.Mode()
  	canRead = mode&0400 != 0
  	canWrite = mode&0200 != 0
  	modTime = fi.ModTime()
  	return
  }
  319. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  320. var indexFile *os.File
  321. var e error
  322. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  323. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  324. if cdbCanRead && cdbModTime.After(idxModeTime) {
  325. return true
  326. }
  327. if !cdbCanWrite {
  328. return false
  329. }
  330. if !idxCanRead {
  331. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  332. return false
  333. }
  334. glog.V(2).Infoln("opening file", fileName+".idx")
  335. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  336. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  337. return false
  338. }
  339. defer indexFile.Close()
  340. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  341. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  342. glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
  343. return false
  344. }
  345. return true
  346. }
  347. // volume is expired if modified time + volume ttl < now
  348. // except when volume is empty
  349. // or when the volume does not have a ttl
  350. // or when volumeSizeLimit is 0 when server just starts
  351. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  352. if volumeSizeLimit == 0 {
  353. //skip if we don't know size limit
  354. return false
  355. }
  356. if v.ContentSize() == 0 {
  357. return false
  358. }
  359. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  360. return false
  361. }
  362. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  363. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  364. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  365. if int64(v.Ttl.Minutes()) < livedMinutes {
  366. return true
  367. }
  368. return false
  369. }
  370. // wait either maxDelayMinutes or 10% of ttl minutes
  371. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  372. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  373. return false
  374. }
  375. removalDelay := v.Ttl.Minutes() / 10
  376. if removalDelay > maxDelayMinutes {
  377. removalDelay = maxDelayMinutes
  378. }
  379. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  380. return true
  381. }
  382. return false
  383. }