You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

309 lines
8.5 KiB

12 years ago
12 years ago
12 years ago
12 years ago
9 years ago
10 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
9 years ago
9 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/chrislusf/seaweedfs/go/glog"
  8. "github.com/chrislusf/seaweedfs/go/util"
  9. )
  // Bit masks for Needle.Flags. Each mask is tested by the Needle method of the
  // matching name (IsGzipped, HasName, HasMime, HasLastModifiedDate, HasTtl,
  // IsChunkedManifest) and set by the corresponding Set* method.
  const (
  	FlagGzip                = 1 << 0
  	FlagHasName             = 1 << 1
  	FlagHasMime             = 1 << 2
  	FlagHasLastModifiedDate = 1 << 3
  	FlagHasTtl              = 1 << 4
  	FlagIsChunkManifest     = 1 << 7
  )

  // Widths, in bytes, of the fixed-size optional fields of a version 2 record.
  const (
  	// LastModifiedBytesLength is how many bytes of LastModified are stored.
  	LastModifiedBytesLength = 5
  	// TtlBytesLength is how many bytes the TTL occupies.
  	TtlBytesLength = 2
  )
  20. func (n *Needle) DiskSize() int64 {
  21. padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize)
  22. return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize
  23. }
  24. func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
  25. if s, ok := w.(io.Seeker); ok {
  26. if end, e := s.Seek(0, 1); e == nil {
  27. defer func(s io.Seeker, off int64) {
  28. if err != nil {
  29. if _, e = s.Seek(off, 0); e != nil {
  30. glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
  31. }
  32. }
  33. }(s, end)
  34. } else {
  35. err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
  36. return
  37. }
  38. }
  39. switch version {
  40. case Version1:
  41. header := make([]byte, NeedleHeaderSize)
  42. util.Uint32toBytes(header[0:4], n.Cookie)
  43. util.Uint64toBytes(header[4:12], n.Id)
  44. n.Size = uint32(len(n.Data))
  45. size = n.Size
  46. util.Uint32toBytes(header[12:16], n.Size)
  47. if _, err = w.Write(header); err != nil {
  48. return
  49. }
  50. if _, err = w.Write(n.Data); err != nil {
  51. return
  52. }
  53. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  54. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  55. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  56. return
  57. case Version2:
  58. header := make([]byte, NeedleHeaderSize)
  59. util.Uint32toBytes(header[0:4], n.Cookie)
  60. util.Uint64toBytes(header[4:12], n.Id)
  61. n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
  62. if n.DataSize > 0 {
  63. n.Size = 4 + n.DataSize + 1
  64. if n.HasName() {
  65. n.Size = n.Size + 1 + uint32(n.NameSize)
  66. }
  67. if n.HasMime() {
  68. n.Size = n.Size + 1 + uint32(n.MimeSize)
  69. }
  70. if n.HasLastModifiedDate() {
  71. n.Size = n.Size + LastModifiedBytesLength
  72. }
  73. if n.HasTtl() {
  74. n.Size = n.Size + TtlBytesLength
  75. }
  76. } else {
  77. n.Size = 0
  78. }
  79. size = n.DataSize
  80. util.Uint32toBytes(header[12:16], n.Size)
  81. if _, err = w.Write(header); err != nil {
  82. return
  83. }
  84. if n.DataSize > 0 {
  85. util.Uint32toBytes(header[0:4], n.DataSize)
  86. if _, err = w.Write(header[0:4]); err != nil {
  87. return
  88. }
  89. if _, err = w.Write(n.Data); err != nil {
  90. return
  91. }
  92. util.Uint8toBytes(header[0:1], n.Flags)
  93. if _, err = w.Write(header[0:1]); err != nil {
  94. return
  95. }
  96. if n.HasName() {
  97. util.Uint8toBytes(header[0:1], n.NameSize)
  98. if _, err = w.Write(header[0:1]); err != nil {
  99. return
  100. }
  101. if _, err = w.Write(n.Name); err != nil {
  102. return
  103. }
  104. }
  105. if n.HasMime() {
  106. util.Uint8toBytes(header[0:1], n.MimeSize)
  107. if _, err = w.Write(header[0:1]); err != nil {
  108. return
  109. }
  110. if _, err = w.Write(n.Mime); err != nil {
  111. return
  112. }
  113. }
  114. if n.HasLastModifiedDate() {
  115. util.Uint64toBytes(header[0:8], n.LastModified)
  116. if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
  117. return
  118. }
  119. }
  120. if n.HasTtl() && n.Ttl != nil {
  121. n.Ttl.ToBytes(header[0:TtlBytesLength])
  122. if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
  123. return
  124. }
  125. }
  126. }
  127. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  128. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  129. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  130. return n.DataSize, err
  131. }
  132. return 0, fmt.Errorf("Unsupported Version! (%d)", version)
  133. }
  134. func ReleaseBytes(b []byte) {
  135. // println("Releasing", len(b))
  136. BYTESPOOL.Put(b)
  137. }
  138. func BorrwoBytes(size int) []byte {
  139. ret := BYTESPOOL.Get(size)
  140. // println("Reading", len(ret))
  141. return ret
  142. }
  143. func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice, rawBytes []byte, err error) {
  144. padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
  145. readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
  146. rawBytes = BorrwoBytes(int(readSize))
  147. dataSlice = rawBytes[0:int(readSize)]
  148. _, err = r.ReadAt(dataSlice, offset)
  149. return
  150. }
  151. func (n *Needle) ReleaseMemory() {
  152. ReleaseBytes(n.rawBytes)
  153. }
  154. func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
  155. bytes, rawBytes, err := ReadNeedleBlob(r, offset, size)
  156. n.rawBytes = rawBytes
  157. if err != nil {
  158. return err
  159. }
  160. n.ParseNeedleHeader(bytes)
  161. if n.Size != size {
  162. return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
  163. }
  164. switch version {
  165. case Version1:
  166. n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
  167. case Version2:
  168. n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
  169. }
  170. checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
  171. newChecksum := NewCRC(n.Data)
  172. if checksum != newChecksum.Value() {
  173. return errors.New("CRC error! Data On Disk Corrupted")
  174. }
  175. n.Checksum = newChecksum
  176. return nil
  177. }
  178. func (n *Needle) ParseNeedleHeader(bytes []byte) {
  179. n.Cookie = util.BytesToUint32(bytes[0:4])
  180. n.Id = util.BytesToUint64(bytes[4:12])
  181. n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
  182. }
  183. func (n *Needle) readNeedleDataVersion2(bytes []byte) {
  184. index, lenBytes := 0, len(bytes)
  185. if index < lenBytes {
  186. n.DataSize = util.BytesToUint32(bytes[index : index+4])
  187. index = index + 4
  188. if int(n.DataSize)+index > lenBytes {
  189. // this if clause is due to bug #87 and #93, fixed in v0.69
  190. // remove this clause later
  191. return
  192. }
  193. n.Data = bytes[index : index+int(n.DataSize)]
  194. index = index + int(n.DataSize)
  195. n.Flags = bytes[index]
  196. index = index + 1
  197. }
  198. if index < lenBytes && n.HasName() {
  199. n.NameSize = uint8(bytes[index])
  200. index = index + 1
  201. n.Name = bytes[index : index+int(n.NameSize)]
  202. index = index + int(n.NameSize)
  203. }
  204. if index < lenBytes && n.HasMime() {
  205. n.MimeSize = uint8(bytes[index])
  206. index = index + 1
  207. n.Mime = bytes[index : index+int(n.MimeSize)]
  208. index = index + int(n.MimeSize)
  209. }
  210. if index < lenBytes && n.HasLastModifiedDate() {
  211. n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
  212. index = index + LastModifiedBytesLength
  213. }
  214. if index < lenBytes && n.HasTtl() {
  215. n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
  216. index = index + TtlBytesLength
  217. }
  218. }
  219. func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
  220. n = new(Needle)
  221. if version == Version1 || version == Version2 {
  222. bytes := make([]byte, NeedleHeaderSize)
  223. var count int
  224. count, err = r.ReadAt(bytes, offset)
  225. if count <= 0 || err != nil {
  226. return nil, 0, err
  227. }
  228. n.ParseNeedleHeader(bytes)
  229. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  230. bodyLength = n.Size + NeedleChecksumSize + padding
  231. }
  232. return
  233. }
  234. //n should be a needle already read the header
  235. //the input stream will read until next file entry
  236. func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
  237. if bodyLength <= 0 {
  238. return nil
  239. }
  240. switch version {
  241. case Version1:
  242. bytes := make([]byte, bodyLength)
  243. if _, err = r.ReadAt(bytes, offset); err != nil {
  244. return
  245. }
  246. n.Data = bytes[:n.Size]
  247. n.Checksum = NewCRC(n.Data)
  248. case Version2:
  249. bytes := make([]byte, bodyLength)
  250. if _, err = r.ReadAt(bytes, offset); err != nil {
  251. return
  252. }
  253. n.readNeedleDataVersion2(bytes[0:n.Size])
  254. n.Checksum = NewCRC(n.Data)
  255. default:
  256. err = fmt.Errorf("Unsupported Version! (%d)", version)
  257. }
  258. return
  259. }
  260. func (n *Needle) IsGzipped() bool {
  261. return n.Flags&FlagGzip > 0
  262. }
  263. func (n *Needle) SetGzipped() {
  264. n.Flags = n.Flags | FlagGzip
  265. }
  266. func (n *Needle) HasName() bool {
  267. return n.Flags&FlagHasName > 0
  268. }
  269. func (n *Needle) SetHasName() {
  270. n.Flags = n.Flags | FlagHasName
  271. }
  272. func (n *Needle) HasMime() bool {
  273. return n.Flags&FlagHasMime > 0
  274. }
  275. func (n *Needle) SetHasMime() {
  276. n.Flags = n.Flags | FlagHasMime
  277. }
  278. func (n *Needle) HasLastModifiedDate() bool {
  279. return n.Flags&FlagHasLastModifiedDate > 0
  280. }
  281. func (n *Needle) SetHasLastModifiedDate() {
  282. n.Flags = n.Flags | FlagHasLastModifiedDate
  283. }
  284. func (n *Needle) HasTtl() bool {
  285. return n.Flags&FlagHasTtl > 0
  286. }
  287. func (n *Needle) SetHasTtl() {
  288. n.Flags = n.Flags | FlagHasTtl
  289. }
  290. func (n *Needle) IsChunkedManifest() bool {
  291. return n.Flags&FlagIsChunkManifest > 0
  292. }
  293. func (n *Needle) SetIsChunkManifest() {
  294. n.Flags = n.Flags | FlagIsChunkManifest
  295. }