diff --git a/src/github.com/matrix-org/go-neb/api/handlers/service.go b/src/github.com/matrix-org/go-neb/api/handlers/service.go index f33bd97..6acb777 100644 --- a/src/github.com/matrix-org/go-neb/api/handlers/service.go +++ b/src/github.com/matrix-org/go-neb/api/handlers/service.go @@ -16,6 +16,7 @@ import ( "github.com/matrix-org/go-neb/metrics" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ConfigureService represents an HTTP handler which can process /admin/configureService requests. @@ -211,13 +212,14 @@ func (h *GetService) OnIncomingRequest(req *http.Request) (interface{}, *errors. }{srv.ServiceID(), srv.ServiceType(), srv}, nil } -func checkClientForService(service types.Service, client *matrix.Client) error { +func checkClientForService(service types.Service, client *gomatrix.Client) error { // If there are any commands or expansions for this Service then the service user ID // MUST be a syncing client or else the Service will never get the incoming command/expansion! cmds := service.Commands(client) expans := service.Expansions(client) if len(cmds) > 0 || len(expans) > 0 { - if !client.ClientConfig.Sync { + nebStore := client.Store.(*matrix.NEBStore) + if !nebStore.ClientConfig.Sync { return fmt.Errorf( "Service type '%s' requires a syncing client", service.ServiceType(), ) diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 4352766..842e7f2 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -2,9 +2,9 @@ package clients import ( "net/http" - "net/url" "strings" "sync" + "time" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/api" @@ -12,34 +12,10 @@ import ( "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/metrics" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" shellwords "github.com/mattn/go-shellwords" ) -type nextBatchStore struct { - db database.Storer -} - -func (s nextBatchStore) Save(userID, nextBatch string) { - if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "user_id": userID, - "next_batch": nextBatch, - }).Error("Failed to persist next_batch token") - } -} -func (s nextBatchStore) Load(userID string) string { - token, err := s.db.LoadNextBatch(userID) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "user_id": userID, - }).Error("Failed to load next_batch token") - return "" - } - return token -} - // A Clients is a collection of clients used for bot services. type Clients struct { db database.Storer @@ -60,7 +36,7 @@ func New(db database.Storer, cli *http.Client) *Clients { } // Client gets a client for the userID -func (c *Clients) Client(userID string) (*matrix.Client, error) { +func (c *Clients) Client(userID string) (*gomatrix.Client, error) { entry := c.getClient(userID) if entry.client != nil { return entry.client, nil @@ -93,7 +69,7 @@ func (c *Clients) Start() error { type clientEntry struct { config api.ClientConfig - client *matrix.Client + client *gomatrix.Client } func (c *Clients) getClient(userID string) clientEntry { @@ -172,7 +148,7 @@ func (c *Clients) updateClientInDB(newConfig api.ClientConfig) (new clientEntry, return } -func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) { +func (c *Clients) onMessageEvent(client *gomatrix.Client, event *gomatrix.Event) { services, err := c.db.LoadServicesForUser(client.UserID) if err != nil { log.WithFields(log.Fields{ @@ -232,7 +208,7 @@ func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) { // the matching command with the longest path. Returns the JSON encodable // content of a single matrix message event to use as a response or nil if no // response is appropriate. -func runCommandForService(cmds []types.Command, event *matrix.Event, arguments []string) interface{} { +func runCommandForService(cmds []types.Command, event *gomatrix.Event, arguments []string) interface{} { var bestMatch *types.Command for _, command := range cmds { matches := command.Matches(arguments) @@ -264,7 +240,7 @@ func runCommandForService(cmds []types.Command, event *matrix.Event, arguments [ }).Warn("Command returned both error and content.") } metrics.IncrementCommand(bestMatch.Path[0], metrics.StatusFailure) - content = matrix.TextMessage{"m.notice", err.Error()} + content = gomatrix.TextMessage{"m.notice", err.Error()} } else { metrics.IncrementCommand(bestMatch.Path[0], metrics.StatusSuccess) } @@ -273,7 +249,7 @@ func runCommandForService(cmds []types.Command, event *matrix.Event, arguments [ } // run the expansions for a matrix event. -func runExpansionsForService(expans []types.Expansion, event *matrix.Event, body string) []interface{} { +func runExpansionsForService(expans []types.Expansion, event *gomatrix.Event, body string) []interface{} { var responses []interface{} for _, expansion := range expans { @@ -294,7 +270,7 @@ func runExpansionsForService(expans []types.Expansion, event *matrix.Event, body return responses } -func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) { +func (c *Clients) onBotOptionsEvent(client *gomatrix.Client, event *gomatrix.Event) { // see if these options are for us. The state key is the user ID with a leading _ // to get around restrictions in the HS about having user IDs as state keys. targetUserID := strings.TrimPrefix(event.StateKey, "_") @@ -318,7 +294,7 @@ func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) } } -func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) { +func (c *Clients) onRoomMemberEvent(client *gomatrix.Client, event *gomatrix.Event) { if event.StateKey != client.UserID { return // not our member event } @@ -335,7 +311,11 @@ func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) }) logger.Print("Accepting invite from user") - if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { + content := struct { + Inviter string `json:"inviter"` + }{event.Sender} + + if _, err := client.JoinRoom(event.RoomID, "", content); err != nil { logger.WithError(err).Print("Failed to join room") } else { logger.Print("Joined room") @@ -343,35 +323,60 @@ func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) } } -func (c *Clients) newClient(config api.ClientConfig) (*matrix.Client, error) { - homeserverURL, err := url.Parse(config.HomeserverURL) +func (c *Clients) newClient(config api.ClientConfig) (*gomatrix.Client, error) { + client, err := gomatrix.NewClient(config.HomeserverURL, config.UserID, config.AccessToken) if err != nil { return nil, err } - - client := matrix.NewClient(c.httpClient, homeserverURL, config.AccessToken, config.UserID) - client.NextBatchStorer = nextBatchStore{c.db} - client.ClientConfig = config + client.Client = c.httpClient + syncer := client.Syncer.(*gomatrix.DefaultSyncer) + nebStore := &matrix.NEBStore{ + InMemoryStore: *gomatrix.NewInMemoryStore(), + Database: c.db, + ClientConfig: config, + } + client.Store = nebStore + syncer.Store = nebStore // TODO: Check that the access token is valid for the userID by peforming // a request against the server. - client.Worker.OnEventType("m.room.message", func(event *matrix.Event) { + syncer.OnEventType("m.room.message", func(event *gomatrix.Event) { c.onMessageEvent(client, event) }) - client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) { + syncer.OnEventType("m.room.bot.options", func(event *gomatrix.Event) { c.onBotOptionsEvent(client, event) }) if config.AutoJoinRooms { - client.Worker.OnEventType("m.room.member", func(event *matrix.Event) { + syncer.OnEventType("m.room.member", func(event *gomatrix.Event) { c.onRoomMemberEvent(client, event) }) } + log.WithFields(log.Fields{ + "user_id": config.UserID, + "sync": config.Sync, + "auto_join_rooms": config.AutoJoinRooms, + "since": nebStore.LoadNextBatch(config.UserID), + }).Info("Created new client") + if config.Sync { - go client.Sync() + go func() { + for { + if e := client.Sync(); e != nil { + log.WithFields(log.Fields{ + log.ErrorKey: e, + "user_id": config.UserID, + }).Error("Fatal Sync() error") + time.Sleep(10 * time.Second) + } else { + log.WithField("user_id", config.UserID).Info("Stopping Sync()") + return + } + } + }() } return client, nil diff --git a/src/github.com/matrix-org/go-neb/clients/clients_test.go b/src/github.com/matrix-org/go-neb/clients/clients_test.go index bc6f70e..2fcc613 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients_test.go +++ b/src/github.com/matrix-org/go-neb/clients/clients_test.go @@ -2,13 +2,13 @@ package clients import ( "fmt" - "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" - "github.com/matrix-org/go-neb/types" "net/http" - "net/url" "reflect" "testing" + + "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) var commandParseTests = []struct { @@ -28,7 +28,7 @@ type MockService struct { commands []types.Command } -func (s *MockService) Commands(cli *matrix.Client) []types.Command { +func (s *MockService) Commands(cli *gomatrix.Client) []types.Command { return s.commands } @@ -72,12 +72,12 @@ func TestCommandParsing(t *testing.T) { Transport: trans, } clients := New(&store, cli) - hsURL, _ := url.Parse("https://someplace.somewhere") - mxCli := matrix.NewClient(cli, hsURL, "token", "@service:user") + mxCli, _ := gomatrix.NewClient("https://someplace.somewhere", "@service:user", "token") + mxCli.Client = cli for _, input := range commandParseTests { executedCmdArgs = []string{} - event := matrix.Event{ + event := gomatrix.Event{ Type: "m.room.message", Sender: "@someone:somewhere", RoomID: "!foo:bar", diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index e03c9eb..d863949 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -1,443 +1,69 @@ -// Package matrix provides an HTTP client that can interact with a Homeserver via r0 APIs (/sync). -// -// It is NOT safe to access the field (or any sub-fields of) 'Rooms' concurrently. In essence, this -// structure MUST be treated as read-only. The matrix client will update this structure as new events -// arrive from the homeserver. -// -// Internally, the client has 1 goroutine for polling the server, and 1 goroutine for processing data -// returned. The polling goroutine communicates to the processing goroutine by a buffered channel -// which feedback loops if processing takes a while as it will delay more data from being pulled down -// if the buffer gets full. Modification of the 'Rooms' field of the client is done EXCLUSIVELY on the -// processing goroutine. package matrix import ( - "bytes" "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/url" - "path" - "strconv" - "sync" - "time" log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/api" - "github.com/matrix-org/go-neb/errors" -) - -var ( - filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) + "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/gomatrix" ) -// NextBatchStorer controls loading/saving of next_batch tokens for users -type NextBatchStorer interface { - // Save a next_batch token for a given user. Best effort. - Save(userID, nextBatch string) - // Load a next_batch token for a given user. Return an empty string if no token exists. - Load(userID string) string -} - -// noopNextBatchStore does not load or save next_batch tokens. -type noopNextBatchStore struct{} - -func (s noopNextBatchStore) Save(userID, nextBatch string) {} -func (s noopNextBatchStore) Load(userID string) string { return "" } - -// Client represents a Matrix client. -type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string - NextBatchStorer NextBatchStorer - ClientConfig api.ClientConfig -} - -func (cli *Client) buildURL(urlPath ...string) string { - ps := []string{cli.Prefix} - for _, p := range urlPath { - ps = append(ps, p) - } - return cli.buildBaseURL(ps...) -} - -func (cli *Client) buildBaseURL(urlPath ...string) string { - // copy the URL. Purposefully ignore error as the input is from a valid URL already - hsURL, _ := url.Parse(cli.HomeserverURL.String()) - parts := []string{hsURL.Path} - parts = append(parts, urlPath...) - hsURL.Path = path.Join(parts...) - query := hsURL.Query() - query.Set("access_token", cli.AccessToken) - hsURL.RawQuery = query.Encode() - return hsURL.String() -} - -func (cli *Client) buildURLWithQuery(urlPath []string, urlQuery map[string]string) string { - u, _ := url.Parse(cli.buildURL(urlPath...)) - q := u.Query() - for k, v := range urlQuery { - q.Set(k, v) - } - u.RawQuery = q.Encode() - return u.String() -} - -// JoinRoom joins the client to a room ID or alias. If serverName is specified, this will be added as a query param -// to instruct the homeserver to join via that server. If invitingUserID is specified, the inviting user ID will be -// inserted into the content of the join request. Returns a room ID. -func (cli *Client) JoinRoom(roomIDorAlias, serverName, invitingUserID string) (string, error) { - var urlPath string - if serverName != "" { - urlPath = cli.buildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ - "server_name": serverName, - }) - } else { - urlPath = cli.buildURL("join", roomIDorAlias) - } - - content := struct { - Inviter string `json:"inviter,omitempty"` - }{} - content.Inviter = invitingUserID - - resBytes, err := cli.sendJSON("POST", urlPath, content) - if err != nil { - return "", err - } - var joinRoomResponse joinRoomHTTPResponse - if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { - return "", err - } - return joinRoomResponse.RoomID, nil -} - -// SetDisplayName sets the user's profile display name -func (cli *Client) SetDisplayName(displayName string) error { - urlPath := cli.buildURL("profile", cli.UserID, "displayname") - s := struct { - DisplayName string `json:"displayname"` - }{displayName} - _, err := cli.sendJSON("PUT", urlPath, &s) - return err +// NEBStore implements the gomatrix.Storer interface. +// +// It persists the next batch token in the database, and includes a ClientConfig for the client. +type NEBStore struct { + gomatrix.InMemoryStore + Database database.Storer + ClientConfig api.ClientConfig } -// SendMessageEvent sends a message event into a room, returning the event_id on success. -// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal. -func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (string, error) { - txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) - urlPath := cli.buildURL("rooms", roomID, "send", eventType, txnID) - resBytes, err := cli.sendJSON("PUT", urlPath, contentJSON) - if err != nil { - return "", err - } - var sendEventResponse sendEventHTTPResponse - if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { - return "", err +// SaveNextBatch saves to the database. +func (s *NEBStore) SaveNextBatch(userID, nextBatch string) { + if err := s.Database.UpdateNextBatch(userID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") } - return sendEventResponse.EventID, nil } -// SendText sends an m.room.message event into the given room with a msgtype of m.text -func (cli *Client) SendText(roomID, text string) (string, error) { - return cli.SendMessageEvent(roomID, "m.room.message", - TextMessage{"m.text", text}) -} - -// UploadLink uploads an HTTP URL and then returns an MXC URI. -func (cli *Client) UploadLink(link string) (string, error) { - res, err := cli.httpClient.Get(link) - if res != nil { - defer res.Body.Close() - } +// LoadNextBatch loads from the database. +func (s *NEBStore) LoadNextBatch(userID string) string { + token, err := s.Database.LoadNextBatch(userID) if err != nil { - return "", err + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + }).Error("Failed to load next_batch token") + return "" } - return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) + return token } -// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI. -func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (string, error) { - req, err := http.NewRequest("POST", cli.buildBaseURL("_matrix/media/r0/upload"), content) - if err != nil { - return "", err - } - req.Header.Set("Content-Type", contentType) - req.ContentLength = contentLength - log.WithFields(log.Fields{ - "content_type": contentType, - "content_length": contentLength, - }).Print("Uploading to content repo") - res, err := cli.httpClient.Do(req) - if res != nil { - defer res.Body.Close() - } - if err != nil { - return "", err - } - if res.StatusCode != 200 { - return "", fmt.Errorf("Upload request returned HTTP %d", res.StatusCode) - } - m := struct { - ContentURI string `json:"content_uri"` - }{} - if err := json.NewDecoder(res.Body).Decode(&m); err != nil { - return "", err - } - return m.ContentURI, nil +// StarterLinkMessage represents a message with a starter_link custom data. +type StarterLinkMessage struct { + Body string + Link string } -// Sync starts syncing with the provided Homeserver. This function will be invoked continually. -// If Sync is called twice then the first sync will be stopped. -func (cli *Client) Sync() { - // Mark the client as syncing. - // We will keep syncing until the syncing state changes. Either because - // Sync is called or StopSync is called. - syncingID := cli.incrementSyncingID() - logger := log.WithFields(log.Fields{ - "syncing": syncingID, - "user_id": cli.UserID, - }) - - // TODO: Store the filter ID in the database - filterID, err := cli.createFilter() - if err != nil { - logger.WithError(err).Fatal("Failed to create filter") - // TODO: Maybe do some sort of error handling here? - } - cli.filterID = filterID - logger.WithField("filter", filterID).Print("Got filter ID") - nextToken := cli.NextBatchStorer.Load(cli.UserID) - - logger.WithField("next_batch", nextToken).Print("Starting sync") - - channel := make(chan syncHTTPResponse, 5) - - go func() { - for response := range channel { - cli.Worker.onSyncHTTPResponse(response) - } - }() - defer close(channel) - - for { - // Do a /sync - syncBytes, err := cli.doSync(30000, nextToken) - if err != nil { - logger.WithError(err).Warn("doSync failed") - time.Sleep(5 * time.Second) - continue - } - - // Decode sync response into syncHTTPResponse - var syncResponse syncHTTPResponse - if err = json.Unmarshal(syncBytes, &syncResponse); err != nil { - logger.WithError(err).Warn("Failed to decode sync data") - time.Sleep(5 * time.Second) - continue - } +// MarshalJSON converts this message into actual event content JSON. +func (m StarterLinkMessage) MarshalJSON() ([]byte, error) { + var data map[string]string - // Check that the syncing state hasn't changed - // Either because we've stopped syncing or another sync has been started. - // We discard the response from our sync. - if cli.getSyncingID() != syncingID { - logger.Print("Stopping sync") - return - } - - processResponse := cli.shouldProcessResponse(nextToken, &syncResponse) - nextToken = syncResponse.NextBatch - - // Save the token now *before* passing it through to the worker. This means it's possible - // to not process some events, but it means that we won't get constantly stuck processing - // a malformed/buggy event which keeps making us panic. - cli.NextBatchStorer.Save(cli.UserID, nextToken) - - if processResponse { - // Update client state - channel <- syncResponse + if m.Link != "" { + data = map[string]string{ + "org.matrix.neb.starter_link": m.Link, } } -} -// shouldProcessResponse returns true if the response should be processed. May modify the response to remove -// stuff that shouldn't be processed. -func (cli *Client) shouldProcessResponse(tokenOnSync string, syncResponse *syncHTTPResponse) bool { - if tokenOnSync == "" { - return false - } - // This is a horrible hack because /sync will return the most recent messages for a room - // as soon as you /join it. We do NOT want to process those events in that particular room - // because they may have already been processed (if you toggle the bot in/out of the room). - // - // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us - // exists and is "join" and then discard processing that room entirely if so. - // TODO: We probably want to process the !commands from after the last join event in the timeline. - for roomID, roomData := range syncResponse.Rooms.Join { - for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { - e := roomData.Timeline.Events[i] - if e.Type == "m.room.member" && e.StateKey == cli.UserID { - m := e.Content["membership"] - mship, ok := m.(string) - if !ok { - continue - } - if mship == "join" { - log.WithFields(log.Fields{ - "room_id": roomID, - "user_id": cli.UserID, - "start_token": tokenOnSync, - }).Info("Discarding /sync events in room: just joined it.") - _, ok := syncResponse.Rooms.Join[roomID] - if !ok { - panic("room " + roomID + " does not exist in Join?!") - } - delete(syncResponse.Rooms.Join, roomID) // don't re-process !commands - delete(syncResponse.Rooms.Invite, roomID) // don't re-process invites - break - } - } - } - } - return true -} - -func (cli *Client) incrementSyncingID() uint32 { - cli.syncingMutex.Lock() - defer cli.syncingMutex.Unlock() - cli.syncingID++ - return cli.syncingID -} - -func (cli *Client) getSyncingID() uint32 { - cli.syncingMutex.Lock() - defer cli.syncingMutex.Unlock() - return cli.syncingID -} - -// StopSync stops the ongoing sync started by Sync. -func (cli *Client) StopSync() { - // Advance the syncing state so that any running Syncs will terminate. - cli.incrementSyncingID() -} - -// This should only be called by the worker goroutine -func (cli *Client) getOrCreateRoom(roomID string) *Room { - room := cli.Rooms[roomID] - if room == nil { // create a new Room - room = NewRoom(roomID) - cli.Rooms[roomID] = room - } - return room -} - -func (cli *Client) sendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { - jsonStr, err := json.Marshal(contentJSON) - if err != nil { - return nil, err - } - req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - logger := log.WithFields(log.Fields{ - "method": method, - "url": httpURL, - "json": string(jsonStr), - }) - logger.Print("Sending JSON request") - res, err := cli.httpClient.Do(req) - if res != nil { - defer res.Body.Close() - } - if err != nil { - logger.WithError(err).Warn("Failed to send JSON request") - return nil, err + msg := struct { + MsgType string `json:"msgtype"` + Body string `json:"body"` + Data map[string]string `json:"data,omitempty"` + }{ + "m.notice", m.Body, data, } - contents, err := ioutil.ReadAll(res.Body) - if res.StatusCode >= 300 { - logger.WithFields(log.Fields{ - "code": res.StatusCode, - "body": string(contents), - }).Warn("Failed to send JSON request") - return nil, errors.HTTPError{ - Code: res.StatusCode, - Message: "Failed to " + method + " JSON: HTTP " + strconv.Itoa(res.StatusCode), - } - } - if err != nil { - logger.WithError(err).Warn("Failed to read response") - return nil, err - } - return contents, nil -} - -func (cli *Client) createFilter() (string, error) { - urlPath := cli.buildURL("user", cli.UserID, "filter") - resBytes, err := cli.sendJSON("POST", urlPath, &filterJSON) - if err != nil { - return "", err - } - var filterResponse filterHTTPResponse - if err = json.Unmarshal(resBytes, &filterResponse); err != nil { - return "", err - } - return filterResponse.FilterID, nil -} - -func (cli *Client) doSync(timeout int, since string) ([]byte, error) { - query := map[string]string{ - "timeout": strconv.Itoa(timeout), - } - if since != "" { - query["since"] = since - } - if cli.filterID != "" { - query["filter"] = cli.filterID - } - urlPath := cli.buildURLWithQuery([]string{"sync"}, query) - req, err := http.NewRequest("GET", urlPath, nil) - if err != nil { - return nil, err - } - res, err := cli.httpClient.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - contents, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, err - } - return contents, nil -} - -// NewClient creates a new Matrix Client ready for syncing -func NewClient(httpClient *http.Client, homeserverURL *url.URL, accessToken, userID string) *Client { - cli := Client{ - AccessToken: accessToken, - HomeserverURL: homeserverURL, - UserID: userID, - Prefix: "/_matrix/client/r0", - } - cli.Worker = newWorker(&cli) - // By default, use a no-op next_batch storer which will never save tokens and always - // "load" the empty string as a token. The client will work with this storer: it just won't - // remember the token across restarts. In practice, a database backend should be used. - cli.NextBatchStorer = noopNextBatchStore{} - cli.Rooms = make(map[string]*Room) - cli.httpClient = httpClient - - return &cli + return json.Marshal(msg) } diff --git a/src/github.com/matrix-org/go-neb/matrix/responses.go b/src/github.com/matrix-org/go-neb/matrix/responses.go deleted file mode 100644 index 284554e..0000000 --- a/src/github.com/matrix-org/go-neb/matrix/responses.go +++ /dev/null @@ -1,40 +0,0 @@ -package matrix - -type filterHTTPResponse struct { - FilterID string `json:"filter_id"` -} - -type joinRoomHTTPResponse struct { - RoomID string `json:"room_id"` -} - -type sendEventHTTPResponse struct { - EventID string `json:"event_id"` -} - -type syncHTTPResponse struct { - NextBatch string `json:"next_batch"` - AccountData struct { - Events []Event `json:"events"` - } `json:"account_data"` - Presence struct { - Events []Event `json:"events"` - } `json:"presence"` - Rooms struct { - Join map[string]struct { - State struct { - Events []Event `json:"events"` - } `json:"state"` - Timeline struct { - Events []Event `json:"events"` - Limited bool `json:"limited"` - PrevBatch string `json:"prev_batch"` - } `json:"timeline"` - } `json:"join"` - Invite map[string]struct { - State struct { - Events []Event - } `json:"invite_state"` - } `json:"invite"` - } `json:"rooms"` -} diff --git a/src/github.com/matrix-org/go-neb/matrix/worker.go b/src/github.com/matrix-org/go-neb/matrix/worker.go deleted file mode 100644 index 3674601..0000000 --- a/src/github.com/matrix-org/go-neb/matrix/worker.go +++ /dev/null @@ -1,81 +0,0 @@ -package matrix - -import ( - log "github.com/Sirupsen/logrus" - "runtime/debug" -) - -// Worker processes incoming events and updates the Matrix client's data structures. It also informs -// any attached listeners of the new events. -type Worker struct { - client *Client - listeners map[string][]OnEventListener // event type to listeners array -} - -// OnEventListener can be used with Worker.OnEventType to be informed of incoming events. -type OnEventListener func(*Event) - -func newWorker(client *Client) *Worker { - return &Worker{ - client, - make(map[string][]OnEventListener), - } -} - -// OnEventType allows callers to be notified when there are new events for the given event type. -// There are no duplicate checks. -func (worker *Worker) OnEventType(eventType string, callback OnEventListener) { - _, exists := worker.listeners[eventType] - if !exists { - worker.listeners[eventType] = []OnEventListener{} - } - worker.listeners[eventType] = append(worker.listeners[eventType], callback) -} - -func (worker *Worker) notifyListeners(event *Event) { - listeners, exists := worker.listeners[event.Type] - if !exists { - return - } - for _, fn := range listeners { - fn(event) - } -} - -func (worker *Worker) onSyncHTTPResponse(res syncHTTPResponse) { - defer func() { - if r := recover(); r != nil { - userID := "" - if worker.client != nil { - userID = worker.client.UserID - } - log.WithFields(log.Fields{ - "panic": r, - "user_id": userID, - }).Errorf( - "onSyncHTTPResponse panicked!\n%s", debug.Stack(), - ) - } - }() - - for roomID, roomData := range res.Rooms.Join { - room := worker.client.getOrCreateRoom(roomID) - for _, event := range roomData.State.Events { - event.RoomID = roomID - room.UpdateState(&event) - worker.notifyListeners(&event) - } - for _, event := range roomData.Timeline.Events { - event.RoomID = roomID - worker.notifyListeners(&event) - } - } - for roomID, roomData := range res.Rooms.Invite { - room := worker.client.getOrCreateRoom(roomID) - for _, event := range roomData.State.Events { - event.RoomID = roomID - room.UpdateState(&event) - worker.notifyListeners(&event) - } - } -} diff --git a/src/github.com/matrix-org/go-neb/services/echo/echo.go b/src/github.com/matrix-org/go-neb/services/echo/echo.go index 6630bef..32ae181 100644 --- a/src/github.com/matrix-org/go-neb/services/echo/echo.go +++ b/src/github.com/matrix-org/go-neb/services/echo/echo.go @@ -4,8 +4,8 @@ package echo import ( "strings" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the Echo service @@ -19,12 +19,12 @@ type Service struct { // Commands supported: // !echo some message // Responds with a notice of "some message". -func (e *Service) Commands(cli *matrix.Client) []types.Command { +func (e *Service) Commands(cli *gomatrix.Client) []types.Command { return []types.Command{ types.Command{ Path: []string{"echo"}, Command: func(roomID, userID string, args []string) (interface{}, error) { - return &matrix.TextMessage{"m.notice", strings.Join(args, " ")}, nil + return &gomatrix.TextMessage{"m.notice", strings.Join(args, " ")}, nil }, }, } diff --git a/src/github.com/matrix-org/go-neb/services/giphy/giphy.go b/src/github.com/matrix-org/go-neb/services/giphy/giphy.go index a38f741..6bc51be 100644 --- a/src/github.com/matrix-org/go-neb/services/giphy/giphy.go +++ b/src/github.com/matrix-org/go-neb/services/giphy/giphy.go @@ -10,8 +10,8 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the Giphy service. @@ -50,7 +50,7 @@ type Service struct { // Commands supported: // !giphy some search query without quotes // Responds with a suitable GIF into the same room as the command. -func (s *Service) Commands(client *matrix.Client) []types.Command { +func (s *Service) Commands(client *gomatrix.Client) []types.Command { return []types.Command{ types.Command{ Path: []string{"giphy"}, @@ -61,7 +61,7 @@ func (s *Service) Commands(client *matrix.Client) []types.Command { } } -func (s *Service) cmdGiphy(client *matrix.Client, roomID, userID string, args []string) (interface{}, error) { +func (s *Service) cmdGiphy(client *gomatrix.Client, roomID, userID string, args []string) (interface{}, error) { // only 1 arg which is the text to search for. query := strings.Join(args, " ") gifResult, err := s.searchGiphy(query) @@ -71,16 +71,16 @@ func (s *Service) cmdGiphy(client *matrix.Client, roomID, userID string, args [] if gifResult.Images.Original.URL == "" { return nil, fmt.Errorf("No results") } - mxc, err := client.UploadLink(gifResult.Images.Original.URL) + resUpload, err := client.UploadLink(gifResult.Images.Original.URL) if err != nil { return nil, err } - return matrix.ImageMessage{ + return gomatrix.ImageMessage{ MsgType: "m.image", Body: gifResult.Slug, - URL: mxc, - Info: matrix.ImageInfo{ + URL: resUpload.ContentURI, + Info: gomatrix.ImageInfo{ Height: asInt(gifResult.Images.Original.Height), Width: asInt(gifResult.Images.Original.Width), Mimetype: "image/gif", diff --git a/src/github.com/matrix-org/go-neb/services/github/github.go b/src/github.com/matrix-org/go-neb/services/github/github.go index 048c50c..5de367b 100644 --- a/src/github.com/matrix-org/go-neb/services/github/github.go +++ b/src/github.com/matrix-org/go-neb/services/github/github.go @@ -18,6 +18,7 @@ import ( "github.com/matrix-org/go-neb/realms/github" "github.com/matrix-org/go-neb/services/github/client" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the Github service @@ -71,7 +72,7 @@ func (s *Service) cmdGithubCreate(roomID, userID string, args []string) (interfa }, nil } if len(args) == 0 { - return &matrix.TextMessage{"m.notice", + return &gomatrix.TextMessage{"m.notice", `Usage: !github create owner/repo "issue title" "description"`}, nil } @@ -85,13 +86,13 @@ func (s *Service) cmdGithubCreate(roomID, userID string, args []string) (interfa // look for a default repo defaultRepo := s.defaultRepo(roomID) if defaultRepo == "" { - return &matrix.TextMessage{"m.notice", + return &gomatrix.TextMessage{"m.notice", `Usage: !github create owner/repo "issue title" "description"`}, nil } // default repo should pass the regexp ownerRepoGroups = ownerRepoRegex.FindStringSubmatch(defaultRepo) if len(ownerRepoGroups) == 0 { - return &matrix.TextMessage{"m.notice", + return &gomatrix.TextMessage{"m.notice", `Malformed default repo. Usage: !github create owner/repo "issue title" "description"`}, nil } @@ -127,7 +128,7 @@ func (s *Service) cmdGithubCreate(roomID, userID string, args []string) (interfa return nil, fmt.Errorf("Failed to create issue. HTTP %d", res.StatusCode) } - return matrix.TextMessage{"m.notice", fmt.Sprintf("Created issue: %s", *issue.HTMLURL)}, nil + return gomatrix.TextMessage{"m.notice", fmt.Sprintf("Created issue: %s", *issue.HTMLURL)}, nil } func (s *Service) expandIssue(roomID, userID, owner, repo string, issueNum int) interface{} { @@ -143,7 +144,7 @@ func (s *Service) expandIssue(roomID, userID, owner, repo string, issueNum int) return nil } - return &matrix.TextMessage{ + return &gomatrix.TextMessage{ "m.notice", fmt.Sprintf("%s : %s", *i.HTMLURL, *i.Title), } @@ -154,7 +155,7 @@ func (s *Service) expandIssue(roomID, userID, owner, repo string, issueNum int) // Responds with the outcome of the issue creation request. This command requires // a Github account to be linked to the Matrix user ID issuing the command. If there // is no link, it will return a Starter Link instead. -func (s *Service) Commands(cli *matrix.Client) []types.Command { +func (s *Service) Commands(cli *gomatrix.Client) []types.Command { return []types.Command{ types.Command{ Path: []string{"github", "create"}, @@ -171,7 +172,7 @@ func (s *Service) Commands(cli *matrix.Client) []types.Command { // it will also expand strings of the form: // #12 // using the default repository. -func (s *Service) Expansions(cli *matrix.Client) []types.Expansion { +func (s *Service) Expansions(cli *gomatrix.Client) []types.Expansion { return []types.Expansion{ types.Expansion{ Regexp: ownerRepoIssueRegex, @@ -220,7 +221,7 @@ func (s *Service) Expansions(cli *matrix.Client) []types.Expansion { } // Register makes sure that the given realm ID maps to a github realm. -func (s *Service) Register(oldService types.Service, client *matrix.Client) error { +func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error { if s.RealmID == "" { return fmt.Errorf("RealmID is required") } diff --git a/src/github.com/matrix-org/go-neb/services/github/github_webhook.go b/src/github.com/matrix-org/go-neb/services/github/github_webhook.go index c5b05b9..6951874 100644 --- a/src/github.com/matrix-org/go-neb/services/github/github_webhook.go +++ b/src/github.com/matrix-org/go-neb/services/github/github_webhook.go @@ -9,11 +9,11 @@ import ( log "github.com/Sirupsen/logrus" gogithub "github.com/google/go-github/github" "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/services/github/client" "github.com/matrix-org/go-neb/services/github/webhook" "github.com/matrix-org/go-neb/types" "github.com/matrix-org/go-neb/util" + "github.com/matrix-org/gomatrix" ) // WebhookServiceType of the Github Webhook service. @@ -80,7 +80,7 @@ type WebhookService struct { // // If the "owner/repo" string doesn't exist in this Service config, then the webhook will be deleted from // Github. -func (s *WebhookService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { +func (s *WebhookService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) { evType, repo, msg, err := webhook.OnReceiveRequest(req, s.SecretToken) if err != nil { w.WriteHeader(err.Code) @@ -145,7 +145,7 @@ func (s *WebhookService) OnReceiveWebhook(w http.ResponseWriter, req *http.Reque // // Hooks can get out of sync if a user manually deletes a hook in the Github UI. In this case, toggling the repo configuration will // force NEB to recreate the hook. -func (s *WebhookService) Register(oldService types.Service, client *matrix.Client) error { +func (s *WebhookService) Register(oldService types.Service, client *gomatrix.Client) error { if s.RealmID == "" || s.ClientUserID == "" { return fmt.Errorf("RealmID and ClientUserID is required") } @@ -251,9 +251,9 @@ func (s *WebhookService) PostRegister(oldService types.Service) { } } -func (s *WebhookService) joinWebhookRooms(client *matrix.Client) error { +func (s *WebhookService) joinWebhookRooms(client *gomatrix.Client) error { for roomID := range s.Rooms { - if _, err := client.JoinRoom(roomID, "", ""); err != nil { + if _, err := client.JoinRoom(roomID, "", nil); err != nil { // TODO: Leave the rooms we successfully joined? return err } diff --git a/src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go b/src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go index 87a8385..467d033 100644 --- a/src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go +++ b/src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go @@ -14,14 +14,14 @@ import ( log "github.com/Sirupsen/logrus" "github.com/google/go-github/github" "github.com/matrix-org/go-neb/errors" - "github.com/matrix-org/go-neb/matrix" + "github.com/matrix-org/gomatrix" ) // OnReceiveRequest processes incoming github webhook requests and returns a // matrix message to send, along with parsed repo information. // The secretToken, if supplied, will be used to verify the request is from // Github. If it isn't, an error is returned. -func OnReceiveRequest(r *http.Request, secretToken string) (string, *github.Repository, *matrix.HTMLMessage, *errors.HTTPError) { +func OnReceiveRequest(r *http.Request, secretToken string) (string, *github.Repository, *gomatrix.HTMLMessage, *errors.HTTPError) { // Verify the HMAC signature if NEB was configured with a secret token eventType := r.Header.Get("X-GitHub-Event") signatureSHA1 := r.Header.Get("X-Hub-Signature") @@ -67,7 +67,7 @@ func OnReceiveRequest(r *http.Request, secretToken string) (string, *github.Repo return "", nil, nil, &errors.HTTPError{nil, "Failed to parse github event", 500} } - msg := matrix.GetHTMLMessage("m.notice", htmlStr) + msg := gomatrix.GetHTMLMessage("m.notice", htmlStr) return refinedType, repo, &msg, nil } diff --git a/src/github.com/matrix-org/go-neb/services/guggy/guggy.go b/src/github.com/matrix-org/go-neb/services/guggy/guggy.go index 940a416..c027544 100644 --- a/src/github.com/matrix-org/go-neb/services/guggy/guggy.go +++ b/src/github.com/matrix-org/go-neb/services/guggy/guggy.go @@ -11,8 +11,8 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the Guggy service @@ -49,7 +49,7 @@ type Service struct { // Commands supported: // !guggy some search query without quotes // Responds with a suitable GIF into the same room as the command. -func (s *Service) Commands(client *matrix.Client) []types.Command { +func (s *Service) Commands(client *gomatrix.Client) []types.Command { return []types.Command{ types.Command{ Path: []string{"guggy"}, @@ -59,7 +59,7 @@ func (s *Service) Commands(client *matrix.Client) []types.Command { }, } } -func (s *Service) cmdGuggy(client *matrix.Client, roomID, userID string, args []string) (interface{}, error) { +func (s *Service) cmdGuggy(client *gomatrix.Client, roomID, userID string, args []string) (interface{}, error) { // only 1 arg which is the text to search for. querySentence := strings.Join(args, " ") gifResult, err := s.text2gifGuggy(querySentence) @@ -68,22 +68,22 @@ func (s *Service) cmdGuggy(client *matrix.Client, roomID, userID string, args [] } if gifResult.GIF == "" { - return matrix.TextMessage{ + return gomatrix.TextMessage{ MsgType: "m.text.notice", Body: "No GIF found!", }, nil } - mxc, err := client.UploadLink(gifResult.GIF) + resUpload, err := client.UploadLink(gifResult.GIF) if err != nil { return nil, fmt.Errorf("Failed to upload Guggy image to matrix: %s", err.Error()) } - return matrix.ImageMessage{ + return gomatrix.ImageMessage{ MsgType: "m.image", Body: querySentence, - URL: mxc, - Info: matrix.ImageInfo{ + URL: resUpload.ContentURI, + Info: gomatrix.ImageInfo{ Height: uint(math.Floor(gifResult.Height)), Width: uint(math.Floor(gifResult.Width)), Mimetype: "image/gif", diff --git a/src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go b/src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go index 480b8b8..688649c 100644 --- a/src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go +++ b/src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go @@ -4,15 +4,15 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" - "github.com/matrix-org/go-neb/testutils" - "github.com/matrix-org/go-neb/types" "io/ioutil" "net/http" - "net/url" "strings" "testing" + + "github.com/matrix-org/go-neb/database" + "github.com/matrix-org/go-neb/testutils" + "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // TODO: It would be nice to tabularise this test so we can try failing different combinations of responses to make @@ -86,8 +86,8 @@ func TestCommand(t *testing.T) { } return nil, fmt.Errorf("Unknown URL: %s", req.URL.String()) } - u, _ := url.Parse("https://hyrule") - matrixCli := matrix.NewClient(&http.Client{Transport: matrixTrans}, u, "its_a_secret", "@guggybot:hyrule") + matrixCli, _ := gomatrix.NewClient("https://hyrule", "@guggybot:hyrule", "its_a_secret") + matrixCli.Client = &http.Client{Transport: matrixTrans} // Execute the matrix !command cmds := guggy.Commands(matrixCli) diff --git a/src/github.com/matrix-org/go-neb/services/jira/jira.go b/src/github.com/matrix-org/go-neb/services/jira/jira.go index dcadd26..d4e8735 100644 --- a/src/github.com/matrix-org/go-neb/services/jira/jira.go +++ b/src/github.com/matrix-org/go-neb/services/jira/jira.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/go-neb/realms/jira/urls" "github.com/matrix-org/go-neb/services/jira/webhook" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the JIRA Service @@ -72,7 +73,7 @@ type Service struct { // Register ensures that the given realm IDs are valid JIRA realms and registers webhooks // with those JIRA endpoints. -func (s *Service) Register(oldService types.Service, client *matrix.Client) error { +func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error { // We only ever make 1 JIRA webhook which listens for all projects and then filter // on receive. So we simply need to know if we need to make a webhook or not. We // need to do this for each unique realm. @@ -163,7 +164,7 @@ func (s *Service) cmdJiraCreate(roomID, userID string, args []string) (interface return nil, fmt.Errorf("Failed to create issue: JIRA returned %d", res.StatusCode) } - return &matrix.TextMessage{ + return &gomatrix.TextMessage{ "m.notice", fmt.Sprintf("Created issue: %sbrowse/%s", r.JIRAEndpoint, i.Key), }, nil @@ -220,7 +221,7 @@ func (s *Service) expandIssue(roomID, userID string, issueKeyGroups []string) in logger.WithError(err).Print("Failed to GET issue") return err } - return matrix.GetHTMLMessage( + return gomatrix.GetHTMLMessage( "m.notice", fmt.Sprintf( "%sbrowse/%s : %s", @@ -238,7 +239,7 @@ func (s *Service) expandIssue(roomID, userID string, issueKeyGroups []string) in // same project key, which project is chosen is undefined. If there // is no JIRA account linked to the Matrix user ID, it will return a Starter Link // if there is a known public project with that project key. -func (s *Service) Commands(cli *matrix.Client) []types.Command { +func (s *Service) Commands(cli *gomatrix.Client) []types.Command { return []types.Command{ types.Command{ Path: []string{"jira", "create"}, @@ -255,7 +256,7 @@ func (s *Service) Commands(cli *matrix.Client) []types.Command { // to map the project key to a realm, and subsequently the JIRA endpoint to hit. // If there are multiple projects with the same project key in the Service Config, one will // be chosen arbitrarily. -func (s *Service) Expansions(cli *matrix.Client) []types.Expansion { +func (s *Service) Expansions(cli *gomatrix.Client) []types.Expansion { return []types.Expansion{ types.Expansion{ Regexp: issueKeyRegex, @@ -267,7 +268,7 @@ func (s *Service) Expansions(cli *matrix.Client) []types.Expansion { } // OnReceiveWebhook receives requests from JIRA and possibly sends requests to Matrix as a result. -func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { +func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) { eventProjectKey, event, httpErr := webhook.OnReceiveRequest(req) if httpErr != nil { log.WithError(httpErr).Print("Failed to handle JIRA webhook") @@ -296,7 +297,7 @@ func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli continue } _, msgErr := cli.SendMessageEvent( - roomID, "m.room.message", matrix.GetHTMLMessage("m.notice", htmlText), + roomID, "m.room.message", gomatrix.GetHTMLMessage("m.notice", htmlText), ) if msgErr != nil { log.WithFields(log.Fields{ diff --git a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go index ccc9885..a0112af 100644 --- a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go +++ b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go @@ -14,9 +14,9 @@ import ( "github.com/die-net/lrucache" "github.com/gregjones/httpcache" "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/polling" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" "github.com/mmcdole/gofeed" "github.com/prometheus/client_golang/prometheus" ) @@ -71,7 +71,7 @@ type Service struct { } // Register will check the liveness of each RSS feed given. If all feeds check out okay, no error is returned. -func (s *Service) Register(oldService types.Service, client *matrix.Client) error { +func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error { if len(s.Feeds) == 0 { // this is an error UNLESS the old service had some feeds in which case they are deleting us :( var numOldFeeds int @@ -100,7 +100,7 @@ func (s *Service) Register(oldService types.Service, client *matrix.Client) erro return nil } -func (s *Service) joinRooms(client *matrix.Client) { +func (s *Service) joinRooms(client *gomatrix.Client) { roomSet := make(map[string]bool) for _, feedInfo := range s.Feeds { for _, roomID := range feedInfo.Rooms { @@ -109,7 +109,7 @@ func (s *Service) joinRooms(client *matrix.Client) { } for roomID := range roomSet { - if _, err := client.JoinRoom(roomID, "", ""); err != nil { + if _, err := client.JoinRoom(roomID, "", nil); err != nil { log.WithFields(log.Fields{ log.ErrorKey: err, "room_id": roomID, @@ -144,7 +144,7 @@ func (s *Service) PostRegister(oldService types.Service) { // - Else if there is a Title field, use it as the GUID. // // Returns a timestamp representing when this Service should have OnPoll called again. -func (s *Service) OnPoll(cli *matrix.Client) time.Time { +func (s *Service) OnPoll(cli *gomatrix.Client) time.Time { logger := log.WithFields(log.Fields{ "service_id": s.ServiceID(), "service_type": s.ServiceType(), @@ -340,7 +340,7 @@ func (s *Service) newItems(feedURL string, allItems []*gofeed.Item) (items []gof return } -func (s *Service) sendToRooms(cli *matrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { +func (s *Service) sendToRooms(cli *gomatrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error { logger := log.WithFields(log.Fields{ "feed_url": feedURL, "title": item.Title, @@ -356,8 +356,8 @@ func (s *Service) sendToRooms(cli *matrix.Client, feedURL string, feed *gofeed.F } // SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag ) -func itemToHTML(feed *gofeed.Feed, item gofeed.Item) matrix.HTMLMessage { - return matrix.GetHTMLMessage("m.notice", fmt.Sprintf( +func itemToHTML(feed *gofeed.Feed, item gofeed.Item) gomatrix.HTMLMessage { + return gomatrix.GetHTMLMessage("m.notice", fmt.Sprintf( "%s posted a new article: %s ( %s )", html.EscapeString(feed.Title), html.EscapeString(item.Title), html.EscapeString(item.Link), )) diff --git a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go index 4912d8c..5db87e6 100644 --- a/src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go +++ b/src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go @@ -6,16 +6,15 @@ import ( "errors" "io/ioutil" "net/http" - "net/url" "strings" "sync" "testing" "time" "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/testutils" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) const rssFeedXML = ` @@ -75,7 +74,7 @@ func TestHTMLEntities(t *testing.T) { matrixTrans.RT = func(req *http.Request) (*http.Response, error) { if strings.HasPrefix(req.URL.Path, "/_matrix/client/r0/rooms/!linksroom:hyrule/send/m.room.message") { // Check content body to make sure it is decoded - var msg matrix.HTMLMessage + var msg gomatrix.HTMLMessage if err := json.NewDecoder(req.Body).Decode(&msg); err != nil { t.Fatal("Failed to decode request JSON: ", err) return nil, errors.New("Error handling matrix client test request") @@ -95,8 +94,8 @@ func TestHTMLEntities(t *testing.T) { } return nil, errors.New("Unhandled matrix client test request") } - u, _ := url.Parse("https://hyrule") - matrixClient := matrix.NewClient(&http.Client{Transport: matrixTrans}, u, "its_a_secret", "@happy_mask_salesman:hyrule") + matrixClient, _ := gomatrix.NewClient("https://hyrule", "@happy_mask_salesman:hyrule", "its_a_secret") + matrixClient.Client = &http.Client{Transport: matrixTrans} // Invoke OnPoll to trigger the RSS feed update _ = rssbot.OnPoll(matrixClient) diff --git a/src/github.com/matrix-org/go-neb/services/travisci/travisci.go b/src/github.com/matrix-org/go-neb/services/travisci/travisci.go index 89ad304..62a0565 100644 --- a/src/github.com/matrix-org/go-neb/services/travisci/travisci.go +++ b/src/github.com/matrix-org/go-neb/services/travisci/travisci.go @@ -12,8 +12,8 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) // ServiceType of the Travis-CI service. @@ -178,7 +178,7 @@ func outputForTemplate(travisTmpl string, tmpl map[string]string) (out string) { // webhooks: http://go-neb-endpoint.com/notifications // // See https://docs.travis-ci.com/user/notifications#Webhook-notifications for more information. -func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { +func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) { if err := req.ParseForm(); err != nil { log.WithError(err).Error("Failed to read incoming Travis-CI webhook form") w.WriteHeader(400) @@ -222,7 +222,7 @@ func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli if ownerRepo != whForRepo { continue } - msg := matrix.TextMessage{ + msg := gomatrix.TextMessage{ Body: outputForTemplate(repoData.Template, tmplData), MsgType: "m.notice", } @@ -241,7 +241,7 @@ func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli } // Register makes sure the Config information supplied is valid. -func (s *Service) Register(oldService types.Service, client *matrix.Client) error { +func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error { s.WebhookURL = s.webhookEndpointURL for _, roomData := range s.Rooms { for repo := range roomData.Repos { @@ -273,9 +273,9 @@ func (s *Service) PostRegister(oldService types.Service) { } } -func (s *Service) joinRooms(client *matrix.Client) { +func (s *Service) joinRooms(client *gomatrix.Client) { for roomID := range s.Rooms { - if _, err := client.JoinRoom(roomID, "", ""); err != nil { + if _, err := client.JoinRoom(roomID, "", nil); err != nil { log.WithFields(log.Fields{ log.ErrorKey: err, "room_id": roomID, diff --git a/src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go b/src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go index 3c1d695..7bcbf61 100644 --- a/src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go +++ b/src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go @@ -7,14 +7,13 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "net/url" "strings" "testing" "github.com/matrix-org/go-neb/database" - "github.com/matrix-org/go-neb/matrix" "github.com/matrix-org/go-neb/testutils" "github.com/matrix-org/go-neb/types" + "github.com/matrix-org/gomatrix" ) const travisOrgPEMPublicKey = (`-----BEGIN PUBLIC KEY----- @@ -115,13 +114,13 @@ func TestTravisCI(t *testing.T) { httpClient = &http.Client{Transport: travisTransport} // Intercept message sending to Matrix and mock responses - msgs := []matrix.TextMessage{} + msgs := []gomatrix.TextMessage{} matrixTrans := struct{ testutils.MockTransport }{} matrixTrans.RT = func(req *http.Request) (*http.Response, error) { if !strings.Contains(req.URL.String(), "/send/m.room.message") { return nil, fmt.Errorf("Unhandled URL: %s", req.URL.String()) } - var msg matrix.TextMessage + var msg gomatrix.TextMessage if err := json.NewDecoder(req.Body).Decode(&msg); err != nil { return nil, fmt.Errorf("Failed to decode request JSON: %s", err) } @@ -131,13 +130,13 @@ func TestTravisCI(t *testing.T) { Body: ioutil.NopCloser(bytes.NewBufferString(`{"event_id":"$yup:event"}`)), }, nil } - u, _ := url.Parse("https://hyrule") - matrixCli := matrix.NewClient(&http.Client{Transport: matrixTrans}, u, "its_a_secret", "@travisci:hyrule") + matrixCli, _ := gomatrix.NewClient("https://hyrule", "@travisci:hyrule", "its_a_secret") + matrixCli.Client = &http.Client{Transport: matrixTrans} // BEGIN running the Travis-CI table tests // --------------------------------------- for _, test := range travisTests { - msgs = []matrix.TextMessage{} // reset sent messages + msgs = []gomatrix.TextMessage{} // reset sent messages mockWriter := httptest.NewRecorder() travis := makeService(t, test.Template) if travis == nil { @@ -173,7 +172,7 @@ func TestTravisCI(t *testing.T) { } } -func assertResponse(t *testing.T, w *httptest.ResponseRecorder, msgs []matrix.TextMessage, expectCode int, expectMsgLength int) bool { +func assertResponse(t *testing.T, w *httptest.ResponseRecorder, msgs []gomatrix.TextMessage, expectCode int, expectMsgLength int) bool { if w.Code != expectCode { t.Errorf("TestTravisCI OnReceiveWebhook want HTTP code %d, got %d", expectCode, w.Code) return false diff --git a/src/github.com/matrix-org/go-neb/types/service.go b/src/github.com/matrix-org/go-neb/types/service.go index e776cc8..832f7d6 100644 --- a/src/github.com/matrix-org/go-neb/types/service.go +++ b/src/github.com/matrix-org/go-neb/types/service.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/matrix-org/go-neb/matrix" + "github.com/matrix-org/gomatrix" ) // BotOptions for a given bot user in a given room @@ -23,7 +23,7 @@ type BotOptions struct { type Poller interface { // OnPoll is called when the poller should poll. Return the timestamp when you want to be polled again. // Return 0 to never be polled again. - OnPoll(client *matrix.Client) time.Time + OnPoll(client *gomatrix.Client) time.Time } // A Service is the configuration for a bot service. @@ -34,14 +34,14 @@ type Service interface { ServiceID() string // Return the type of service. This string MUST NOT change. ServiceType() string - Commands(cli *matrix.Client) []Command - Expansions(cli *matrix.Client) []Expansion - OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) + Commands(cli *gomatrix.Client) []Command + Expansions(cli *gomatrix.Client) []Expansion + OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) // A lifecycle function which is invoked when the service is being registered. The old service, if one exists, is provided, // along with a Client instance for ServiceUserID(). If this function returns an error, the service will not be registered // or persisted to the database, and the user's request will fail. This can be useful if you depend on external factors // such as registering webhooks. - Register(oldService Service, client *matrix.Client) error + Register(oldService Service, client *gomatrix.Client) error // A lifecycle function which is invoked after the service has been successfully registered and persisted to the database. // This function is invoked within the critical section for configuring services, guaranteeing that there will not be // concurrent modifications to this service whilst this function executes. This lifecycle hook should be used to clean @@ -82,23 +82,23 @@ func (s *DefaultService) ServiceType() string { } // Commands returns no commands. -func (s *DefaultService) Commands(cli *matrix.Client) []Command { +func (s *DefaultService) Commands(cli *gomatrix.Client) []Command { return []Command{} } // Expansions returns no expansions. -func (s *DefaultService) Expansions(cli *matrix.Client) []Expansion { +func (s *DefaultService) Expansions(cli *gomatrix.Client) []Expansion { return []Expansion{} } // Register does nothing and returns no error. -func (s *DefaultService) Register(oldService Service, client *matrix.Client) error { return nil } +func (s *DefaultService) Register(oldService Service, client *gomatrix.Client) error { return nil } // PostRegister does nothing. func (s *DefaultService) PostRegister(oldService Service) {} // OnReceiveWebhook does nothing but 200 OK the request. -func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *matrix.Client) { +func (s *DefaultService) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) { w.WriteHeader(200) // Do nothing } diff --git a/vendor/manifest b/vendor/manifest index 7ef95ff..1027008 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -135,6 +135,12 @@ "revision": "193b8f88e381d12f2d53023fba25e43fc81dc5ac", "branch": "master" }, + { + "importpath": "github.com/matrix-org/gomatrix", + "repository": "https://github.com/matrix-org/gomatrix", + "revision": "e66d1ef529b7851262b49dc42a26ff1f1d1d9e4d", + "branch": "master" + }, { "importpath": "github.com/mattn/go-shellwords", "repository": "https://github.com/mattn/go-shellwords", diff --git a/vendor/src/github.com/matrix-org/gomatrix/LICENSE b/vendor/src/github.com/matrix-org/gomatrix/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/src/github.com/matrix-org/gomatrix/README.md b/vendor/src/github.com/matrix-org/gomatrix/README.md new file mode 100644 index 0000000..4f6df02 --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/README.md @@ -0,0 +1,4 @@ +# gomatrix +[![GoDoc](https://godoc.org/github.com/matrix-org/gomatrix?status.svg)](https://godoc.org/github.com/matrix-org/gomatrix) + +A Golang Matrix client diff --git a/vendor/src/github.com/matrix-org/gomatrix/client.go b/vendor/src/github.com/matrix-org/gomatrix/client.go new file mode 100644 index 0000000..d061e3c --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/client.go @@ -0,0 +1,381 @@ +// Package gomatrix implements the Matrix Client-Server API. +// +// Specification can be found at http://matrix.org/docs/spec/client_server/r0.2.0.html +// +// Example usage of this library: (blocking version) +// cli, _ := gomatrix.NewClient("https://matrix.org", "@example:matrix.org", "MDAefhiuwehfuiwe") +// syncer := cli.Syncer.(*gomatrix.DefaultSyncer) +// syncer.OnEventType("m.room.message", func(ev *gomatrix.Event) { +// fmt.Println("Message: ", ev) +// }) +// if err := cli.Sync(); err != nil { +// fmt.Println("Sync() returned ", err) +// } +// +// To make the example non-blocking, call Sync() in a goroutine. +package gomatrix + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "sync" + "time" +) + +// Client represents a Matrix client. +type Client struct { + HomeserverURL *url.URL // The base homeserver URL + Prefix string // The API prefix eg '/_matrix/client/r0' + UserID string // The user ID of the client. Used for forming HTTP paths which use the client's user ID. + AccessToken string // The access_token for the client. + syncingMutex sync.Mutex // protects syncingID + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + Client *http.Client // The underlying HTTP client which will be used to make HTTP requests. + Syncer Syncer // The thing which can process /sync responses + Store Storer // The thing which can store rooms/tokens/ids +} + +// HTTPError An HTTP Error response, which may wrap an underlying native Go Error. +type HTTPError struct { + WrappedError error + Message string + Code int +} + +func (e HTTPError) Error() string { + var wrappedErrMsg string + if e.WrappedError != nil { + wrappedErrMsg = e.WrappedError.Error() + } + return fmt.Sprintf("msg=%s code=%d wrapped=%s", e.Message, e.Code, wrappedErrMsg) +} + +// BuildURL builds a URL with the Client's homserver/prefix/access_token set already. +func (cli *Client) BuildURL(urlPath ...string) string { + ps := []string{cli.Prefix} + for _, p := range urlPath { + ps = append(ps, p) + } + return cli.BuildBaseURL(ps...) +} + +// BuildBaseURL builds a URL with the Client's homeserver/access_token set already. You must +// supply the prefix in the path. +func (cli *Client) BuildBaseURL(urlPath ...string) string { + // copy the URL. Purposefully ignore error as the input is from a valid URL already + hsURL, _ := url.Parse(cli.HomeserverURL.String()) + parts := []string{hsURL.Path} + parts = append(parts, urlPath...) + hsURL.Path = path.Join(parts...) + query := hsURL.Query() + query.Set("access_token", cli.AccessToken) + hsURL.RawQuery = query.Encode() + return hsURL.String() +} + +// BuildURLWithQuery builds a URL with query paramters in addition to the Client's homeserver/prefix/access_token set already. +func (cli *Client) BuildURLWithQuery(urlPath []string, urlQuery map[string]string) string { + u, _ := url.Parse(cli.BuildURL(urlPath...)) + q := u.Query() + for k, v := range urlQuery { + q.Set(k, v) + } + u.RawQuery = q.Encode() + return u.String() +} + +// Sync starts syncing with the provided Homeserver. This function will block until a fatal /sync error occurs, so should +// almost always be started as a new goroutine. If Sync() is called twice then the first sync will be stopped. +func (cli *Client) Sync() error { + // Mark the client as syncing. + // We will keep syncing until the syncing state changes. Either because + // Sync is called or StopSync is called. + syncingID := cli.incrementSyncingID() + nextBatch := cli.Store.LoadNextBatch(cli.UserID) + filterID := cli.Store.LoadFilterID(cli.UserID) + if filterID == "" { + filterJSON := cli.Syncer.GetFilterJSON(cli.UserID) + resFilter, err := cli.CreateFilter(filterJSON) + if err != nil { + return err + } + filterID = resFilter.FilterID + cli.Store.SaveFilterID(cli.UserID, filterID) + } + + for { + resSync, err := cli.SyncRequest(30000, nextBatch, filterID, false, "") + if err != nil { + duration, err2 := cli.Syncer.OnFailedSync(resSync, err) + if err2 != nil { + return err2 + } + time.Sleep(duration) + continue + } + + // Check that the syncing state hasn't changed + // Either because we've stopped syncing or another sync has been started. + // We discard the response from our sync. + if cli.getSyncingID() != syncingID { + return nil + } + + // Save the token now *before* processing it. This means it's possible + // to not process some events, but it means that we won't get constantly stuck processing + // a malformed/buggy event which keeps making us panic. + cli.Store.SaveNextBatch(cli.UserID, resSync.NextBatch) + if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil { + return err + } + + nextBatch = resSync.NextBatch + } +} + +func (cli *Client) incrementSyncingID() uint32 { + cli.syncingMutex.Lock() + defer cli.syncingMutex.Unlock() + cli.syncingID++ + return cli.syncingID +} + +func (cli *Client) getSyncingID() uint32 { + cli.syncingMutex.Lock() + defer cli.syncingMutex.Unlock() + return cli.syncingID +} + +// StopSync stops the ongoing sync started by Sync. +func (cli *Client) StopSync() { + // Advance the syncing state so that any running Syncs will terminate. + cli.incrementSyncingID() +} + +// SendJSON sends JSON to the given URL. +// +// Returns the HTTP body as bytes on 2xx. Returns an error if the response is not 2xx. This error +// is an HTTPError which includes the returned HTTP status code and possibly a RespError as the +// WrappedError, if the HTTP body could be decoded as a RespError. +func (cli *Client) SendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { + jsonStr, err := json.Marshal(contentJSON) + if err != nil { + return nil, err + } + req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + res, err := cli.Client.Do(req) + if res != nil { + defer res.Body.Close() + } + if err != nil { + return nil, err + } + contents, err := ioutil.ReadAll(res.Body) + if res.StatusCode >= 300 || res.StatusCode < 200 { + var wrap error + var respErr RespError + if _ = json.Unmarshal(contents, respErr); respErr.ErrCode != "" { + wrap = respErr + } + + // If we failed to decode as RespError, don't just drop the HTTP body, include it in the + // HTTP error instead (e.g proxy errors which return HTML). + msg := "Failed to " + method + " JSON" + if wrap == nil { + msg = msg + ": " + string(contents) + } + + return nil, HTTPError{ + Code: res.StatusCode, + Message: msg, + WrappedError: wrap, + } + } + if err != nil { + return nil, err + } + return contents, nil +} + +// CreateFilter makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter +func (cli *Client) CreateFilter(filter json.RawMessage) (*RespCreateFilter, error) { + urlPath := cli.BuildURL("user", cli.UserID, "filter") + resBytes, err := cli.SendJSON("POST", urlPath, &filter) + if err != nil { + return nil, err + } + var filterResponse RespCreateFilter + if err = json.Unmarshal(resBytes, &filterResponse); err != nil { + return nil, err + } + return &filterResponse, nil +} + +// SyncRequest makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync +func (cli *Client) SyncRequest(timeout int, since, filterID string, fullState bool, setPresence string) (*RespSync, error) { + query := map[string]string{ + "timeout": strconv.Itoa(timeout), + } + if since != "" { + query["since"] = since + } + if filterID != "" { + query["filter"] = filterID + } + if setPresence != "" { + query["set_presence"] = setPresence + } + if fullState { + query["full_state"] = "true" + } + urlPath := cli.BuildURLWithQuery([]string{"sync"}, query) + req, err := http.NewRequest("GET", urlPath, nil) + if err != nil { + return nil, err + } + res, err := cli.Client.Do(req) + if res != nil { + defer res.Body.Close() + } + if err != nil { + return nil, err + } + + var syncResponse RespSync + err = json.NewDecoder(res.Body).Decode(&syncResponse) + return &syncResponse, err +} + +// JoinRoom joins the client to a room ID or alias. See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-join-roomidoralias +// +// If serverName is specified, this will be added as a query param to instruct the homeserver to join via that server. If content is specified, it will +// be JSON encoded and used as the request body. +func (cli *Client) JoinRoom(roomIDorAlias, serverName string, content interface{}) (*RespJoinRoom, error) { + var urlPath string + if serverName != "" { + urlPath = cli.BuildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ + "server_name": serverName, + }) + } else { + urlPath = cli.BuildURL("join", roomIDorAlias) + } + + resBytes, err := cli.SendJSON("POST", urlPath, content) + if err != nil { + return nil, err + } + var joinRoomResponse RespJoinRoom + if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { + return nil, err + } + return &joinRoomResponse, nil +} + +// SetDisplayName sets the user's profile display name. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-profile-userid-displayname +func (cli *Client) SetDisplayName(displayName string) error { + urlPath := cli.BuildURL("profile", cli.UserID, "displayname") + s := struct { + DisplayName string `json:"displayname"` + }{displayName} + _, err := cli.SendJSON("PUT", urlPath, &s) + return err +} + +// SendMessageEvent sends a message event into a room. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid +// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal. +func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (*RespSendEvent, error) { + txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) + urlPath := cli.BuildURL("rooms", roomID, "send", eventType, txnID) + resBytes, err := cli.SendJSON("PUT", urlPath, contentJSON) + if err != nil { + return nil, err + } + var sendEventResponse RespSendEvent + if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { + return nil, err + } + return &sendEventResponse, nil +} + +// SendText sends an m.room.message event into the given room with a msgtype of m.text +// See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-text +func (cli *Client) SendText(roomID, text string) (*RespSendEvent, error) { + return cli.SendMessageEvent(roomID, "m.room.message", + TextMessage{"m.text", text}) +} + +// UploadLink uploads an HTTP URL and then returns an MXC URI. +func (cli *Client) UploadLink(link string) (*RespMediaUpload, error) { + res, err := cli.Client.Get(link) + if res != nil { + defer res.Body.Close() + } + if err != nil { + return nil, err + } + return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) +} + +// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI. +// See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload +func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (*RespMediaUpload, error) { + req, err := http.NewRequest("POST", cli.BuildBaseURL("_matrix/media/r0/upload"), content) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentType) + req.ContentLength = contentLength + res, err := cli.Client.Do(req) + if res != nil { + defer res.Body.Close() + } + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + return nil, HTTPError{ + Message: "Upload request failed", + Code: res.StatusCode, + } + } + var m RespMediaUpload + if err := json.NewDecoder(res.Body).Decode(&m); err != nil { + return nil, err + } + return &m, nil +} + +// NewClient creates a new Matrix Client ready for syncing +func NewClient(homeserverURL, userID, accessToken string) (*Client, error) { + hsURL, err := url.Parse(homeserverURL) + if err != nil { + return nil, err + } + // By default, use an in-memory store which will never save filter ids / next batch tokens to disk. + // The client will work with this storer: it just won't remember across restarts. + // In practice, a database backend should be used. + store := NewInMemoryStore() + cli := Client{ + AccessToken: accessToken, + HomeserverURL: hsURL, + UserID: userID, + Prefix: "/_matrix/client/r0", + Syncer: NewDefaultSyncer(userID, store), + Store: store, + } + // By default, use the default HTTP client. + cli.Client = http.DefaultClient + + return &cli, nil +} diff --git a/vendor/src/github.com/matrix-org/gomatrix/client_test.go b/vendor/src/github.com/matrix-org/gomatrix/client_test.go new file mode 100644 index 0000000..6a12a9a --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/client_test.go @@ -0,0 +1,28 @@ +package gomatrix + +import "fmt" + +func ExampleClient_BuildURLWithQuery() { + cli, _ := NewClient("https://matrix.org", "@example:matrix.org", "abcdef123456") + out := cli.BuildURLWithQuery([]string{"sync"}, map[string]string{ + "filter_id": "5", + }) + fmt.Println(out) + // Output: https://matrix.org/_matrix/client/r0/sync?access_token=abcdef123456&filter_id=5 +} + +func ExampleClient_BuildURL() { + userID := "@example:matrix.org" + cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") + out := cli.BuildURL("user", userID, "filter") + fmt.Println(out) + // Output: https://matrix.org/_matrix/client/r0/user/@example:matrix.org/filter?access_token=abcdef123456 +} + +func ExampleClient_BuildBaseURL() { + userID := "@example:matrix.org" + cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") + out := cli.BuildBaseURL("_matrix", "client", "r0", "directory", "room", "#matrix:matrix.org") + fmt.Println(out) + // Output: https://matrix.org/_matrix/client/r0/directory/room/%23matrix:matrix.org?access_token=abcdef123456 +} diff --git a/src/github.com/matrix-org/go-neb/matrix/types.go b/vendor/src/github.com/matrix-org/gomatrix/events.go similarity index 55% rename from src/github.com/matrix-org/go-neb/matrix/types.go rename to vendor/src/github.com/matrix-org/gomatrix/events.go index 08dc1ba..6ea259e 100644 --- a/src/github.com/matrix-org/go-neb/matrix/types.go +++ b/vendor/src/github.com/matrix-org/gomatrix/events.go @@ -1,61 +1,10 @@ -package matrix +package gomatrix import ( - "encoding/json" "html" "regexp" ) -// Room represents a single Matrix room. -type Room struct { - ID string - State map[string]map[string]*Event - Timeline []Event -} - -// UpdateState updates the room's current state with the given Event. This will clobber events based -// on the type/state_key combination. -func (room Room) UpdateState(event *Event) { - _, exists := room.State[event.Type] - if !exists { - room.State[event.Type] = make(map[string]*Event) - } - room.State[event.Type][event.StateKey] = event -} - -// GetStateEvent returns the state event for the given type/state_key combo, or nil. -func (room Room) GetStateEvent(eventType string, stateKey string) *Event { - stateEventMap, _ := room.State[eventType] - event, _ := stateEventMap[stateKey] - return event -} - -// GetMembershipState returns the membership state of the given user ID in this room. If there is -// no entry for this member, 'leave' is returned for consistency with left users. -func (room Room) GetMembershipState(userID string) string { - state := "leave" - event := room.GetStateEvent("m.room.member", userID) - if event != nil { - membershipState, found := event.Content["membership"] - if found { - mState, isString := membershipState.(string) - if isString { - state = mState - } - } - } - return state -} - -// NewRoom creates a new Room with the given ID -func NewRoom(roomID string) *Room { - // Init the State map and return a pointer to the Room - return &Room{ - ID: roomID, - State: make(map[string]map[string]*Event), - } -} - // Event represents a single Matrix event. type Event struct { StateKey string `json:"state_key"` // The state key for the event. Only present on State Events. @@ -131,29 +80,3 @@ func GetHTMLMessage(msgtype, htmlText string) HTMLMessage { FormattedBody: htmlText, } } - -// StarterLinkMessage represents a message with a starter_link custom data. -type StarterLinkMessage struct { - Body string - Link string -} - -// MarshalJSON converts this message into actual event content JSON. -func (m StarterLinkMessage) MarshalJSON() ([]byte, error) { - var data map[string]string - - if m.Link != "" { - data = map[string]string{ - "org.matrix.neb.starter_link": m.Link, - } - } - - msg := struct { - MsgType string `json:"msgtype"` - Body string `json:"body"` - Data map[string]string `json:"data,omitempty"` - }{ - "m.notice", m.Body, data, - } - return json.Marshal(msg) -} diff --git a/vendor/src/github.com/matrix-org/gomatrix/hooks/install.sh b/vendor/src/github.com/matrix-org/gomatrix/hooks/install.sh new file mode 100644 index 0000000..f8aa331 --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/hooks/install.sh @@ -0,0 +1,5 @@ +#! /bin/bash + +DOT_GIT="$(dirname $0)/../.git" + +ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit" \ No newline at end of file diff --git a/vendor/src/github.com/matrix-org/gomatrix/hooks/pre-commit b/vendor/src/github.com/matrix-org/gomatrix/hooks/pre-commit new file mode 100644 index 0000000..6a14ccf --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/hooks/pre-commit @@ -0,0 +1,9 @@ +#! /bin/bash + +set -eu + +golint +go fmt +go tool vet --shadow . +gocyclo -over 12 . +go test -timeout 5s -test.v diff --git a/vendor/src/github.com/matrix-org/gomatrix/responses.go b/vendor/src/github.com/matrix-org/gomatrix/responses.go new file mode 100644 index 0000000..76bfbe2 --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/responses.go @@ -0,0 +1,61 @@ +package gomatrix + +// RespError is the standard JSON error response from Homeservers. It also implements the Golang "error" interface. +// See http://matrix.org/docs/spec/client_server/r0.2.0.html#api-standards +type RespError struct { + ErrCode string `json:"errcode"` + Err string `json:"error"` +} + +// Error returns the errcode and error message. +func (e RespError) Error() string { + return e.ErrCode + ": " + e.Err +} + +// RespCreateFilter is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter +type RespCreateFilter struct { + FilterID string `json:"filter_id"` +} + +// RespJoinRoom is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-rooms-roomid-join +type RespJoinRoom struct { + RoomID string `json:"room_id"` +} + +// RespSendEvent is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid +type RespSendEvent struct { + EventID string `json:"event_id"` +} + +// RespMediaUpload is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload +type RespMediaUpload struct { + ContentURI string `json:"content_uri"` +} + +// RespSync is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync +type RespSync struct { + NextBatch string `json:"next_batch"` + AccountData struct { + Events []Event `json:"events"` + } `json:"account_data"` + Presence struct { + Events []Event `json:"events"` + } `json:"presence"` + Rooms struct { + Join map[string]struct { + State struct { + Events []Event `json:"events"` + } `json:"state"` + Timeline struct { + Events []Event `json:"events"` + Limited bool `json:"limited"` + PrevBatch string `json:"prev_batch"` + } `json:"timeline"` + } `json:"join"` + Invite map[string]struct { + State struct { + Events []Event + } `json:"invite_state"` + } `json:"invite"` + } `json:"rooms"` +} diff --git a/vendor/src/github.com/matrix-org/gomatrix/room.go b/vendor/src/github.com/matrix-org/gomatrix/room.go new file mode 100644 index 0000000..0533b3e --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/room.go @@ -0,0 +1,50 @@ +package gomatrix + +// Room represents a single Matrix room. +type Room struct { + ID string + State map[string]map[string]*Event +} + +// UpdateState updates the room's current state with the given Event. This will clobber events based +// on the type/state_key combination. +func (room Room) UpdateState(event *Event) { + _, exists := room.State[event.Type] + if !exists { + room.State[event.Type] = make(map[string]*Event) + } + room.State[event.Type][event.StateKey] = event +} + +// GetStateEvent returns the state event for the given type/state_key combo, or nil. +func (room Room) GetStateEvent(eventType string, stateKey string) *Event { + stateEventMap, _ := room.State[eventType] + event, _ := stateEventMap[stateKey] + return event +} + +// GetMembershipState returns the membership state of the given user ID in this room. If there is +// no entry for this member, 'leave' is returned for consistency with left users. +func (room Room) GetMembershipState(userID string) string { + state := "leave" + event := room.GetStateEvent("m.room.member", userID) + if event != nil { + membershipState, found := event.Content["membership"] + if found { + mState, isString := membershipState.(string) + if isString { + state = mState + } + } + } + return state +} + +// NewRoom creates a new Room with the given ID +func NewRoom(roomID string) *Room { + // Init the State map and return a pointer to the Room + return &Room{ + ID: roomID, + State: make(map[string]map[string]*Event), + } +} diff --git a/vendor/src/github.com/matrix-org/gomatrix/store.go b/vendor/src/github.com/matrix-org/gomatrix/store.go new file mode 100644 index 0000000..6dc687e --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/store.go @@ -0,0 +1,65 @@ +package gomatrix + +// Storer is an interface which must be satisfied to store client data. +// +// You can either write a struct which persists this data to disk, or you can use the +// provided "InMemoryStore" which just keeps data around in-memory which is lost on +// restarts. +type Storer interface { + SaveFilterID(userID, filterID string) + LoadFilterID(userID string) string + SaveNextBatch(userID, nextBatchToken string) + LoadNextBatch(userID string) string + SaveRoom(room *Room) + LoadRoom(roomID string) *Room +} + +// InMemoryStore implements the Storer interface. +// +// Everything is persisted in-memory as maps. It is not safe to load/save filter IDs +// or next batch tokens on any goroutine other than the syncing goroutine: the one +// which called Client.Sync(). +type InMemoryStore struct { + Filters map[string]string + NextBatch map[string]string + Rooms map[string]*Room +} + +// SaveFilterID to memory. +func (s *InMemoryStore) SaveFilterID(userID, filterID string) { + s.Filters[userID] = filterID +} + +// LoadFilterID from memory. +func (s *InMemoryStore) LoadFilterID(userID string) string { + return s.Filters[userID] +} + +// SaveNextBatch to memory. +func (s *InMemoryStore) SaveNextBatch(userID, nextBatchToken string) { + s.NextBatch[userID] = nextBatchToken +} + +// LoadNextBatch from memory. +func (s *InMemoryStore) LoadNextBatch(userID string) string { + return s.NextBatch[userID] +} + +// SaveRoom to memory. +func (s *InMemoryStore) SaveRoom(room *Room) { + s.Rooms[room.ID] = room +} + +// LoadRoom from memory. +func (s *InMemoryStore) LoadRoom(roomID string) *Room { + return s.Rooms[roomID] +} + +// NewInMemoryStore constructs a new InMemoryStore. +func NewInMemoryStore() *InMemoryStore { + return &InMemoryStore{ + Filters: make(map[string]string), + NextBatch: make(map[string]string), + Rooms: make(map[string]*Room), + } +} diff --git a/vendor/src/github.com/matrix-org/gomatrix/sync.go b/vendor/src/github.com/matrix-org/gomatrix/sync.go new file mode 100644 index 0000000..347e5dc --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrix/sync.go @@ -0,0 +1,154 @@ +package gomatrix + +import ( + "encoding/json" + "fmt" + "runtime/debug" + "time" +) + +// Syncer represents an interface that must be satisfied in order to do /sync requests on a client. +type Syncer interface { + // Process the /sync response. The since parameter is the since= value that was used to produce the response. + // This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped + // permanently. + ProcessResponse(resp *RespSync, since string) error + // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently. + OnFailedSync(res *RespSync, err error) (time.Duration, error) + // GetFilterJSON for the given user ID. NOT the filter ID. + GetFilterJSON(userID string) json.RawMessage +} + +// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively +// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer +// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information. +type DefaultSyncer struct { + UserID string + Store Storer + listeners map[string][]OnEventListener // event type to listeners array +} + +// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events. +type OnEventListener func(*Event) + +// NewDefaultSyncer returns an instantiated DefaultSyncer +func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer { + return &DefaultSyncer{ + UserID: userID, + Store: store, + listeners: make(map[string][]OnEventListener), + } +} + +// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of +// unrepeating events. Returns a fatal error if a listener panics. +func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) { + if !s.shouldProcessResponse(res, since) { + return + } + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack()) + } + }() + + for roomID, roomData := range res.Rooms.Join { + room := s.getOrCreateRoom(roomID) + for _, event := range roomData.State.Events { + event.RoomID = roomID + room.UpdateState(&event) + s.notifyListeners(&event) + } + for _, event := range roomData.Timeline.Events { + event.RoomID = roomID + s.notifyListeners(&event) + } + } + for roomID, roomData := range res.Rooms.Invite { + room := s.getOrCreateRoom(roomID) + for _, event := range roomData.State.Events { + event.RoomID = roomID + room.UpdateState(&event) + s.notifyListeners(&event) + } + } + return +} + +// OnEventType allows callers to be notified when there are new events for the given event type. +// There are no duplicate checks. +func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) { + _, exists := s.listeners[eventType] + if !exists { + s.listeners[eventType] = []OnEventListener{} + } + s.listeners[eventType] = append(s.listeners[eventType], callback) +} + +// shouldProcessResponse returns true if the response should be processed. May modify the response to remove +// stuff that shouldn't be processed. +func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool { + if since == "" { + return false + } + // This is a horrible hack because /sync will return the most recent messages for a room + // as soon as you /join it. We do NOT want to process those events in that particular room + // because they may have already been processed (if you toggle the bot in/out of the room). + // + // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us + // exists and is "join" and then discard processing that room entirely if so. + // TODO: We probably want to process messages from after the last join event in the timeline. + for roomID, roomData := range resp.Rooms.Join { + for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { + e := roomData.Timeline.Events[i] + if e.Type == "m.room.member" && e.StateKey == s.UserID { + m := e.Content["membership"] + mship, ok := m.(string) + if !ok { + continue + } + if mship == "join" { + _, ok := resp.Rooms.Join[roomID] + if !ok { + continue + } + delete(resp.Rooms.Join, roomID) // don't re-process messages + delete(resp.Rooms.Invite, roomID) // don't re-process invites + break + } + } + } + } + return true +} + +// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse() +func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room { + room := s.Store.LoadRoom(roomID) + if room == nil { // create a new Room + room = NewRoom(roomID) + s.Store.SaveRoom(room) + } + return room +} + +func (s *DefaultSyncer) notifyListeners(event *Event) { + listeners, exists := s.listeners[event.Type] + if !exists { + return + } + for _, fn := range listeners { + fn(event) + } +} + +// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error. +func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { + return 10 * time.Second, nil +} + +// GetFilterJSON returns a filter with a timeline limit of 50. +func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage { + return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) +}