You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

340 lines
9.3 KiB

10 years ago
10 years ago
12 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
  1. package storage
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. )
// Volume couples one data file with the needle map that indexes it, plus the
// super block settings and bookkeeping needed to append to and read from it.
type Volume struct {
	// Id identifies the volume; its String() form is part of the file name.
	Id VolumeId
	// dir is the directory component used by FileName.
	dir string
	// Collection, when non-empty, is prepended to the id in the file name.
	Collection string
	// dataFile is the open handle all needle reads and appends go through.
	dataFile *os.File
	// nm maps a needle id to its offset and size inside dataFile.
	nm NeedleMapper
	// needleMapKind is the needle map kind passed to NewVolume.
	needleMapKind NeedleMapType
	// readOnly makes write, delete, AppendBlob and Destroy fail with an error.
	readOnly bool
	// SuperBlock is embedded; it supplies Version(), ReplicaPlacement and Ttl.
	SuperBlock
	// dataFileAccessLock is held by Close, AppendBlob, write and delete.
	dataFileAccessLock sync.Mutex
	// lastModifiedTime is unix time in seconds; write raises it to the newest
	// needle's LastModified.
	lastModifiedTime uint64
}
  25. func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
  26. v = &Volume{dir: dirname, Collection: collection, Id: id}
  27. v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
  28. v.needleMapKind = needleMapKind
  29. e = v.load(true, true, needleMapKind)
  30. return
  31. }
  32. func (v *Volume) String() string {
  33. return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
  34. }
  35. func (v *Volume) FileName() (fileName string) {
  36. if v.Collection == "" {
  37. fileName = path.Join(v.dir, v.Id.String())
  38. } else {
  39. fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
  40. }
  41. return
  42. }
  43. func (v *Volume) DataFile() *os.File {
  44. return v.dataFile
  45. }
  46. func (v *Volume) Version() Version {
  47. return v.SuperBlock.Version()
  48. }
  49. func (v *Volume) Size() int64 {
  50. stat, e := v.dataFile.Stat()
  51. if e == nil {
  52. return stat.Size()
  53. }
  54. glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
  55. return -1
  56. }
  57. // Close cleanly shuts down this volume
  58. func (v *Volume) Close() {
  59. v.dataFileAccessLock.Lock()
  60. defer v.dataFileAccessLock.Unlock()
  61. v.nm.Close()
  62. _ = v.dataFile.Close()
  63. }
  64. func (v *Volume) NeedToReplicate() bool {
  65. return v.ReplicaPlacement.GetCopyCount() > 1
  66. }
  67. // isFileUnchanged checks whether this needle to write is same as last one.
  68. // It requires serialized access in the same volume.
  69. func (v *Volume) isFileUnchanged(n *Needle) bool {
  70. if v.Ttl.String() != "" {
  71. return false
  72. }
  73. nv, ok := v.nm.Get(n.Id)
  74. if ok && nv.Offset > 0 {
  75. oldNeedle := new(Needle)
  76. err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  77. if err != nil {
  78. glog.V(0).Infof("Failed to check updated file %v", err)
  79. return false
  80. }
  81. defer oldNeedle.ReleaseMemory()
  82. if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
  83. n.DataSize = oldNeedle.DataSize
  84. return true
  85. }
  86. }
  87. return false
  88. }
  89. // Destroy removes everything related to this volume
  90. func (v *Volume) Destroy() (err error) {
  91. if v.readOnly {
  92. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  93. return
  94. }
  95. v.Close()
  96. err = os.Remove(v.dataFile.Name())
  97. if err != nil {
  98. return
  99. }
  100. err = v.nm.Destroy()
  101. return
  102. }
  103. // AppendBlob append a blob to end of the data file, used in replication
  104. func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
  105. if v.readOnly {
  106. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  107. return
  108. }
  109. v.dataFileAccessLock.Lock()
  110. defer v.dataFileAccessLock.Unlock()
  111. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  112. glog.V(0).Infof("failed to seek the end of file: %v", err)
  113. return
  114. }
  115. //ensure file writing starting from aligned positions
  116. if offset%NeedlePaddingSize != 0 {
  117. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  118. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  119. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  120. return
  121. }
  122. }
  123. v.dataFile.Write(b)
  124. return
  125. }
  126. func (v *Volume) write(n *Needle) (size uint32, err error) {
  127. glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
  128. if v.readOnly {
  129. err = fmt.Errorf("%s is read-only", v.dataFile.Name())
  130. return
  131. }
  132. v.dataFileAccessLock.Lock()
  133. defer v.dataFileAccessLock.Unlock()
  134. if v.isFileUnchanged(n) {
  135. size = n.DataSize
  136. glog.V(4).Infof("needle is unchanged!")
  137. return
  138. }
  139. var offset int64
  140. if offset, err = v.dataFile.Seek(0, 2); err != nil {
  141. glog.V(0).Infof("failed to seek the end of file: %v", err)
  142. return
  143. }
  144. //ensure file writing starting from aligned positions
  145. if offset%NeedlePaddingSize != 0 {
  146. offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
  147. if offset, err = v.dataFile.Seek(offset, 0); err != nil {
  148. glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
  149. return
  150. }
  151. }
  152. if size, err = n.Append(v.dataFile, v.Version()); err != nil {
  153. if e := v.dataFile.Truncate(offset); e != nil {
  154. err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
  155. }
  156. return
  157. }
  158. nv, ok := v.nm.Get(n.Id)
  159. if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
  160. if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
  161. glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
  162. }
  163. }
  164. if v.lastModifiedTime < n.LastModified {
  165. v.lastModifiedTime = n.LastModified
  166. }
  167. return
  168. }
  169. func (v *Volume) delete(n *Needle) (uint32, error) {
  170. glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String())
  171. if v.readOnly {
  172. return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
  173. }
  174. v.dataFileAccessLock.Lock()
  175. defer v.dataFileAccessLock.Unlock()
  176. nv, ok := v.nm.Get(n.Id)
  177. //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
  178. if ok {
  179. size := nv.Size
  180. if err := v.nm.Delete(n.Id); err != nil {
  181. return size, err
  182. }
  183. if _, err := v.dataFile.Seek(0, 2); err != nil {
  184. return size, err
  185. }
  186. n.Data = nil
  187. _, err := n.Append(v.dataFile, v.Version())
  188. return size, err
  189. }
  190. return 0, nil
  191. }
  192. // read fills in Needle content by looking up n.Id from NeedleMapper
  193. func (v *Volume) readNeedle(n *Needle) (int, error) {
  194. nv, ok := v.nm.Get(n.Id)
  195. if !ok || nv.Offset == 0 {
  196. return -1, errors.New("Not Found")
  197. }
  198. err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
  199. if err != nil {
  200. return 0, err
  201. }
  202. bytesRead := len(n.Data)
  203. if !n.HasTtl() {
  204. return bytesRead, nil
  205. }
  206. ttlMinutes := n.Ttl.Minutes()
  207. if ttlMinutes == 0 {
  208. return bytesRead, nil
  209. }
  210. if !n.HasLastModifiedDate() {
  211. return bytesRead, nil
  212. }
  213. if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
  214. return bytesRead, nil
  215. }
  216. n.ReleaseMemory()
  217. return -1, errors.New("Not Found")
  218. }
  219. func ScanVolumeFile(dirname string, collection string, id VolumeId,
  220. needleMapKind NeedleMapType,
  221. visitSuperBlock func(SuperBlock) error,
  222. readNeedleBody bool,
  223. visitNeedle func(n *Needle, offset int64) error) (err error) {
  224. var v *Volume
  225. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  226. return fmt.Errorf("Failed to load volume %d: %v", id, err)
  227. }
  228. if err = visitSuperBlock(v.SuperBlock); err != nil {
  229. return fmt.Errorf("Failed to process volume %d super block: %v", id, err)
  230. }
  231. version := v.Version()
  232. offset := int64(SuperBlockSize)
  233. n, rest, e := ReadNeedleHeader(v.dataFile, version, offset)
  234. if e != nil {
  235. err = fmt.Errorf("cannot read needle header: %v", e)
  236. return
  237. }
  238. for n != nil {
  239. if readNeedleBody {
  240. if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
  241. glog.V(0).Infof("cannot read needle body: %v", err)
  242. //err = fmt.Errorf("cannot read needle body: %v", err)
  243. //return
  244. }
  245. if n.DataSize >= n.Size {
  246. // this should come from a bug reported on #87 and #93
  247. // fixed in v0.69
  248. // remove this whole "if" clause later, long after 0.69
  249. oldRest, oldSize := rest, n.Size
  250. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  251. n.Size = 0
  252. rest = n.Size + NeedleChecksumSize + padding
  253. if rest%NeedlePaddingSize != 0 {
  254. rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
  255. }
  256. glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
  257. }
  258. }
  259. if err = visitNeedle(n, offset); err != nil {
  260. glog.V(0).Infof("visit needle error: %v", err)
  261. }
  262. offset += int64(NeedleHeaderSize) + int64(rest)
  263. glog.V(4).Infof("==> new entry offset %d", offset)
  264. if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
  265. if err == io.EOF {
  266. return nil
  267. }
  268. return fmt.Errorf("cannot read needle header: %v", err)
  269. }
  270. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  271. }
  272. return
  273. }
  274. func (v *Volume) ContentSize() uint64 {
  275. return v.nm.ContentSize()
  276. }
  277. // volume is expired if modified time + volume ttl < now
  278. // except when volume is empty
  279. // or when the volume does not have a ttl
  280. // or when volumeSizeLimit is 0 when server just starts
  281. func (v *Volume) expired(volumeSizeLimit uint64) bool {
  282. if volumeSizeLimit == 0 {
  283. //skip if we don't know size limit
  284. return false
  285. }
  286. if v.ContentSize() == 0 {
  287. return false
  288. }
  289. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  290. return false
  291. }
  292. glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
  293. livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
  294. glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
  295. if int64(v.Ttl.Minutes()) < livedMinutes {
  296. return true
  297. }
  298. return false
  299. }
  300. // wait either maxDelayMinutes or 10% of ttl minutes
  301. func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
  302. if v.Ttl == nil || v.Ttl.Minutes() == 0 {
  303. return false
  304. }
  305. removalDelay := v.Ttl.Minutes() / 10
  306. if removalDelay > maxDelayMinutes {
  307. removalDelay = maxDelayMinutes
  308. }
  309. if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
  310. return true
  311. }
  312. return false
  313. }