You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

327 lines
8.9 KiB

12 years ago
12 years ago
12 years ago
12 years ago
9 years ago
10 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
9 years ago
9 years ago
  1. package storage
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. const (
  11. FlagGzip = 0x01
  12. FlagHasName = 0x02
  13. FlagHasMime = 0x04
  14. FlagHasLastModifiedDate = 0x08
  15. FlagHasTtl = 0x10
  16. FlagHasPairs = 0x20
  17. FlagIsChunkManifest = 0x80
  18. LastModifiedBytesLength = 5
  19. TtlBytesLength = 2
  20. )
  21. func (n *Needle) DiskSize() int64 {
  22. return getActualSize(n.Size)
  23. }
  24. func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) {
  25. if s, ok := w.(io.Seeker); ok {
  26. if end, e := s.Seek(0, 1); e == nil {
  27. defer func(s io.Seeker, off int64) {
  28. if err != nil {
  29. if _, e = s.Seek(off, 0); e != nil {
  30. glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
  31. }
  32. }
  33. }(s, end)
  34. } else {
  35. err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
  36. return
  37. }
  38. }
  39. switch version {
  40. case Version1:
  41. header := make([]byte, NeedleHeaderSize)
  42. util.Uint32toBytes(header[0:4], n.Cookie)
  43. util.Uint64toBytes(header[4:12], n.Id)
  44. n.Size = uint32(len(n.Data))
  45. size = n.Size
  46. util.Uint32toBytes(header[12:16], n.Size)
  47. if _, err = w.Write(header); err != nil {
  48. return
  49. }
  50. if _, err = w.Write(n.Data); err != nil {
  51. return
  52. }
  53. actualSize = NeedleHeaderSize + int64(n.Size)
  54. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  55. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  56. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  57. return
  58. case Version2:
  59. header := make([]byte, NeedleHeaderSize)
  60. util.Uint32toBytes(header[0:4], n.Cookie)
  61. util.Uint64toBytes(header[4:12], n.Id)
  62. n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
  63. if n.DataSize > 0 {
  64. n.Size = 4 + n.DataSize + 1
  65. if n.HasName() {
  66. n.Size = n.Size + 1 + uint32(n.NameSize)
  67. }
  68. if n.HasMime() {
  69. n.Size = n.Size + 1 + uint32(n.MimeSize)
  70. }
  71. if n.HasLastModifiedDate() {
  72. n.Size = n.Size + LastModifiedBytesLength
  73. }
  74. if n.HasTtl() {
  75. n.Size = n.Size + TtlBytesLength
  76. }
  77. if n.HasPairs() {
  78. n.Size += 2 + uint32(n.PairsSize)
  79. }
  80. } else {
  81. n.Size = 0
  82. }
  83. size = n.DataSize
  84. util.Uint32toBytes(header[12:16], n.Size)
  85. if _, err = w.Write(header); err != nil {
  86. return
  87. }
  88. if n.DataSize > 0 {
  89. util.Uint32toBytes(header[0:4], n.DataSize)
  90. if _, err = w.Write(header[0:4]); err != nil {
  91. return
  92. }
  93. if _, err = w.Write(n.Data); err != nil {
  94. return
  95. }
  96. util.Uint8toBytes(header[0:1], n.Flags)
  97. if _, err = w.Write(header[0:1]); err != nil {
  98. return
  99. }
  100. if n.HasName() {
  101. util.Uint8toBytes(header[0:1], n.NameSize)
  102. if _, err = w.Write(header[0:1]); err != nil {
  103. return
  104. }
  105. if _, err = w.Write(n.Name); err != nil {
  106. return
  107. }
  108. }
  109. if n.HasMime() {
  110. util.Uint8toBytes(header[0:1], n.MimeSize)
  111. if _, err = w.Write(header[0:1]); err != nil {
  112. return
  113. }
  114. if _, err = w.Write(n.Mime); err != nil {
  115. return
  116. }
  117. }
  118. if n.HasLastModifiedDate() {
  119. util.Uint64toBytes(header[0:8], n.LastModified)
  120. if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
  121. return
  122. }
  123. }
  124. if n.HasTtl() && n.Ttl != nil {
  125. n.Ttl.ToBytes(header[0:TtlBytesLength])
  126. if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
  127. return
  128. }
  129. }
  130. if n.HasPairs() {
  131. util.Uint16toBytes(header[0:2], n.PairsSize)
  132. if _, err = w.Write(header[0:2]); err != nil {
  133. return
  134. }
  135. if _, err = w.Write(n.Pairs); err != nil {
  136. return
  137. }
  138. }
  139. }
  140. padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
  141. util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
  142. _, err = w.Write(header[0 : NeedleChecksumSize+padding])
  143. actualSize = NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize + int64(padding)
  144. return n.DataSize, actualSize, err
  145. }
  146. return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
  147. }
  148. func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
  149. NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize
  150. padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize)
  151. readSize := NeedleWithoutPaddingSize + padding
  152. return getBytesForFileBlock(r, offset, int(readSize))
  153. }
  154. func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
  155. bytes, block, err := ReadNeedleBlob(r, offset, size)
  156. if err != nil {
  157. return err
  158. }
  159. n.rawBlock = block
  160. n.ParseNeedleHeader(bytes)
  161. if n.Size != size {
  162. return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
  163. }
  164. switch version {
  165. case Version1:
  166. n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
  167. case Version2:
  168. n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
  169. }
  170. if size == 0 {
  171. return nil
  172. }
  173. checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
  174. newChecksum := NewCRC(n.Data)
  175. if checksum != newChecksum.Value() {
  176. return errors.New("CRC error! Data On Disk Corrupted")
  177. }
  178. n.Checksum = newChecksum
  179. return nil
  180. }
  181. func (n *Needle) ParseNeedleHeader(bytes []byte) {
  182. n.Cookie = util.BytesToUint32(bytes[0:4])
  183. n.Id = util.BytesToUint64(bytes[4:12])
  184. n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
  185. }
  186. func (n *Needle) readNeedleDataVersion2(bytes []byte) {
  187. index, lenBytes := 0, len(bytes)
  188. if index < lenBytes {
  189. n.DataSize = util.BytesToUint32(bytes[index : index+4])
  190. index = index + 4
  191. if int(n.DataSize)+index > lenBytes {
  192. // this if clause is due to bug #87 and #93, fixed in v0.69
  193. // remove this clause later
  194. return
  195. }
  196. n.Data = bytes[index : index+int(n.DataSize)]
  197. index = index + int(n.DataSize)
  198. n.Flags = bytes[index]
  199. index = index + 1
  200. }
  201. if index < lenBytes && n.HasName() {
  202. n.NameSize = uint8(bytes[index])
  203. index = index + 1
  204. n.Name = bytes[index : index+int(n.NameSize)]
  205. index = index + int(n.NameSize)
  206. }
  207. if index < lenBytes && n.HasMime() {
  208. n.MimeSize = uint8(bytes[index])
  209. index = index + 1
  210. n.Mime = bytes[index : index+int(n.MimeSize)]
  211. index = index + int(n.MimeSize)
  212. }
  213. if index < lenBytes && n.HasLastModifiedDate() {
  214. n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
  215. index = index + LastModifiedBytesLength
  216. }
  217. if index < lenBytes && n.HasTtl() {
  218. n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
  219. index = index + TtlBytesLength
  220. }
  221. if index < lenBytes && n.HasPairs() {
  222. n.PairsSize = util.BytesToUint16(bytes[index : index+2])
  223. index += 2
  224. end := index + int(n.PairsSize)
  225. n.Pairs = bytes[index:end]
  226. index = end
  227. }
  228. }
  229. func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
  230. n = new(Needle)
  231. if version == Version1 || version == Version2 {
  232. bytes := make([]byte, NeedleHeaderSize)
  233. var count int
  234. count, err = r.ReadAt(bytes, offset)
  235. if count <= 0 || err != nil {
  236. return nil, 0, err
  237. }
  238. n.ParseNeedleHeader(bytes)
  239. padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
  240. bodyLength = n.Size + NeedleChecksumSize + padding
  241. }
  242. return
  243. }
  244. //n should be a needle already read the header
  245. //the input stream will read until next file entry
  246. func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
  247. if bodyLength <= 0 {
  248. return nil
  249. }
  250. switch version {
  251. case Version1:
  252. bytes := make([]byte, bodyLength)
  253. if _, err = r.ReadAt(bytes, offset); err != nil {
  254. return
  255. }
  256. n.Data = bytes[:n.Size]
  257. n.Checksum = NewCRC(n.Data)
  258. case Version2:
  259. bytes := make([]byte, bodyLength)
  260. if _, err = r.ReadAt(bytes, offset); err != nil {
  261. return
  262. }
  263. n.readNeedleDataVersion2(bytes[0:n.Size])
  264. n.Checksum = NewCRC(n.Data)
  265. default:
  266. err = fmt.Errorf("Unsupported Version! (%d)", version)
  267. }
  268. return
  269. }
  270. func (n *Needle) IsGzipped() bool {
  271. return n.Flags&FlagGzip > 0
  272. }
  273. func (n *Needle) SetGzipped() {
  274. n.Flags = n.Flags | FlagGzip
  275. }
  276. func (n *Needle) HasName() bool {
  277. return n.Flags&FlagHasName > 0
  278. }
  279. func (n *Needle) SetHasName() {
  280. n.Flags = n.Flags | FlagHasName
  281. }
  282. func (n *Needle) HasMime() bool {
  283. return n.Flags&FlagHasMime > 0
  284. }
  285. func (n *Needle) SetHasMime() {
  286. n.Flags = n.Flags | FlagHasMime
  287. }
  288. func (n *Needle) HasLastModifiedDate() bool {
  289. return n.Flags&FlagHasLastModifiedDate > 0
  290. }
  291. func (n *Needle) SetHasLastModifiedDate() {
  292. n.Flags = n.Flags | FlagHasLastModifiedDate
  293. }
  294. func (n *Needle) HasTtl() bool {
  295. return n.Flags&FlagHasTtl > 0
  296. }
  297. func (n *Needle) SetHasTtl() {
  298. n.Flags = n.Flags | FlagHasTtl
  299. }
  300. func (n *Needle) IsChunkedManifest() bool {
  301. return n.Flags&FlagIsChunkManifest > 0
  302. }
  303. func (n *Needle) SetIsChunkManifest() {
  304. n.Flags = n.Flags | FlagIsChunkManifest
  305. }
  306. func (n *Needle) HasPairs() bool {
  307. return n.Flags&FlagHasPairs != 0
  308. }
  309. func (n *Needle) SetHasPairs() {
  310. n.Flags = n.Flags | FlagHasPairs
  311. }