You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

382 lines
11 KiB

12 years ago
12 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/weed-fs/go/glog"
  12. )
// Volume is one on-disk volume: a data file (<name>.dat) that needles are
// appended to, plus a needle map loaded from its index (<name>.idx, or
// <name>.ldb when LevelDB is used).
type Volume struct {
	Id         VolumeId
	dir        string // directory that holds the volume's files
	Collection string // optional; prefixes the file names when non-empty
	dataFile   *os.File
	nm         NeedleMapper // needle id -> offset (in NeedlePaddingSize units) and size in dataFile
	readOnly   bool         // set by load when the .dat owner-write bit is clear; the file is then opened read-only
	SuperBlock
	accessLock       sync.Mutex // serializes write, delete and Close
	lastModifiedTime uint64     //unix time in seconds
}
  24. func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  25. v = &Volume{dir: dirname, Collection: collection, Id: id}
  26. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  27. e = v.load(true, true, useLevelDb)
  28. return
  29. }
  30. func (v *Volume) String() string {
  31. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  32. }
  33. func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
  34. v = &Volume{dir: dirname, Collection: collection, Id: id}
  35. v.SuperBlock = SuperBlock{}
  36. e = v.load(false, false, false)
  37. return
  38. }
  39. func (v *Volume) FileName() (fileName string) {
  40. if v.Collection == "" {
  41. fileName = path.Join(v.dir, v.Id.String())
  42. } else {
  43. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  44. }
  45. return
  46. }
  47. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
  48. var e error
  49. fileName := v.FileName()
  50. if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
  51. if !canRead {
  52. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  53. }
  54. if canWrite {
  55. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  56. v.lastModifiedTime = uint64(modifiedTime.Unix())
  57. } else {
  58. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  59. v.dataFile, e = os.Open(fileName + ".dat")
  60. v.readOnly = true
  61. }
  62. } else {
  63. if createDatIfMissing {
  64. v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  65. } else {
  66. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  67. }
  68. }
  69. if e != nil {
  70. if !os.IsPermission(e) {
  71. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  72. }
  73. }
  74. if v.ReplicaPlacement == nil {
  75. e = v.readSuperBlock()
  76. } else {
  77. e = v.maybeWriteSuperBlock()
  78. }
  79. if e == nil && alsoLoadIndex {
  80. var indexFile *os.File
  81. if v.readOnly {
  82. glog.V(1).Infoln("open to read file", fileName+".idx")
  83. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  84. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  85. }
  86. } else {
  87. glog.V(1).Infoln("open to write file", fileName+".idx")
  88. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  89. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  90. }
  91. }
  92. if !useLevelDb {
  93. glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
  94. if v.nm, e = LoadNeedleMap(indexFile); e != nil {
  95. glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
  96. }
  97. } else {
  98. glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
  99. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
  100. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  101. }
  102. }
  103. }
  104. return e
  105. }
  106. func (v *Volume) Version() Version {
  107. return v.SuperBlock.Version()
  108. }
  109. func (v *Volume) Size() int64 {
  110. stat, e := v.dataFile.Stat()
  111. if e == nil {
  112. return stat.Size()
  113. }
  114. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  115. return -1
  116. }
  117. // Close cleanly shuts down this volume
  118. func (v *Volume) Close() {
  119. v.accessLock.Lock()
  120. defer v.accessLock.Unlock()
  121. v.nm.Close()
  122. _ = v.dataFile.Close()
  123. }
  124. func (v *Volume) NeedToReplicate() bool {
  125. return v.ReplicaPlacement.GetCopyCount() > 1
  126. }
  127. // isFileUnchanged checks whether this needle to write is same as last one.
  128. // It requires serialized access in the same volume.
  129. func (v *Volume) isFileUnchanged(n *Needle) bool {
  130. nv, ok := v.nm.Get(n.Id)
  131. if ok && nv.Offset > 0 {
  132. oldNeedle := new(Needle)
  133. oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  134. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  135. n.DataSize = oldNeedle.DataSize
  136. return true
  137. }
  138. }
  139. return false
  140. }
  141. // Destroy removes everything related to this volume
  142. func (v *Volume) Destroy() (err error) {
  143. if v.readOnly {
  144. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  145. return
  146. }
  147. v.Close()
  148. err = os.Remove(v.dataFile.Name())
  149. if err != nil {
  150. return
  151. }
  152. err = v.nm.Destroy()
  153. return
  154. }
