You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

394 lines
8.7 KiB

9 years ago
10 years ago
5 years ago
5 years ago
4 years ago
  1. package util
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "strings"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. )
  // Package-level HTTP plumbing shared by the helpers in this file.
  var (
  	// Transport is the transport installed on the shared client. It is
  	// exported, so code outside this file can reach the same instance.
  	Transport *http.Transport
  	// client is the single HTTP client the request helpers send through.
  	client *http.Client
  )

  // init builds the shared transport, which keeps up to 1024 idle connections
  // per host, and the client that sends its requests through that transport.
  func init() {
  	Transport = &http.Transport{MaxIdleConnsPerHost: 1024}
  	client = &http.Client{Transport: Transport}
  }
  27. func PostBytes(url string, body []byte) ([]byte, error) {
  28. r, err := client.Post(url, "", bytes.NewReader(body))
  29. if err != nil {
  30. return nil, fmt.Errorf("Post to %s: %v", url, err)
  31. }
  32. defer r.Body.Close()
  33. b, err := ioutil.ReadAll(r.Body)
  34. if err != nil {
  35. return nil, fmt.Errorf("Read response body: %v", err)
  36. }
  37. if r.StatusCode >= 400 {
  38. return nil, fmt.Errorf("%s: %s", url, r.Status)
  39. }
  40. return b, nil
  41. }
  42. func Post(url string, values url.Values) ([]byte, error) {
  43. r, err := client.PostForm(url, values)
  44. if err != nil {
  45. return nil, err
  46. }
  47. defer r.Body.Close()
  48. b, err := ioutil.ReadAll(r.Body)
  49. if r.StatusCode >= 400 {
  50. if err != nil {
  51. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  52. } else {
  53. return nil, fmt.Errorf("%s: %s", url, r.Status)
  54. }
  55. }
  56. if err != nil {
  57. return nil, err
  58. }
  59. return b, nil
  60. }
  61. // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  62. // may need increasing http.Client.Timeout
  63. func Get(url string) ([]byte, bool, error) {
  64. request, err := http.NewRequest("GET", url, nil)
  65. request.Header.Add("Accept-Encoding", "gzip")
  66. response, err := client.Do(request)
  67. if err != nil {
  68. return nil, true, err
  69. }
  70. defer response.Body.Close()
  71. var reader io.ReadCloser
  72. switch response.Header.Get("Content-Encoding") {
  73. case "gzip":
  74. reader, err = gzip.NewReader(response.Body)
  75. defer reader.Close()
  76. default:
  77. reader = response.Body
  78. }
  79. b, err := ioutil.ReadAll(reader)
  80. if response.StatusCode >= 400 {
  81. retryable := response.StatusCode >= 500
  82. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  83. }
  84. if err != nil {
  85. return nil, false, err
  86. }
  87. return b, false, nil
  88. }
  89. func Head(url string) (http.Header, error) {
  90. r, err := client.Head(url)
  91. if err != nil {
  92. return nil, err
  93. }
  94. defer CloseResponse(r)
  95. if r.StatusCode >= 400 {
  96. return nil, fmt.Errorf("%s: %s", url, r.Status)
  97. }
  98. return r.Header, nil
  99. }
  100. func Delete(url string, jwt string) error {
  101. req, err := http.NewRequest("DELETE", url, nil)
  102. if jwt != "" {
  103. req.Header.Set("Authorization", "BEARER "+string(jwt))
  104. }
  105. if err != nil {
  106. return err
  107. }
  108. resp, e := client.Do(req)
  109. if e != nil {
  110. return e
  111. }
  112. defer resp.Body.Close()
  113. body, err := ioutil.ReadAll(resp.Body)
  114. if err != nil {
  115. return err
  116. }
  117. switch resp.StatusCode {
  118. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  119. return nil
  120. }
  121. m := make(map[string]interface{})
  122. if e := json.Unmarshal(body, &m); e == nil {
  123. if s, ok := m["error"].(string); ok {
  124. return errors.New(s)
  125. }
  126. }
  127. return errors.New(string(body))
  128. }
  129. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  130. r, err := client.PostForm(url, values)
  131. if err != nil {
  132. return err
  133. }
  134. defer CloseResponse(r)
  135. if r.StatusCode != 200 {
  136. return fmt.Errorf("%s: %s", url, r.Status)
  137. }
  138. for {
  139. n, err := r.Body.Read(allocatedBytes)
  140. if n > 0 {
  141. eachBuffer(allocatedBytes[:n])
  142. }
  143. if err != nil {
  144. if err == io.EOF {
  145. return nil
  146. }
  147. return err
  148. }
  149. }
  150. }
  151. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  152. r, err := client.PostForm(url, values)
  153. if err != nil {
  154. return err
  155. }
  156. defer CloseResponse(r)
  157. if r.StatusCode != 200 {
  158. return fmt.Errorf("%s: %s", url, r.Status)
  159. }
  160. return readFn(r.Body)
  161. }
  162. func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) {
  163. response, err := client.Get(fileUrl)
  164. if err != nil {
  165. return "", nil, nil, err
  166. }
  167. header = response.Header
  168. contentDisposition := response.Header["Content-Disposition"]
  169. if len(contentDisposition) > 0 {
  170. idx := strings.Index(contentDisposition[0], "filename=")
  171. if idx != -1 {
  172. filename = contentDisposition[0][idx+len("filename="):]
  173. filename = strings.Trim(filename, "\"")
  174. }
  175. }
  176. resp = response
  177. return
  178. }
  179. func Do(req *http.Request) (resp *http.Response, err error) {
  180. return client.Do(req)
  181. }
  // NormalizeUrl returns url unchanged when it already starts with "http://"
  // or "https://" and otherwise prepends "http://" to it.
  func NormalizeUrl(url string) string {
  	for _, scheme := range []string{"http://", "https://"} {
  		if strings.HasPrefix(url, scheme) {
  			return url
  		}
  	}
  	return "http://" + url
  }
  188. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  189. if cipherKey != nil {
  190. var n int
  191. _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  192. n = copy(buf, data)
  193. })
  194. return int64(n), err
  195. }
  196. req, err := http.NewRequest("GET", fileUrl, nil)
  197. if err != nil {
  198. return 0, err
  199. }
  200. if !isFullChunk {
  201. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  202. } else {
  203. req.Header.Set("Accept-Encoding", "gzip")
  204. }
  205. r, err := client.Do(req)
  206. if err != nil {
  207. return 0, err
  208. }
  209. defer r.Body.Close()
  210. if r.StatusCode >= 400 {
  211. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  212. }
  213. var reader io.ReadCloser
  214. contentEncoding := r.Header.Get("Content-Encoding")
  215. switch contentEncoding {
  216. case "gzip":
  217. reader, err = gzip.NewReader(r.Body)
  218. defer reader.Close()
  219. default:
  220. reader = r.Body
  221. }
  222. var (
  223. i, m int
  224. n int64
  225. )
  226. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  227. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  228. for {
  229. m, err = reader.Read(buf[i:])
  230. i += m
  231. n += int64(m)
  232. if err == io.EOF {
  233. return n, nil
  234. }
  235. if err != nil {
  236. return n, err
  237. }
  238. if n == int64(len(buf)) {
  239. break
  240. }
  241. }
  242. // drains the response body to avoid memory leak
  243. data, _ := ioutil.ReadAll(reader)
  244. if len(data) != 0 {
  245. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  246. }
  247. return n, err
  248. }
  249. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  250. if cipherKey != nil {
  251. return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  252. }
  253. req, err := http.NewRequest("GET", fileUrl, nil)
  254. if err != nil {
  255. return false, err
  256. }
  257. if isFullChunk {
  258. req.Header.Add("Accept-Encoding", "gzip")
  259. } else {
  260. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  261. }
  262. r, err := client.Do(req)
  263. if err != nil {
  264. return true, err
  265. }
  266. defer CloseResponse(r)
  267. if r.StatusCode >= 400 {
  268. retryable = r.StatusCode >= 500
  269. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  270. }
  271. var reader io.ReadCloser
  272. contentEncoding := r.Header.Get("Content-Encoding")
  273. switch contentEncoding {
  274. case "gzip":
  275. reader, err = gzip.NewReader(r.Body)
  276. defer reader.Close()
  277. default:
  278. reader = r.Body
  279. }
  280. var (
  281. m int
  282. )
  283. buf := make([]byte, 64*1024)
  284. for {
  285. m, err = reader.Read(buf)
  286. fn(buf[:m])
  287. if err == io.EOF {
  288. return false, nil
  289. }
  290. if err != nil {
  291. return false, err
  292. }
  293. }
  294. }
  295. func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  296. encryptedData, retryable, err := Get(fileUrl)
  297. if err != nil {
  298. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  299. }
  300. decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  301. if err != nil {
  302. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  303. }
  304. if isContentCompressed {
  305. decryptedData, err = DecompressData(decryptedData)
  306. if err != nil {
  307. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  308. }
  309. }
  310. if len(decryptedData) < int(offset)+size {
  311. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  312. }
  313. if isFullChunk {
  314. fn(decryptedData)
  315. } else {
  316. fn(decryptedData[int(offset) : int(offset)+size])
  317. }
  318. return false, nil
  319. }
  320. func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
  321. req, err := http.NewRequest("GET", fileUrl, nil)
  322. if err != nil {
  323. return nil, err
  324. }
  325. if rangeHeader != "" {
  326. req.Header.Add("Range", rangeHeader)
  327. } else {
  328. req.Header.Add("Accept-Encoding", "gzip")
  329. }
  330. r, err := client.Do(req)
  331. if err != nil {
  332. return nil, err
  333. }
  334. defer CloseResponse(r)
  335. if r.StatusCode >= 400 {
  336. return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  337. }
  338. var reader io.ReadCloser
  339. contentEncoding := r.Header.Get("Content-Encoding")
  340. switch contentEncoding {
  341. case "gzip":
  342. reader, err = gzip.NewReader(r.Body)
  343. defer reader.Close()
  344. default:
  345. reader = r.Body
  346. }
  347. return reader, nil
  348. }
  // CloseResponse reads whatever is left of resp's body, throwing the data
  // away, and then closes the body. Errors from either step are ignored.
  func CloseResponse(resp *http.Response) {
  	body := resp.Body
  	_, _ = io.Copy(ioutil.Discard, body)
  	_ = body.Close()
  }