You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

407 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "github.com/aszxqw/weed-fs/go/glog"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. readOnly bool
  20. SuperBlock
  21. accessLock sync.Mutex
  22. lastModifiedTime uint64 //unix time in seconds
  23. }
  24. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  25. v = &Volume{dir: dirname, Collection: collection, Id: id}
  26. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  27. e = v.load(true, true)
  28. return
  29. }
  30. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  31. v = &Volume{dir: dirname, Collection: collection, Id: id}
  32. v.SuperBlock = SuperBlock{}
  33. e = v.load(false, false)
  34. return
  35. }
  36. func (v *Volume) FileName() (fileName string) {
  37. if v.Collection == "" {
  38. fileName = path.Join(v.dir, v.Id.String())
  39. } else {
  40. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  41. }
  42. return
  43. }
  44. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  45. var e error
  46. fileName := v.FileName()
  47. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  48. if !canRead {
  49. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  50. }
  51. if canWrite {
  52. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  53. v.lastModifiedTime = uint64(modifiedTime.Unix())
  54. } else {
  55. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  56. v.dataFile, e = os.Open(fileName + ".dat")
  57. v.readOnly = true
  58. }
  59. } else {
  60. if createDatIfMissing {
  61. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  62. } else {
  63. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  64. }
  65. }
  66. if e != nil {
  67. if !os.IsPermission(e) {
  68. return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error())
  69. }
  70. }
  71. if v.ReplicaPlacement == nil {
  72. e = v.readSuperBlock()
  73. } else {
  74. e = v.maybeWriteSuperBlock()
  75. }
  76. if e == nil && alsoLoadIndex {
  77. if v.readOnly {
  78. if v.ensureConvertIdxToCdb(fileName) {
  79. v.nm, e = OpenCdbMap(fileName + ".cdb")
  80. return e
  81. }
  82. }
  83. var indexFile *os.File
  84. if v.readOnly {
  85. glog.V(1).Infoln("open to read file", fileName+".idx")
  86. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  87. return fmt.Errorf("cannot read Volume Index %s.idx: %s", fileName, e.Error())
  88. }
  89. } else {
  90. glog.V(1).Infoln("open to write file", fileName+".idx")
  91. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  92. return fmt.Errorf("cannot write Volume Index %s.idx: %s", fileName, e.Error())
  93. }
  94. }
  95. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  96. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  97. glog.V(0).Infoln("loading error:", e)
  98. }
  99. }
  100. return e
  101. }
  102. func (v *Volume) Version() Version {
  103. return v.SuperBlock.Version()
  104. }
  105. func (v *Volume) Size() int64 {
  106. stat, e := v.dataFile.Stat()
  107. if e == nil {
  108. return stat.Size()
  109. }
  110. glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
  111. return -1
  112. }
  113. func (v *Volume) Close() {
  114. v.accessLock.Lock()
  115. defer v.accessLock.Unlock()
  116. v.nm.Close()
  117. _ = v.dataFile.Close()
  118. }
  119. func (v *Volume) NeedToReplicate() bool {
  120. return v.ReplicaPlacement.GetCopyCount() > 1
  121. }
  122. func (v *Volume) isFileUnchanged(n *Needle) bool {
  123. nv, ok := v.nm.Get(n.Id)
  124. if ok && nv.Offset > 0 {
  125. oldNeedle := new(Needle)
  126. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  127. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  128. n.Size = oldNeedle.Size
  129. return true
  130. }
  131. }
  132. return false
  133. }
  134. func (v *Volume) Destroy() (err error) {
  135. if v.readOnly {
  136. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  137. return
  138. }
  139. v.Close()
  140. err = os.Remove(v.dataFile.Name())
  141. if err != nil {
  142. return
  143. }
  144. err = v.nm.Destroy()
  145. return
  146. }
  147. func (v *Volume) write(n *Needle) (size uint32, err error) {
  148. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  149. if v.readOnly {
  150. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  151. return
  152. }
  153. v.accessLock.Lock()
  154. defer v.accessLock.Unlock()
  155. if v.isFileUnchanged(n) {
  156. size = n.Size
  157. glog.V(4).Infof("needle is unchanged!")
  158. return
  159. }
  160. var offset int64
  161. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  162. return
  163. }
  164. //ensure file writing starting from aligned positions
  165. if offset%NeedlePaddingSize != 0 {
  166. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  167. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  168. glog.V(4).Infof("failed to align in datafile %s: %s", v.dataFile.Name(), err.Error())
  169. return
  170. }
  171. }
  172. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  173. if e := v.dataFile.Truncate(offset); e != nil {
  174. err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile.Name(), e.Error())
  175. }
  176. return
  177. }
  178. nv, ok := v.nm.Get(n.Id)
  179. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  180. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  181. glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
  182. }
  183. }
  184. if v.lastModifiedTime < n.LastModified {
  185. v.lastModifiedTime = n.LastModified
  186. }
  187. return
  188. }
  189. func (v *Volume) delete(n *Needle) (uint32, error) {
  190. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  191. if v.readOnly {
  192. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  193. }
  194. v.accessLock.Lock()
  195. defer v.accessLock.Unlock()
  196. nv, ok := v.nm.Get(n.Id)
  197. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  198. if ok {
  199. size := nv.Size
  200. if err := v.nm.Delete(n.Id); err != nil {
  201. return size, err
  202. }
  203. if _, err := v.dataFile.Seek(0, 2); err != nil {
  204. return size, err
  205. }
  206. n.Data = make([]byte, 0)
  207. _, err := n.Append(v.dataFile, v.Version())
  208. return size, err
  209. }
  210. return 0, nil
  211. }
  212. func (v *Volume) read(n *Needle) (int, error) {
  213. nv, ok := v.nm.Get(n.Id)
  214. if !ok || nv.Offset == 0 {
  215. return -1, errors.New("Not Found")
  216. }
  217. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  218. if err != nil {
  219. return bytesRead, err
  220. }
  221. if !n.HasTtl() {
  222. return bytesRead, err
  223. }
  224. ttlMinutes := n.Ttl.Minutes()
  225. if ttlMinutes == 0 {
  226. return bytesRead, nil
  227. }
  228. if !n.HasLastModifiedDate() {
  229. return bytesRead, nil
  230. }
  231. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  232. return bytesRead, nil
  233. }
  234. return -1, errors.New("Not Found")
  235. }
  236. func (v *Volume) freeze() error {
  237. if v.readOnly {
  238. return nil
  239. }
  240. nm, ok := v.nm.(*NeedleMap)
  241. if !ok {
  242. return nil
  243. }
  244. v.accessLock.Lock()
  245. defer v.accessLock.Unlock()
  246. bn, _ := baseFilename(v.dataFile.Name())
  247. cdbFn := bn + ".cdb"
  248. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  249. err := DumpNeedleMapToCdb(cdbFn, nm)
  250. if err != nil {
  251. return err
  252. }
  253. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  254. return err
  255. }
  256. nm.indexFile.Close()
  257. os.Remove(nm.indexFile.Name())
  258. v.readOnly = true
  259. return nil
  260. }
  261. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  262. visitSuperBlock func(SuperBlock) error,
  263. readNeedleBody bool,
  264. visitNeedle func(n *Needle, offset int64) error) (err error) {
  265. var v *Volume
  266. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  267. return errors.New("Failed to load volume:" + err.Error())
  268. }
  269. if err = visitSuperBlock(v.SuperBlock); err != nil {
  270. return errors.New("Failed to read super block:" + err.Error())
  271. }
  272. version := v.Version()
  273. offset := int64(SuperBlockSize)
  274. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  275. if e != nil {
  276. err = fmt.Errorf("cannot read needle header: %s", e)
  277. return
  278. }
  279. for n != nil {
  280. if readNeedleBody {
  281. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  282. err = fmt.Errorf("cannot read needle body: %s", err)
  283. return
  284. }
  285. }
  286. if err = visitNeedle(n, offset); err != nil {
  287. return
  288. }
  289. offset += int64(NeedleHeaderSize) + int64(rest)
  290. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  291. if err == io.EOF {
  292. return nil
  293. }
  294. return fmt.Errorf("cannot read needle header: %s", err)
  295. }
  296. }
  297. return
  298. }
  299. func (v *Volume) ContentSize() uint64 {
  300. return v.nm.ContentSize()
  301. }
  // checkFile stats filename and summarizes the result.
  //
  //   - exists is false only when the file is reported as not existing.
  //   - canRead / canWrite reflect the owner read (0400) and owner write (0200)
  //     permission bits of the file mode; no actual access check is performed.
  //   - modTime is the file's modification time.
  //
  // When the stat fails for any other reason (for example an invalid path or an
  // unreadable parent directory), exists stays true but canRead, canWrite and
  // modTime are left at their zero values, so callers treat the file as
  // present but unusable instead of dereferencing a nil FileInfo.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  	fi, err := os.Stat(filename)
  	if err != nil {
  		exists = !os.IsNotExist(err)
  		return
  	}
  	mode := fi.Mode()
  	return true, mode&0400 != 0, mode&0200 != 0, fi.ModTime()
  }
  318. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  319. var indexFile *os.File
  320. var e error
  321. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  322. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  323. if cdbCanRead && cdbModTime.After(idxModeTime) {
  324. return true
  325. }
  326. if !cdbCanWrite {
  327. return false
  328. }
  329. if !idxCanRead {
  330. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  331. return false
  332. }
  333. glog.V(2).Infoln("opening file", fileName+".idx")
  334. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  335. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  336. return false
  337. }
  338. defer indexFile.Close()
  339. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  340. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  341. glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error())
  342. return false
  343. }
  344. return true
  345. }
  346. // volume is expired if modified time + volume ttl < now
  347. // except when volume is empty
  348. // or when the volume does not have a ttl
  349. // or when volumeSizeLimit is 0 when server just starts
  350. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  351. if volumeSizeLimit == 0 {
  352. //skip if we don't know size limit
  353. return false
  354. }
  355. if v.ContentSize() == 0 {
  356. return false
  357. }
  358. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  359. return false
  360. }
  361. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  362. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  363. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  364. if int64(v.Ttl.Minutes()) < livedMinutes {
  365. return true
  366. }
  367. return false
  368. }
  369. // wait either maxDelayMinutes or 10% of ttl minutes
  370. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  371. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  372. return false
  373. }
  374. removalDelay := v.Ttl.Minutes() / 10
  375. if removalDelay > maxDelayMinutes {
  376. removalDelay = maxDelayMinutes
  377. }
  378. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  379. return true
  380. }
  381. return false
  382. }