// write appends needle n at the end of the data file, starting at an offset
// aligned to NeedlePaddingSize, and records that offset in the needle map.
//
// If the needle indexed under n.Id has identical checksum and data, nothing
// is written and size is the stored DataSize. If the append fails, the data
// file is truncated back to the aligned start offset. The needle map is only
// updated when the new copy lies further into the file than the indexed one.
// size is the value returned by n.Append; err may be non-nil with size set if
// updating the needle map fails after a successful append.
func (v *Volume) write(n *Needle) (size uint32, err error) {
	glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
	if v.readOnly {
		err = fmt.Errorf("%s is read-only", v.dataFile.Name())
		return
	}
	v.accessLock.Lock()
	defer v.accessLock.Unlock()
	// isFileUnchanged requires serialized access, hence it runs under accessLock.
	if v.isFileUnchanged(n) {
		size = n.DataSize
		glog.V(4).Infof("needle is unchanged!")
		return
	}
	var offset int64
	// whence 2: seek relative to the end of the file.
	if offset, err = v.dataFile.Seek(0, 2); err != nil {
		glog.V(0).Infof("failed to seek the end of file: %v", err)
		return
	}
	//ensure file writing starting from aligned positions
	if offset%NeedlePaddingSize != 0 {
		offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
		if offset, err = v.dataFile.Seek(offset, 0); err != nil {
			glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
			return
		}
	}
	if size, err = n.Append(v.dataFile, v.Version()); err != nil {
		// Roll back a partial append so the file ends at the aligned offset.
		if e := v.dataFile.Truncate(offset); e != nil {
			err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
		}
		return
	}
	nv, ok := v.nm.Get(n.Id)
	// Only re-point the index when this copy is newer (further into the file).
	// NOTE(review): the offset is stored as uint32 in NeedlePaddingSize units,
	// which bounds the addressable size of the data file — confirm the limit.
	if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
		if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
		}
	}
	if v.lastModifiedTime < n.LastModified {
		v.lastModifiedTime = n.LastModified
	}
	return
}
  198. func (v *Volume) delete(n *Needle) (uint32, error) {
  199. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  200. if v.readOnly {
  201. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  202. }
  203. v.accessLock.Lock()
  204. defer v.accessLock.Unlock()
  205. nv, ok := v.nm.Get(n.Id)
  206. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  207. if ok {
  208. size := nv.Size
  209. if err := v.nm.Delete(n.Id); err != nil {
  210. return size, err
  211. }
  212. if _, err := v.dataFile.Seek(0, 2); err != nil {
  213. return size, err
  214. }
  215. n.Data = nil
  216. _, err := n.Append(v.dataFile, v.Version())
  217. return size, err
  218. }
  219. return 0, nil
  220. }
  221. func (v *Volume) read(n *Needle) (int, error) {
  222. nv, ok := v.nm.Get(n.Id)
  223. if !ok || nv.Offset == 0 {
  224. return -1, errors.New("Not Found")
  225. }
  226. bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  227. if err != nil {
  228. return bytesRead, err
  229. }
  230. if !n.HasTtl() {
  231. return bytesRead, err
  232. }
  233. ttlMinutes := n.Ttl.Minutes()
  234. if ttlMinutes == 0 {
  235. return bytesRead, nil
  236. }
  237. if !n.HasLastModifiedDate() {
  238. return bytesRead, nil
  239. }
  240. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  241. return bytesRead, nil
  242. }
  243. return -1, errors.New("Not Found")
  244. }
  245. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  246. visitSuperBlock func(SuperBlock) error,
  247. readNeedleBody bool,
  248. visitNeedle func(n *Needle, offset int64) error) (err error) {
  249. var v *Volume
  250. if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
  251. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  252. }
  253. if err = visitSuperBlock(v.SuperBlock); err != nil {
  254. return fmt.Errorf("Failed to read volume %d super block: %v", id, err)
  255. }
  256. version := v.Version()
  257. offset := int64(SuperBlockSize)
  258. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  259. if e != nil {
  260. err = fmt.Errorf("cannot read needle header: %v", e)
  261. return
  262. }
  263. for n != nil {
  264. if readNeedleBody {
  265. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  266. glog.V(0).Infof("cannot read needle body: %v", err)
  267. //err = fmt.Errorf("cannot read needle body: %v", err)
  268. //return
  269. }
  270. if n.DataSize >= n.Size {
  271. // this should come from a bug reported on #87 and #93
  272. // fixed in v0.69
  273. // remove this whole "if" clause later, long after 0.69
  274. oldRest, oldSize := rest, n.Size
  275. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  276. n.Size = 0
  277. rest = n.Size + NeedleChecksumSize + padding
  278. if rest%NeedlePaddingSize != 0 {
  279. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  280. }
  281. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  282. }
  283. }
  284. if err = visitNeedle(n, offset); err != nil {
  285. glog.V(0).Infof("visit needle error: %v", err)
  286. }
  287. offset += int64(NeedleHeaderSize) + int64(rest)
  288. glog.V(4).Infof("==> new entry offset %d", offset)
  289. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  290. if err == io.EOF {
  291. return nil
  292. }
  293. return fmt.Errorf("cannot read needle header: %v", err)
  294. }
  295. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  296. }
  297. return
  298. }
  299. func (v *Volume) ContentSize() uint64 {
  300. return v.nm.ContentSize()
  301. }
  302. func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) {
  303. exists = true
  304. fi, err := os.Stat(filename)
  305. if os.IsNotExist(err) {
  306. exists = false
  307. return
  308. }
  309. if fi.Mode()&0400 != 0 {
  310. canRead = true
  311. }
  312. if fi.Mode()&0200 != 0 {
  313. canWrite = true
  314. }
  315. modTime = fi.ModTime()
  316. return
  317. }
  318. // volume is expired if modified time + volume ttl < now
  319. // except when volume is empty
  320. // or when the volume does not have a ttl
  321. // or when volumeSizeLimit is 0 when server just starts
  322. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  323. if volumeSizeLimit == 0 {
  324. //skip if we don't know size limit
  325. return false
  326. }
  327. if v.ContentSize() == 0 {
  328. return false
  329. }
  330. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  331. return false
  332. }
  333. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  334. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  335. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  336. if int64(v.Ttl.Minutes()) < livedMinutes {
  337. return true
  338. }
  339. return false
  340. }
  341. // wait either maxDelayMinutes or 10% of ttl minutes
  342. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  343. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  344. return false
  345. }
  346. removalDelay := v.Ttl.Minutes() / 10
  347. if removalDelay > maxDelayMinutes {
  348. removalDelay = maxDelayMinutes
  349. }
  350. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  351. return true
  352. }
  353. return false
  354. }