You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

413 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. )
  13. type Volume struct {
  14. Id VolumeId
  15. dir string
  16. Collection string
  17. dataFile *os.File
  18. nm NeedleMapper
  19. readOnly bool
  20. SuperBlock
  21. accessLock sync.Mutex
  22. lastModifiedTime uint64 //unix time in seconds
  23. }
  24. func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  25. v = &Volume{dir: dirname, Collection: collection, Id: id}
  26. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  27. e = v.load(true, true)
  28. return
  29. }
  30. func (v *Volume) String() string {
  31. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  32. }
  33. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  34. v = &Volume{dir: dirname, Collection: collection, Id: id}
  35. v.SuperBlock = SuperBlock{}
  36. e = v.load(false, false)
  37. return
  38. }
  39. func (v *Volume) FileName() (fileName string) {
  40. if v.Collection == "" {
  41. fileName = path.Join(v.dir, v.Id.String())
  42. } else {
  43. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  44. }
  45. return
  46. }
  47. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
  48. var e error
  49. fileName := v.FileName()
  50. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  51. if !canRead {
  52. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  53. }
  54. if canWrite {
  55. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  56. v.lastModifiedTime = uint64(modifiedTime.Unix())
  57. } else {
  58. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  59. v.dataFile, e = os.Open(fileName + ".dat")
  60. v.readOnly = true
  61. }
  62. } else {
  63. if createDatIfMissing {
  64. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  65. } else {
  66. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  67. }
  68. }
  69. if e != nil {
  70. if !os.IsPermission(e) {
  71. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  72. }
  73. }
  74. if v.ReplicaPlacement == nil {
  75. e = v.readSuperBlock()
  76. } else {
  77. e = v.maybeWriteSuperBlock()
  78. }
  79. if e == nil && alsoLoadIndex {
  80. if v.readOnly {
  81. if v.ensureConvertIdxToCdb(fileName) {
  82. v.nm, e = OpenCdbMap(fileName + ".cdb")
  83. return e
  84. }
  85. }
  86. var indexFile *os.File
  87. if v.readOnly {
  88. glog.V(1).Infoln("open to read file", fileName+".idx")
  89. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  90. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  91. }
  92. } else {
  93. glog.V(1).Infoln("open to write file", fileName+".idx")
  94. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  95. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  96. }
  97. }
  98. glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
  99. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  100. glog.V(0).Infoln("loading error:", e)
  101. }
  102. }
  103. return e
  104. }
  105. func (v *Volume) Version() Version {
  106. return v.SuperBlock.Version()
  107. }
  108. func (v *Volume) Size() int64 {
  109. stat, e := v.dataFile.Stat()
  110. if e == nil {
  111. return stat.Size()
  112. }
  113. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  114. return -1
  115. }
  116. func (v *Volume) Close() {
  117. v.accessLock.Lock()
  118. defer v.accessLock.Unlock()
  119. v.nm.Close()
  120. _ = v.dataFile.Close()
  121. }
  122. func (v *Volume) NeedToReplicate() bool {
  123. return v.ReplicaPlacement.GetCopyCount() > 1
  124. }
  125. func (v *Volume) isFileUnchanged(n *Needle) bool {
  126. nv, ok := v.nm.Get(n.Id)
  127. if ok && nv.Offset > 0 {
  128. oldNeedle := new(Needle)
  129. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  130. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  131. n.DataSize = oldNeedle.DataSize
  132. return true
  133. }
  134. }
  135. return false
  136. }
  137. func (v *Volume) Destroy() (err error) {
  138. if v.readOnly {
  139. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  140. return
  141. }
  142. v.Close()
  143. err = os.Remove(v.dataFile.Name())
  144. if err != nil {
  145. return
  146. }
  147. err = v.nm.Destroy()
  148. return
  149. }
  150. func (v *Volume) write(n *Needle) (size uint32, err error) {
  151. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  152. if v.readOnly {
  153. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  154. return
  155. }
  156. v.accessLock.Lock()
  157. defer v.accessLock.Unlock()
  158. if v.isFileUnchanged(n) {
  159. size = n.DataSize
  160. glog.V(4).Infof("needle is unchanged!")
  161. return
  162. }
  163. var offset int64
  164. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  165. glog.V(0).Infof("faile to seek the end of file: %v", err)
  166. return
  167. }
  168. //ensure file writing starting from aligned positions
  169. if offset%NeedlePaddingSize != 0 {
  170. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  171. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  172. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  173. return
  174. }
  175. }
  176. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  177. if e := v.dataFile.Truncate(offset); e != nil {
  178. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  179. }
  180. return
  181. }
  182. nv, ok := v.nm.Get(n.Id)
  183. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  184. if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  185. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  186. }
  187. }
  188. if v.lastModifiedTime < n.LastModified {
  189. v.lastModifiedTime = n.LastModified
  190. }
  191. return
  192. }
  193. func (v *Volume) delete(n *Needle) (uint32, error) {
  194. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  195. if v.readOnly {
  196. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  197. }
  198. v.accessLock.Lock()
  199. defer v.accessLock.Unlock()
  200. nv, ok := v.nm.Get(n.Id)
  201. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  202. if ok {
  203. size := nv.Size
  204. if err := v.nm.Delete(n.Id); err != nil {
  205. return size, err
  206. }
  207. if _, err := v.dataFile.Seek(0, 2); err != nil {
  208. return size, err
  209. }
  210. n.Data = make([]byte, 0)
  211. _, err := n.Append(v.dataFile, v.Version())
  212. return size, err
  213. }
  214. return 0, nil
  215. }
  216. func (v *Volume) read(n *Needle) (int, error) {
  217. nv, ok := v.nm.Get(n.Id)
  218. if !ok || nv.Offset == 0 {
  219. return -1, errors.New("Not Found")
  220. }
  221. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  222. if err != nil {
  223. return bytesRead, err
  224. }
  225. if !n.HasTtl() {
  226. return bytesRead, err
  227. }
  228. ttlMinutes := n.Ttl.Minutes()
  229. if ttlMinutes == 0 {
  230. return bytesRead, nil
  231. }
  232. if !n.HasLastModifiedDate() {
  233. return bytesRead, nil
  234. }
  235. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  236. return bytesRead, nil
  237. }
  238. return -1, errors.New("Not Found")
  239. }
  240. func (v *Volume) freeze() error {
  241. if v.readOnly {
  242. return nil
  243. }
  244. nm, ok := v.nm.(*NeedleMap)
  245. if !ok {
  246. return nil
  247. }
  248. v.accessLock.Lock()
  249. defer v.accessLock.Unlock()
  250. bn, _ := baseFilename(v.dataFile.Name())
  251. cdbFn := bn + ".cdb"
  252. glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
  253. err := DumpNeedleMapToCdb(cdbFn, nm)
  254. if err != nil {
  255. return err
  256. }
  257. if v.nm, err = OpenCdbMap(cdbFn); err != nil {
  258. return err
  259. }
  260. nm.indexFile.Close()
  261. os.Remove(nm.indexFile.Name())
  262. v.readOnly = true
  263. return nil
  264. }
  265. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  266. visitSuperBlock func(SuperBlock) error,
  267. readNeedleBody bool,
  268. visitNeedle func(n *Needle, offset int64) error) (err error) {
  269. var v *Volume
  270. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  271. return errors.New("Failed to load volume:" + err.Error())
  272. }
  273. if err = visitSuperBlock(v.SuperBlock); err != nil {
  274. return errors.New("Failed to read super block:" + err.Error())
  275. }
  276. version := v.Version()
  277. offset := int64(SuperBlockSize)
  278. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  279. if e != nil {
  280. err = fmt.Errorf("cannot read needle header: %v", e)
  281. return
  282. }
  283. for n != nil {
  284. if readNeedleBody {
  285. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  286. err = fmt.Errorf("cannot read needle body: %v", err)
  287. return
  288. }
  289. }
  290. if err = visitNeedle(n, offset); err != nil {
  291. return
  292. }
  293. offset += int64(NeedleHeaderSize) + int64(rest)
  294. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  295. if err == io.EOF {
  296. return nil
  297. }
  298. return fmt.Errorf("cannot read needle header: %v", err)
  299. }
  300. }
  301. return
  302. }
  303. func (v *Volume) ContentSize() uint64 {
  304. return v.nm.ContentSize()
  305. }
  306. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  307. exists = true
  308. fi, err := os.Stat(filename)
  309. if os.IsNotExist(err) {
  310. exists = false
  311. return
  312. }
  313. if fi.Mode()&0400 != 0 {
  314. canRead = true
  315. }
  316. if fi.Mode()&0200 != 0 {
  317. canWrite = true
  318. }
  319. modTime = fi.ModTime()
  320. return
  321. }
  322. func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
  323. var indexFile *os.File
  324. var e error
  325. _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
  326. _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
  327. if cdbCanRead && cdbModTime.After(idxModeTime) {
  328. return true
  329. }
  330. if !cdbCanWrite {
  331. return false
  332. }
  333. if !idxCanRead {
  334. glog.V(0).Infoln("Can not read file", fileName+".idx!")
  335. return false
  336. }
  337. glog.V(2).Infoln("opening file", fileName+".idx")
  338. if indexFile, e = os.Open(fileName + ".idx"); e != nil {
  339. glog.V(0).Infoln("Failed to read file", fileName+".idx !")
  340. return false
  341. }
  342. defer indexFile.Close()
  343. glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
  344. if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
  345. glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
  346. return false
  347. }
  348. return true
  349. }
  350. // volume is expired if modified time + volume ttl < now
  351. // except when volume is empty
  352. // or when the volume does not have a ttl
  353. // or when volumeSizeLimit is 0 when server just starts
  354. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  355. if volumeSizeLimit == 0 {
  356. //skip if we don't know size limit
  357. return false
  358. }
  359. if v.ContentSize() == 0 {
  360. return false
  361. }
  362. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  363. return false
  364. }
  365. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  366. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  367. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  368. if int64(v.Ttl.Minutes()) < livedMinutes {
  369. return true
  370. }
  371. return false
  372. }
  373. // wait either maxDelayMinutes or 10% of ttl minutes
  374. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  375. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  376. return false
  377. }
  378. removalDelay := v.Ttl.Minutes() / 10
  379. if removalDelay > maxDelayMinutes {
  380. removalDelay = maxDelayMinutes
  381. }
  382. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  383. return true
  384. }
  385. return false
  386. }