You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

403 lines
8.8 KiB

9 years ago
10 years ago
5 years ago
5 years ago
5 years ago
  1. package util
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. )
  14. var (
  15. client *http.Client
  16. Transport *http.Transport
  17. )
  18. func init() {
  19. Transport = &http.Transport{
  20. MaxIdleConns: 1024,
  21. MaxIdleConnsPerHost: 1024,
  22. }
  23. client = &http.Client{
  24. Transport: Transport,
  25. }
  26. }
  27. func Post(url string, values url.Values) ([]byte, error) {
  28. r, err := client.PostForm(url, values)
  29. if err != nil {
  30. return nil, err
  31. }
  32. defer r.Body.Close()
  33. b, err := ioutil.ReadAll(r.Body)
  34. if r.StatusCode >= 400 {
  35. if err != nil {
  36. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  37. } else {
  38. return nil, fmt.Errorf("%s: %s", url, r.Status)
  39. }
  40. }
  41. if err != nil {
  42. return nil, err
  43. }
  44. return b, nil
  45. }
  46. // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  47. // may need increasing http.Client.Timeout
  48. func Get(url string) ([]byte, bool, error) {
  49. request, err := http.NewRequest("GET", url, nil)
  50. request.Header.Add("Accept-Encoding", "gzip")
  51. response, err := client.Do(request)
  52. if err != nil {
  53. return nil, true, err
  54. }
  55. defer response.Body.Close()
  56. var reader io.ReadCloser
  57. switch response.Header.Get("Content-Encoding") {
  58. case "gzip":
  59. reader, err = gzip.NewReader(response.Body)
  60. defer reader.Close()
  61. default:
  62. reader = response.Body
  63. }
  64. b, err := ioutil.ReadAll(reader)
  65. if response.StatusCode >= 400 {
  66. retryable := response.StatusCode >= 500
  67. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  68. }
  69. if err != nil {
  70. return nil, false, err
  71. }
  72. return b, false, nil
  73. }
  74. func Head(url string) (http.Header, error) {
  75. r, err := client.Head(url)
  76. if err != nil {
  77. return nil, err
  78. }
  79. defer CloseResponse(r)
  80. if r.StatusCode >= 400 {
  81. return nil, fmt.Errorf("%s: %s", url, r.Status)
  82. }
  83. return r.Header, nil
  84. }
  85. func Delete(url string, jwt string) error {
  86. req, err := http.NewRequest("DELETE", url, nil)
  87. if jwt != "" {
  88. req.Header.Set("Authorization", "BEARER "+string(jwt))
  89. }
  90. if err != nil {
  91. return err
  92. }
  93. resp, e := client.Do(req)
  94. if e != nil {
  95. return e
  96. }
  97. defer resp.Body.Close()
  98. body, err := ioutil.ReadAll(resp.Body)
  99. if err != nil {
  100. return err
  101. }
  102. switch resp.StatusCode {
  103. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  104. return nil
  105. }
  106. m := make(map[string]interface{})
  107. if e := json.Unmarshal(body, &m); e == nil {
  108. if s, ok := m["error"].(string); ok {
  109. return errors.New(s)
  110. }
  111. }
  112. return errors.New(string(body))
  113. }
  114. func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  115. req, err := http.NewRequest("DELETE", url, nil)
  116. if jwt != "" {
  117. req.Header.Set("Authorization", "BEARER "+string(jwt))
  118. }
  119. if err != nil {
  120. return
  121. }
  122. resp, err := client.Do(req)
  123. if err != nil {
  124. return
  125. }
  126. defer resp.Body.Close()
  127. body, err = ioutil.ReadAll(resp.Body)
  128. if err != nil {
  129. return
  130. }
  131. httpStatus = resp.StatusCode
  132. return
  133. }
  134. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  135. r, err := client.PostForm(url, values)
  136. if err != nil {
  137. return err
  138. }
  139. defer CloseResponse(r)
  140. if r.StatusCode != 200 {
  141. return fmt.Errorf("%s: %s", url, r.Status)
  142. }
  143. for {
  144. n, err := r.Body.Read(allocatedBytes)
  145. if n > 0 {
  146. eachBuffer(allocatedBytes[:n])
  147. }
  148. if err != nil {
  149. if err == io.EOF {
  150. return nil
  151. }
  152. return err
  153. }
  154. }
  155. }
  156. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  157. r, err := client.PostForm(url, values)
  158. if err != nil {
  159. return err
  160. }
  161. defer CloseResponse(r)
  162. if r.StatusCode != 200 {
  163. return fmt.Errorf("%s: %s", url, r.Status)
  164. }
  165. return readFn(r.Body)
  166. }
  167. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  168. response, err := client.Get(fileUrl)
  169. if err != nil {
  170. return "", nil, nil, err
  171. }
  172. header = response.Header
  173. contentDisposition := response.Header["Content-Disposition"]
  174. if len(contentDisposition) > 0 {
  175. idx := strings.Index(contentDisposition[0], "filename=")
  176. if idx != -1 {
  177. filename = contentDisposition[0][idx+len("filename="):]
  178. filename = strings.Trim(filename, "\"")
  179. }
  180. }
  181. resp = response
  182. return
  183. }
  184. func Do(req *http.Request) (resp *http.Response, err error) {
  185. return client.Do(req)
  186. }
  187. func NormalizeUrl(url string) string {
  188. if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
  189. return url
  190. }
  191. return "http://" + url
  192. }
  193. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  194. if cipherKey != nil {
  195. var n int
  196. _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  197. n = copy(buf, data)
  198. })
  199. return int64(n), err
  200. }
  201. req, err := http.NewRequest("GET", fileUrl, nil)
  202. if err != nil {
  203. return 0, err
  204. }
  205. if !isFullChunk {
  206. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  207. } else {
  208. req.Header.Set("Accept-Encoding", "gzip")
  209. }
  210. r, err := client.Do(req)
  211. if err != nil {
  212. return 0, err
  213. }
  214. defer r.Body.Close()
  215. if r.StatusCode >= 400 {
  216. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  217. }
  218. var reader io.ReadCloser
  219. contentEncoding := r.Header.Get("Content-Encoding")
  220. switch contentEncoding {
  221. case "gzip":
  222. reader, err = gzip.NewReader(r.Body)
  223. defer reader.Close()
  224. default:
  225. reader = r.Body
  226. }
  227. var (
  228. i, m int
  229. n int64
  230. )
  231. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  232. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  233. for {
  234. m, err = reader.Read(buf[i:])
  235. i += m
  236. n += int64(m)
  237. if err == io.EOF {
  238. return n, nil
  239. }
  240. if err != nil {
  241. return n, err
  242. }
  243. if n == int64(len(buf)) {
  244. break
  245. }
  246. }
  247. // drains the response body to avoid memory leak
  248. data, _ := ioutil.ReadAll(reader)
  249. if len(data) != 0 {
  250. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  251. }
  252. return n, err
  253. }
  254. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  255. if cipherKey != nil {
  256. return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  257. }
  258. req, err := http.NewRequest("GET", fileUrl, nil)
  259. if err != nil {
  260. return false, err
  261. }
  262. if isFullChunk {
  263. req.Header.Add("Accept-Encoding", "gzip")
  264. } else {
  265. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  266. }
  267. r, err := client.Do(req)
  268. if err != nil {
  269. return true, err
  270. }
  271. defer CloseResponse(r)
  272. if r.StatusCode >= 400 {
  273. retryable = r.StatusCode >= 500
  274. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  275. }
  276. var reader io.ReadCloser
  277. contentEncoding := r.Header.Get("Content-Encoding")
  278. switch contentEncoding {
  279. case "gzip":
  280. reader, err = gzip.NewReader(r.Body)
  281. defer reader.Close()
  282. default:
  283. reader = r.Body
  284. }
  285. var (
  286. m int
  287. )
  288. buf := make([]byte, 64*1024)
  289. for {
  290. m, err = reader.Read(buf)
  291. fn(buf[:m])
  292. if err == io.EOF {
  293. return false, nil
  294. }
  295. if err != nil {
  296. return false, err
  297. }
  298. }
  299. }
  300. func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  301. encryptedData, retryable, err := Get(fileUrl)
  302. if err != nil {
  303. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  304. }
  305. decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  306. if err != nil {
  307. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  308. }
  309. if isContentCompressed {
  310. decryptedData, err = DecompressData(decryptedData)
  311. if err != nil {
  312. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  313. }
  314. }
  315. if len(decryptedData) < int(offset)+size {
  316. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  317. }
  318. if isFullChunk {
  319. fn(decryptedData)
  320. } else {
  321. fn(decryptedData[int(offset) : int(offset)+size])
  322. }
  323. return false, nil
  324. }
  325. func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
  326. req, err := http.NewRequest("GET", fileUrl, nil)
  327. if err != nil {
  328. return nil, err
  329. }
  330. if rangeHeader != "" {
  331. req.Header.Add("Range", rangeHeader)
  332. } else {
  333. req.Header.Add("Accept-Encoding", "gzip")
  334. }
  335. r, err := client.Do(req)
  336. if err != nil {
  337. return nil, err
  338. }
  339. if r.StatusCode >= 400 {
  340. return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  341. }
  342. var reader io.ReadCloser
  343. contentEncoding := r.Header.Get("Content-Encoding")
  344. switch contentEncoding {
  345. case "gzip":
  346. reader, err = gzip.NewReader(r.Body)
  347. defer reader.Close()
  348. default:
  349. reader = r.Body
  350. }
  351. return reader, nil
  352. }
  353. func CloseResponse(resp *http.Response) {
  354. io.Copy(ioutil.Discard, resp.Body)
  355. resp.Body.Close()
  356. }
  357. func CloseRequest(req *http.Request) {
  358. io.Copy(ioutil.Discard, req.Body)
  359. req.Body.Close()
  360. }