You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
8.7 KiB

9 years ago
10 years ago
5 years ago
5 years ago
5 years ago
  1. package util
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. )
  13. var (
  14. client *http.Client
  15. Transport *http.Transport
  16. )
  17. func init() {
  18. Transport = &http.Transport{
  19. MaxIdleConns: 1024,
  20. MaxIdleConnsPerHost: 1024,
  21. }
  22. client = &http.Client{
  23. Transport: Transport,
  24. }
  25. }
  26. func Post(url string, values url.Values) ([]byte, error) {
  27. r, err := client.PostForm(url, values)
  28. if err != nil {
  29. return nil, err
  30. }
  31. defer r.Body.Close()
  32. b, err := io.ReadAll(r.Body)
  33. if r.StatusCode >= 400 {
  34. if err != nil {
  35. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  36. } else {
  37. return nil, fmt.Errorf("%s: %s", url, r.Status)
  38. }
  39. }
  40. if err != nil {
  41. return nil, err
  42. }
  43. return b, nil
  44. }
  45. // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  46. // may need increasing http.Client.Timeout
  47. func Get(url string) ([]byte, bool, error) {
  48. request, err := http.NewRequest("GET", url, nil)
  49. request.Header.Add("Accept-Encoding", "gzip")
  50. response, err := client.Do(request)
  51. if err != nil {
  52. return nil, true, err
  53. }
  54. defer response.Body.Close()
  55. var reader io.ReadCloser
  56. switch response.Header.Get("Content-Encoding") {
  57. case "gzip":
  58. reader, err = gzip.NewReader(response.Body)
  59. defer reader.Close()
  60. default:
  61. reader = response.Body
  62. }
  63. b, err := io.ReadAll(reader)
  64. if response.StatusCode >= 400 {
  65. retryable := response.StatusCode >= 500
  66. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  67. }
  68. if err != nil {
  69. return nil, false, err
  70. }
  71. return b, false, nil
  72. }
  73. func Head(url string) (http.Header, error) {
  74. r, err := client.Head(url)
  75. if err != nil {
  76. return nil, err
  77. }
  78. defer CloseResponse(r)
  79. if r.StatusCode >= 400 {
  80. return nil, fmt.Errorf("%s: %s", url, r.Status)
  81. }
  82. return r.Header, nil
  83. }
  84. func Delete(url string, jwt string) error {
  85. req, err := http.NewRequest("DELETE", url, nil)
  86. if jwt != "" {
  87. req.Header.Set("Authorization", "BEARER "+string(jwt))
  88. }
  89. if err != nil {
  90. return err
  91. }
  92. resp, e := client.Do(req)
  93. if e != nil {
  94. return e
  95. }
  96. defer resp.Body.Close()
  97. body, err := io.ReadAll(resp.Body)
  98. if err != nil {
  99. return err
  100. }
  101. switch resp.StatusCode {
  102. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  103. return nil
  104. }
  105. m := make(map[string]interface{})
  106. if e := json.Unmarshal(body, &m); e == nil {
  107. if s, ok := m["error"].(string); ok {
  108. return errors.New(s)
  109. }
  110. }
  111. return errors.New(string(body))
  112. }
  113. func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  114. req, err := http.NewRequest("DELETE", url, nil)
  115. if jwt != "" {
  116. req.Header.Set("Authorization", "BEARER "+string(jwt))
  117. }
  118. if err != nil {
  119. return
  120. }
  121. resp, err := client.Do(req)
  122. if err != nil {
  123. return
  124. }
  125. defer resp.Body.Close()
  126. body, err = io.ReadAll(resp.Body)
  127. if err != nil {
  128. return
  129. }
  130. httpStatus = resp.StatusCode
  131. return
  132. }
  133. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  134. r, err := client.PostForm(url, values)
  135. if err != nil {
  136. return err
  137. }
  138. defer CloseResponse(r)
  139. if r.StatusCode != 200 {
  140. return fmt.Errorf("%s: %s", url, r.Status)
  141. }
  142. for {
  143. n, err := r.Body.Read(allocatedBytes)
  144. if n > 0 {
  145. eachBuffer(allocatedBytes[:n])
  146. }
  147. if err != nil {
  148. if err == io.EOF {
  149. return nil
  150. }
  151. return err
  152. }
  153. }
  154. }
  155. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  156. r, err := client.PostForm(url, values)
  157. if err != nil {
  158. return err
  159. }
  160. defer CloseResponse(r)
  161. if r.StatusCode != 200 {
  162. return fmt.Errorf("%s: %s", url, r.Status)
  163. }
  164. return readFn(r.Body)
  165. }
  166. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  167. response, err := client.Get(fileUrl)
  168. if err != nil {
  169. return "", nil, nil, err
  170. }
  171. header = response.Header
  172. contentDisposition := response.Header["Content-Disposition"]
  173. if len(contentDisposition) > 0 {
  174. idx := strings.Index(contentDisposition[0], "filename=")
  175. if idx != -1 {
  176. filename = contentDisposition[0][idx+len("filename="):]
  177. filename = strings.Trim(filename, "\"")
  178. }
  179. }
  180. resp = response
  181. return
  182. }
  183. func Do(req *http.Request) (resp *http.Response, err error) {
  184. return client.Do(req)
  185. }
  186. func NormalizeUrl(url string) string {
  187. if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
  188. return url
  189. }
  190. return "http://" + url
  191. }
  192. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  193. if cipherKey != nil {
  194. var n int
  195. _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  196. n = copy(buf, data)
  197. })
  198. return int64(n), err
  199. }
  200. req, err := http.NewRequest("GET", fileUrl, nil)
  201. if err != nil {
  202. return 0, err
  203. }
  204. if !isFullChunk {
  205. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  206. } else {
  207. req.Header.Set("Accept-Encoding", "gzip")
  208. }
  209. r, err := client.Do(req)
  210. if err != nil {
  211. return 0, err
  212. }
  213. defer r.Body.Close()
  214. if r.StatusCode >= 400 {
  215. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  216. }
  217. var reader io.ReadCloser
  218. contentEncoding := r.Header.Get("Content-Encoding")
  219. switch contentEncoding {
  220. case "gzip":
  221. reader, err = gzip.NewReader(r.Body)
  222. defer reader.Close()
  223. default:
  224. reader = r.Body
  225. }
  226. var (
  227. i, m int
  228. n int64
  229. )
  230. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  231. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  232. for {
  233. m, err = reader.Read(buf[i:])
  234. i += m
  235. n += int64(m)
  236. if err == io.EOF {
  237. return n, nil
  238. }
  239. if err != nil {
  240. return n, err
  241. }
  242. if n == int64(len(buf)) {
  243. break
  244. }
  245. }
  246. // drains the response body to avoid memory leak
  247. data, _ := io.ReadAll(reader)
  248. if len(data) != 0 {
  249. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  250. }
  251. return n, err
  252. }
  253. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  254. if cipherKey != nil {
  255. return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  256. }
  257. req, err := http.NewRequest("GET", fileUrl, nil)
  258. if err != nil {
  259. return false, err
  260. }
  261. if isFullChunk {
  262. req.Header.Add("Accept-Encoding", "gzip")
  263. } else {
  264. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  265. }
  266. r, err := client.Do(req)
  267. if err != nil {
  268. return true, err
  269. }
  270. defer CloseResponse(r)
  271. if r.StatusCode >= 400 {
  272. retryable = r.StatusCode >= 500
  273. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  274. }
  275. var reader io.ReadCloser
  276. contentEncoding := r.Header.Get("Content-Encoding")
  277. switch contentEncoding {
  278. case "gzip":
  279. reader, err = gzip.NewReader(r.Body)
  280. defer reader.Close()
  281. default:
  282. reader = r.Body
  283. }
  284. var (
  285. m int
  286. )
  287. buf := make([]byte, 64*1024)
  288. for {
  289. m, err = reader.Read(buf)
  290. fn(buf[:m])
  291. if err == io.EOF {
  292. return false, nil
  293. }
  294. if err != nil {
  295. return false, err
  296. }
  297. }
  298. }
  299. func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  300. encryptedData, retryable, err := Get(fileUrl)
  301. if err != nil {
  302. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  303. }
  304. decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  305. if err != nil {
  306. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  307. }
  308. if isContentCompressed {
  309. decryptedData, err = DecompressData(decryptedData)
  310. if err != nil {
  311. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  312. }
  313. }
  314. if len(decryptedData) < int(offset)+size {
  315. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  316. }
  317. if isFullChunk {
  318. fn(decryptedData)
  319. } else {
  320. fn(decryptedData[int(offset) : int(offset)+size])
  321. }
  322. return false, nil
  323. }
  324. func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
  325. req, err := http.NewRequest("GET", fileUrl, nil)
  326. if err != nil {
  327. return nil, err
  328. }
  329. if rangeHeader != "" {
  330. req.Header.Add("Range", rangeHeader)
  331. } else {
  332. req.Header.Add("Accept-Encoding", "gzip")
  333. }
  334. r, err := client.Do(req)
  335. if err != nil {
  336. return nil, err
  337. }
  338. if r.StatusCode >= 400 {
  339. return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  340. }
  341. var reader io.ReadCloser
  342. contentEncoding := r.Header.Get("Content-Encoding")
  343. switch contentEncoding {
  344. case "gzip":
  345. reader, err = gzip.NewReader(r.Body)
  346. defer reader.Close()
  347. default:
  348. reader = r.Body
  349. }
  350. return reader, nil
  351. }
  352. func CloseResponse(resp *http.Response) {
  353. io.Copy(io.Discard, resp.Body)
  354. resp.Body.Close()
  355. }
  356. func CloseRequest(req *http.Request) {
  357. io.Copy(io.Discard, req.Body)
  358. req.Body.Close()
  359. }