You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

450 lines
14 KiB

8 years ago
8 years ago
5 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
5 years ago
5 years ago
  1. // Package rssbot implements a Service capable of reading Atom/RSS feeds.
  2. package rssbot
  3. import (
  4. "errors"
  5. "fmt"
  6. "html"
  7. "net/http"
  8. "strconv"
  9. "time"
  10. log "github.com/Sirupsen/logrus"
  11. "github.com/die-net/lrucache"
  12. "github.com/gregjones/httpcache"
  13. "github.com/matrix-org/go-neb/database"
  14. "github.com/matrix-org/go-neb/polling"
  15. "github.com/matrix-org/go-neb/types"
  16. "github.com/matrix-org/gomatrix"
  17. "github.com/mmcdole/gofeed"
  18. "github.com/prometheus/client_golang/prometheus"
  19. )
  20. // ServiceType of the RSS Bot service
  21. const ServiceType = "rssbot"
  22. var cachingClient *http.Client
  23. var (
  24. pollCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
  25. Name: "goneb_rss_polls_total",
  26. Help: "The number of feed polls from RSS services",
  27. }, []string{"http_status"})
  28. )
  29. const minPollingIntervalSeconds = 60 * 5 // 5 min (News feeds can be genuinely spammy)
  30. // Service contains the Config fields for this service.
  31. //
  32. // Example request:
  33. // {
  34. // feeds: {
  35. // "http://rss.cnn.com/rss/edition.rss": {
  36. // poll_interval_mins: 60,
  37. // rooms: ["!cBrPbzWazCtlkMNQSF:localhost"]
  38. // },
  39. // "https://www.wired.com/feed/": {
  40. // rooms: ["!qmElAGdFYCHoCJuaNt:localhost"]
  41. // }
  42. // }
  43. // }
  44. type Service struct {
  45. types.DefaultService
  46. // Feeds is a map of feed URL to configuration options for this feed.
  47. Feeds map[string]struct {
  48. // Optional. The time to wait between polls. If this is less than minPollingIntervalSeconds, it is ignored.
  49. PollIntervalMins int `json:"poll_interval_mins"`
  50. // The list of rooms to send feed updates into. This cannot be empty.
  51. Rooms []string `json:"rooms"`
  52. // True if rss bot is unable to poll this feed. This is populated by Go-NEB. Use /getService to
  53. // retrieve this value.
  54. IsFailing bool `json:"is_failing"`
  55. // The time of the last successful poll. This is populated by Go-NEB. Use /getService to retrieve
  56. // this value.
  57. FeedUpdatedTimestampSecs int64 `json:"last_updated_ts_secs"`
  58. // Internal field. When we should poll again.
  59. NextPollTimestampSecs int64
  60. // Internal field. The most recently seen GUIDs. Sized to the number of items in the feed.
  61. RecentGUIDs []string
  62. } `json:"feeds"`
  63. }
  64. // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned.
  65. func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error {
  66. if len(s.Feeds) == 0 {
  67. // this is an error UNLESS the old service had some feeds in which case they are deleting us :(
  68. var numOldFeeds int
  69. oldFeedService, ok := oldService.(*Service)
  70. if !ok {
  71. log.WithField("service", oldService).Error("Old service isn't an rssbot.Service")
  72. } else {
  73. numOldFeeds = len(oldFeedService.Feeds)
  74. }
  75. if numOldFeeds == 0 {
  76. return errors.New("An RSS feed must be specified")
  77. }
  78. return nil
  79. }
  80. // Make sure we can parse the feed
  81. for feedURL, feedInfo := range s.Feeds {
  82. if _, err := readFeed(feedURL); err != nil {
  83. return fmt.Errorf("Failed to read URL %s: %s", feedURL, err.Error())
  84. }
  85. if len(feedInfo.Rooms) == 0 {
  86. return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL)
  87. }
  88. }
  89. s.joinRooms(client)
  90. return nil
  91. }
  92. func (s *Service) joinRooms(client *gomatrix.Client) {
  93. roomSet := make(map[string]bool)
  94. for _, feedInfo := range s.Feeds {
  95. for _, roomID := range feedInfo.Rooms {
  96. roomSet[roomID] = true
  97. }
  98. }
  99. for roomID := range roomSet {
  100. if _, err := client.JoinRoom(roomID, "", nil); err != nil {
  101. log.WithFields(log.Fields{
  102. log.ErrorKey: err,
  103. "room_id": roomID,
  104. "user_id": client.UserID,
  105. }).Error("Failed to join room")
  106. }
  107. }
  108. }
  109. // PostRegister deletes this service if there are no feeds remaining.
  110. func (s *Service) PostRegister(oldService types.Service) {
  111. if len(s.Feeds) == 0 { // bye-bye :(
  112. logger := log.WithFields(log.Fields{
  113. "service_id": s.ServiceID(),
  114. "service_type": s.ServiceType(),
  115. })
  116. logger.Info("Deleting service: No feeds remaining.")
  117. polling.StopPolling(s)
  118. if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil {
  119. logger.WithError(err).Error("Failed to delete service")
  120. }
  121. }
  122. }
  123. // OnPoll rechecks RSS feeds which are due to be polled.
  124. //
  125. // In order for a feed to be polled, the current time must be greater than NextPollTimestampSecs.
  126. // In order for an item on a feed to be sent to Matrix, the item's GUID must not exist in RecentGUIDs.
  127. // The GUID for an item is created according to the following rules:
  128. // - If there is a GUID field, use it.
  129. // - Else if there is a Link field, use it as the GUID.
  130. // - Else if there is a Title field, use it as the GUID.
  131. //
  132. // Returns a timestamp representing when this Service should have OnPoll called again.
  133. func (s *Service) OnPoll(cli *gomatrix.Client) time.Time {
  134. logger := log.WithFields(log.Fields{
  135. "service_id": s.ServiceID(),
  136. "service_type": s.ServiceType(),
  137. })
  138. now := time.Now().Unix() // Second resolution
  139. // Work out which feeds should be polled
  140. var pollFeeds []string
  141. for u, feedInfo := range s.Feeds {
  142. if feedInfo.NextPollTimestampSecs == 0 || now >= feedInfo.NextPollTimestampSecs {
  143. // re-query this feed
  144. pollFeeds = append(pollFeeds, u)
  145. }
  146. }
  147. if len(pollFeeds) == 0 {
  148. return s.nextTimestamp()
  149. }
  150. // Query each feed and send new items to subscribed rooms
  151. for _, u := range pollFeeds {
  152. feed, items, err := s.queryFeed(u)
  153. if err != nil {
  154. logger.WithField("feed_url", u).WithError(err).Error("Failed to query feed")
  155. incrementMetrics(u, err)
  156. continue
  157. }
  158. incrementMetrics(u, nil)
  159. logger.WithFields(log.Fields{
  160. "feed_url": u,
  161. "feed_items": len(feed.Items),
  162. "new_items": len(items),
  163. }).Info("Sending new items")
  164. // Loop backwards since [0] is the most recent and we want to send in chronological order
  165. for i := len(items) - 1; i >= 0; i-- {
  166. item := items[i]
  167. if err := s.sendToRooms(cli, u, feed, item); err != nil {
  168. logger.WithFields(log.Fields{
  169. "feed_url": u,
  170. log.ErrorKey: err,
  171. "item": item,
  172. }).Error("Failed to send item to room")
  173. }
  174. }
  175. }
  176. // Persist the service to save the next poll times
  177. if _, err := database.GetServiceDB().StoreService(s); err != nil {
  178. logger.WithError(err).Error("Failed to persist next poll times for service")
  179. }
  180. return s.nextTimestamp()
  181. }
  182. func incrementMetrics(urlStr string, err error) {
  183. if err != nil {
  184. herr, ok := err.(gofeed.HTTPError)
  185. statusCode := 0 // e.g. network timeout
  186. if ok {
  187. statusCode = herr.StatusCode
  188. }
  189. pollCounter.With(prometheus.Labels{"http_status": strconv.Itoa(statusCode)}).Inc()
  190. } else {
  191. pollCounter.With(prometheus.Labels{"http_status": "200"}).Inc() // technically 2xx but gofeed doesn't tell us which
  192. }
  193. }
  194. func (s *Service) nextTimestamp() time.Time {
  195. // return the earliest next poll ts
  196. var earliestNextTs int64
  197. for _, feedInfo := range s.Feeds {
  198. if earliestNextTs == 0 || feedInfo.NextPollTimestampSecs < earliestNextTs {
  199. earliestNextTs = feedInfo.NextPollTimestampSecs
  200. }
  201. }
  202. // Don't allow times in the past. Set a min re-poll threshold of 60s to avoid
  203. // tight-looping on feeds which 500.
  204. now := time.Now().Unix()
  205. if earliestNextTs <= now {
  206. earliestNextTs = now + 60
  207. }
  208. return time.Unix(earliestNextTs, 0)
  209. }
  210. // Query the given feed, update relevant timestamps and return NEW items
  211. func (s *Service) queryFeed(feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
  212. log.WithField("feed_url", feedURL).Info("Querying feed")
  213. var items []gofeed.Item
  214. feed, err := readFeed(feedURL)
  215. // check for no items in addition to any returned errors as it appears some RSS feeds
  216. // do not consistently return items.
  217. if err == nil && len(feed.Items) == 0 {
  218. err = errors.New("feed has 0 items")
  219. }
  220. if err != nil {
  221. f := s.Feeds[feedURL]
  222. f.IsFailing = true
  223. s.Feeds[feedURL] = f
  224. return nil, items, err
  225. }
  226. // Patch up the item list: make sure each item has a GUID.
  227. ensureItemsHaveGUIDs(feed)
  228. // Work out which items are new, if any (based on the last updated TS we have)
  229. // If the TS is 0 then this is the first ever poll, so let's not send 10s of events
  230. // into the room and just do new ones from this point onwards.
  231. if s.Feeds[feedURL].NextPollTimestampSecs != 0 {
  232. items = s.newItems(feedURL, feed.Items)
  233. }
  234. now := time.Now().Unix() // Second resolution
  235. // Work out when to next poll this feed
  236. nextPollTsSec := now + minPollingIntervalSeconds
  237. if s.Feeds[feedURL].PollIntervalMins > int(minPollingIntervalSeconds/60) {
  238. nextPollTsSec = now + int64(s.Feeds[feedURL].PollIntervalMins*60)
  239. }
  240. // TODO: Handle the 'sy' Syndication extension to control update interval.
  241. // See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
  242. // Work out which GUIDs to remember. We don't want to remember every GUID ever as that leads to completely
  243. // unbounded growth of data.
  244. f := s.Feeds[feedURL]
  245. // Some RSS feeds can return a very small number of items then bounce
  246. // back to their "normal" size, so we cannot just clobber the recent GUID list per request or else we'll
  247. // forget what we sent and resend it. Instead, we'll keep 2x the max number of items that we've ever
  248. // seen from this feed, up to a max of 10,000.
  249. maxGuids := 2 * len(feed.Items)
  250. if len(f.RecentGUIDs) > maxGuids {
  251. maxGuids = len(f.RecentGUIDs) // already 2x'd.
  252. }
  253. if maxGuids > 10000 {
  254. maxGuids = 10000
  255. }
  256. lastSet := uniqueStrings(f.RecentGUIDs) // e.g. [4,5,6]
  257. thisSet := uniqueGuids(feed.Items) // e.g. [1,2,3]
  258. guids := append(thisSet, lastSet...) // e.g. [1,2,3,4,5,6]
  259. guids = uniqueStrings(guids)
  260. if len(guids) > maxGuids {
  261. // Critically this favours the NEWEST elements, which are the ones we're most likely to see again.
  262. guids = guids[0:maxGuids]
  263. }
  264. // Update the service config to persist the new times
  265. f.NextPollTimestampSecs = nextPollTsSec
  266. f.FeedUpdatedTimestampSecs = now
  267. f.RecentGUIDs = guids
  268. f.IsFailing = false
  269. s.Feeds[feedURL] = f
  270. return feed, items, nil
  271. }
  272. func (s *Service) newItems(feedURL string, allItems []*gofeed.Item) (items []gofeed.Item) {
  273. for _, i := range allItems {
  274. if i == nil {
  275. continue
  276. }
  277. // if we've seen this guid before, we've sent it before
  278. seenBefore := false
  279. for _, guid := range s.Feeds[feedURL].RecentGUIDs {
  280. if guid == i.GUID {
  281. seenBefore = true
  282. break
  283. }
  284. }
  285. if seenBefore {
  286. continue
  287. }
  288. // Decode HTML for <title> and <description>:
  289. // The RSS 2.0 Spec http://cyber.harvard.edu/rss/rss.html#hrelementsOfLtitemgt supports a bunch
  290. // of weird ways to put HTML into <title> and <description> tags. Not all RSS feed producers run
  291. // these fields through entity encoders (some have ' unencoded, others have it as &#8217;). We'll
  292. // assume that all RSS fields are sending HTML for these fields and run them through a standard decoder.
  293. // This will inevitably break for some people, but that group of people are probably smaller, so *shrug*.
  294. i.Title = html.UnescapeString(i.Title)
  295. i.Description = html.UnescapeString(i.Description)
  296. items = append(items, *i)
  297. }
  298. return
  299. }
  300. func (s *Service) sendToRooms(cli *gomatrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
  301. logger := log.WithFields(log.Fields{
  302. "feed_url": feedURL,
  303. "title": item.Title,
  304. "guid": item.GUID,
  305. })
  306. logger.Info("Sending new feed item")
  307. for _, roomID := range s.Feeds[feedURL].Rooms {
  308. if _, err := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item)); err != nil {
  309. logger.WithError(err).WithField("room_id", roomID).Error("Failed to send to room")
  310. }
  311. }
  312. return nil
  313. }
  314. func itemToHTML(feed *gofeed.Feed, item gofeed.Item) gomatrix.HTMLMessage {
  315. return gomatrix.HTMLMessage{
  316. Body: fmt.Sprintf("%s: %s (%s)",
  317. html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link)),
  318. MsgType: "m.notice",
  319. Format: "org.matrix.custom.html",
  320. FormattedBody: fmt.Sprintf("<strong>%s</strong>:<br><a href=\"%s\"><strong>%s</strong></a>",
  321. html.EscapeString(feed.Title), html.EscapeString(item.Link), html.EscapeString(item.Title)),
  322. // <strong>FeedTitle</strong>:
  323. // <br>
  324. // <a href="url-of-the-entry"><strong>Title of the Entry</strong></a>
  325. }
  326. }
  327. func ensureItemsHaveGUIDs(feed *gofeed.Feed) {
  328. for idx := 0; idx < len(feed.Items); idx++ {
  329. itm := feed.Items[idx]
  330. if itm.GUID == "" {
  331. if itm.Link != "" {
  332. itm.GUID = itm.Link
  333. } else if itm.Title != "" {
  334. itm.GUID = itm.Title
  335. }
  336. feed.Items[idx] = itm
  337. }
  338. }
  339. }
  340. // uniqueStrings returns a new slice of strings with duplicate elements removed.
  341. // Order is otherwise preserved.
  342. func uniqueStrings(a []string) []string {
  343. ret := []string{}
  344. seen := make(map[string]bool)
  345. for _, str := range a {
  346. if seen[str] {
  347. continue
  348. }
  349. seen[str] = true
  350. ret = append(ret, str)
  351. }
  352. return ret
  353. }
  354. // uniqueGuids returns a new slice of GUID strings with duplicate elements removed.
  355. // Order is otherwise preserved.
  356. func uniqueGuids(a []*gofeed.Item) []string {
  357. ret := []string{}
  358. seen := make(map[string]bool)
  359. for _, item := range a {
  360. if seen[item.GUID] {
  361. continue
  362. }
  363. seen[item.GUID] = true
  364. ret = append(ret, item.GUID)
  365. }
  366. return ret
  367. }
  368. type userAgentRoundTripper struct {
  369. Transport http.RoundTripper
  370. }
  371. func (rt userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  372. req.Header.Set("User-Agent", "Go-NEB")
  373. return rt.Transport.RoundTrip(req)
  374. }
  375. func readFeed(feedURL string) (*gofeed.Feed, error) {
  376. // Don't use fp.ParseURL because it leaks on non-2xx responses as of 2016/11/29 (cac19c6c27)
  377. fp := gofeed.NewParser()
  378. resp, err := cachingClient.Get(feedURL)
  379. if resp != nil {
  380. defer resp.Body.Close()
  381. }
  382. if err != nil {
  383. return nil, err
  384. }
  385. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  386. return nil, gofeed.HTTPError{
  387. StatusCode: resp.StatusCode,
  388. Status: resp.Status,
  389. }
  390. }
  391. return fp.Parse(resp.Body)
  392. }
  393. func init() {
  394. lruCache := lrucache.New(1024*1024*20, 0) // 20 MB cache, no max-age
  395. cachingClient = &http.Client{
  396. Transport: userAgentRoundTripper{httpcache.NewTransport(lruCache)},
  397. }
  398. types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
  399. r := &Service{
  400. DefaultService: types.NewDefaultService(serviceID, serviceUserID, ServiceType),
  401. }
  402. return r
  403. })
  404. prometheus.MustRegister(pollCounter)
  405. }