mirror of https://github.com/matrix-org/go-neb.git
Kegsay
8 years ago
committed by
GitHub
31 changed files with 1155 additions and 756 deletions
-
6src/github.com/matrix-org/go-neb/api/handlers/service.go
-
95src/github.com/matrix-org/go-neb/clients/clients.go
-
16src/github.com/matrix-org/go-neb/clients/clients_test.go
-
460src/github.com/matrix-org/go-neb/matrix/matrix.go
-
40src/github.com/matrix-org/go-neb/matrix/responses.go
-
81src/github.com/matrix-org/go-neb/matrix/worker.go
-
6src/github.com/matrix-org/go-neb/services/echo/echo.go
-
14src/github.com/matrix-org/go-neb/services/giphy/giphy.go
-
17src/github.com/matrix-org/go-neb/services/github/github.go
-
10src/github.com/matrix-org/go-neb/services/github/github_webhook.go
-
6src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go
-
16src/github.com/matrix-org/go-neb/services/guggy/guggy.go
-
14src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go
-
15src/github.com/matrix-org/go-neb/services/jira/jira.go
-
16src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go
-
9src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go
-
12src/github.com/matrix-org/go-neb/services/travisci/travisci.go
-
15src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go
-
20src/github.com/matrix-org/go-neb/types/service.go
-
6vendor/manifest
-
201vendor/src/github.com/matrix-org/gomatrix/LICENSE
-
4vendor/src/github.com/matrix-org/gomatrix/README.md
-
381vendor/src/github.com/matrix-org/gomatrix/client.go
-
28vendor/src/github.com/matrix-org/gomatrix/client_test.go
-
79vendor/src/github.com/matrix-org/gomatrix/events.go
-
5vendor/src/github.com/matrix-org/gomatrix/hooks/install.sh
-
9vendor/src/github.com/matrix-org/gomatrix/hooks/pre-commit
-
61vendor/src/github.com/matrix-org/gomatrix/responses.go
-
50vendor/src/github.com/matrix-org/gomatrix/room.go
-
65vendor/src/github.com/matrix-org/gomatrix/store.go
-
154vendor/src/github.com/matrix-org/gomatrix/sync.go
@ -1,443 +1,69 @@ |
|||
// Package matrix provides an HTTP client that can interact with a Homeserver via r0 APIs (/sync).
|
|||
//
|
|||
// It is NOT safe to access the field (or any sub-fields of) 'Rooms' concurrently. In essence, this
|
|||
// structure MUST be treated as read-only. The matrix client will update this structure as new events
|
|||
// arrive from the homeserver.
|
|||
//
|
|||
// Internally, the client has 1 goroutine for polling the server, and 1 goroutine for processing data
|
|||
// returned. The polling goroutine communicates to the processing goroutine by a buffered channel
|
|||
// which feedback loops if processing takes a while as it will delay more data from being pulled down
|
|||
// if the buffer gets full. Modification of the 'Rooms' field of the client is done EXCLUSIVELY on the
|
|||
// processing goroutine.
|
|||
package matrix |
|||
|
|||
import ( |
|||
"bytes" |
|||
"encoding/json" |
|||
"fmt" |
|||
"io" |
|||
"io/ioutil" |
|||
"net/http" |
|||
"net/url" |
|||
"path" |
|||
"strconv" |
|||
"sync" |
|||
"time" |
|||
|
|||
log "github.com/Sirupsen/logrus" |
|||
"github.com/matrix-org/go-neb/api" |
|||
"github.com/matrix-org/go-neb/errors" |
|||
) |
|||
|
|||
var ( |
|||
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) |
|||
"github.com/matrix-org/go-neb/database" |
|||
"github.com/matrix-org/gomatrix" |
|||
) |
|||
|
|||
// NextBatchStorer controls loading/saving of next_batch tokens for users
|
|||
type NextBatchStorer interface { |
|||
// Save a next_batch token for a given user. Best effort.
|
|||
Save(userID, nextBatch string) |
|||
// Load a next_batch token for a given user. Return an empty string if no token exists.
|
|||
Load(userID string) string |
|||
} |
|||
|
|||
// noopNextBatchStore does not load or save next_batch tokens.
|
|||
type noopNextBatchStore struct{} |
|||
|
|||
func (s noopNextBatchStore) Save(userID, nextBatch string) {} |
|||
func (s noopNextBatchStore) Load(userID string) string { return "" } |
|||
|
|||
// Client represents a Matrix client.
|
|||
type Client struct { |
|||
HomeserverURL *url.URL |
|||
Prefix string |
|||
UserID string |
|||
AccessToken string |
|||
Rooms map[string]*Room |
|||
Worker *Worker |
|||
syncingMutex sync.Mutex |
|||
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
|||
httpClient *http.Client |
|||
filterID string |
|||
NextBatchStorer NextBatchStorer |
|||
ClientConfig api.ClientConfig |
|||
} |
|||
|
|||
func (cli *Client) buildURL(urlPath ...string) string { |
|||
ps := []string{cli.Prefix} |
|||
for _, p := range urlPath { |
|||
ps = append(ps, p) |
|||
} |
|||
return cli.buildBaseURL(ps...) |
|||
} |
|||
|
|||
func (cli *Client) buildBaseURL(urlPath ...string) string { |
|||
// copy the URL. Purposefully ignore error as the input is from a valid URL already
|
|||
hsURL, _ := url.Parse(cli.HomeserverURL.String()) |
|||
parts := []string{hsURL.Path} |
|||
parts = append(parts, urlPath...) |
|||
hsURL.Path = path.Join(parts...) |
|||
query := hsURL.Query() |
|||
query.Set("access_token", cli.AccessToken) |
|||
hsURL.RawQuery = query.Encode() |
|||
return hsURL.String() |
|||
} |
|||
|
|||
func (cli *Client) buildURLWithQuery(urlPath []string, urlQuery map[string]string) string { |
|||
u, _ := url.Parse(cli.buildURL(urlPath...)) |
|||
q := u.Query() |
|||
for k, v := range urlQuery { |
|||
q.Set(k, v) |
|||
} |
|||
u.RawQuery = q.Encode() |
|||
return u.String() |
|||
} |
|||
|
|||
// JoinRoom joins the client to a room ID or alias. If serverName is specified, this will be added as a query param
|
|||
// to instruct the homeserver to join via that server. If invitingUserID is specified, the inviting user ID will be
|
|||
// inserted into the content of the join request. Returns a room ID.
|
|||
func (cli *Client) JoinRoom(roomIDorAlias, serverName, invitingUserID string) (string, error) { |
|||
var urlPath string |
|||
if serverName != "" { |
|||
urlPath = cli.buildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ |
|||
"server_name": serverName, |
|||
}) |
|||
} else { |
|||
urlPath = cli.buildURL("join", roomIDorAlias) |
|||
} |
|||
|
|||
content := struct { |
|||
Inviter string `json:"inviter,omitempty"` |
|||
}{} |
|||
content.Inviter = invitingUserID |
|||
|
|||
resBytes, err := cli.sendJSON("POST", urlPath, content) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
var joinRoomResponse joinRoomHTTPResponse |
|||
if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { |
|||
return "", err |
|||
} |
|||
return joinRoomResponse.RoomID, nil |
|||
} |
|||
|
|||
// SetDisplayName sets the user's profile display name
|
|||
func (cli *Client) SetDisplayName(displayName string) error { |
|||
urlPath := cli.buildURL("profile", cli.UserID, "displayname") |
|||
s := struct { |
|||
DisplayName string `json:"displayname"` |
|||
}{displayName} |
|||
_, err := cli.sendJSON("PUT", urlPath, &s) |
|||
return err |
|||
// NEBStore implements the gomatrix.Storer interface.
|
|||
//
|
|||
// It persists the next batch token in the database, and includes a ClientConfig for the client.
|
|||
type NEBStore struct { |
|||
gomatrix.InMemoryStore |
|||
Database database.Storer |
|||
ClientConfig api.ClientConfig |
|||
} |
|||
|
|||
// SendMessageEvent sends a message event into a room, returning the event_id on success.
|
|||
// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal.
|
|||
func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (string, error) { |
|||
txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) |
|||
urlPath := cli.buildURL("rooms", roomID, "send", eventType, txnID) |
|||
resBytes, err := cli.sendJSON("PUT", urlPath, contentJSON) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
var sendEventResponse sendEventHTTPResponse |
|||
if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { |
|||
return "", err |
|||
// SaveNextBatch saves to the database.
|
|||
func (s *NEBStore) SaveNextBatch(userID, nextBatch string) { |
|||
if err := s.Database.UpdateNextBatch(userID, nextBatch); err != nil { |
|||
log.WithFields(log.Fields{ |
|||
log.ErrorKey: err, |
|||
"user_id": userID, |
|||
"next_batch": nextBatch, |
|||
}).Error("Failed to persist next_batch token") |
|||
} |
|||
return sendEventResponse.EventID, nil |
|||
} |
|||
|
|||
// SendText sends an m.room.message event into the given room with a msgtype of m.text
|
|||
func (cli *Client) SendText(roomID, text string) (string, error) { |
|||
return cli.SendMessageEvent(roomID, "m.room.message", |
|||
TextMessage{"m.text", text}) |
|||
} |
|||
|
|||
// UploadLink uploads an HTTP URL and then returns an MXC URI.
|
|||
func (cli *Client) UploadLink(link string) (string, error) { |
|||
res, err := cli.httpClient.Get(link) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
// LoadNextBatch loads from the database.
|
|||
func (s *NEBStore) LoadNextBatch(userID string) string { |
|||
token, err := s.Database.LoadNextBatch(userID) |
|||
if err != nil { |
|||
return "", err |
|||
log.WithFields(log.Fields{ |
|||
log.ErrorKey: err, |
|||
"user_id": userID, |
|||
}).Error("Failed to load next_batch token") |
|||
return "" |
|||
} |
|||
return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) |
|||
return token |
|||
} |
|||
|
|||
// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI.
|
|||
func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (string, error) { |
|||
req, err := http.NewRequest("POST", cli.buildBaseURL("_matrix/media/r0/upload"), content) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
req.Header.Set("Content-Type", contentType) |
|||
req.ContentLength = contentLength |
|||
log.WithFields(log.Fields{ |
|||
"content_type": contentType, |
|||
"content_length": contentLength, |
|||
}).Print("Uploading to content repo") |
|||
res, err := cli.httpClient.Do(req) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
if res.StatusCode != 200 { |
|||
return "", fmt.Errorf("Upload request returned HTTP %d", res.StatusCode) |
|||
} |
|||
m := struct { |
|||
ContentURI string `json:"content_uri"` |
|||
}{} |
|||
if err := json.NewDecoder(res.Body).Decode(&m); err != nil { |
|||
return "", err |
|||
} |
|||
return m.ContentURI, nil |
|||
// StarterLinkMessage represents a message with a starter_link custom data.
|
|||
type StarterLinkMessage struct { |
|||
Body string |
|||
Link string |
|||
} |
|||
|
|||
// Sync starts syncing with the provided Homeserver. This function will be invoked continually.
|
|||
// If Sync is called twice then the first sync will be stopped.
|
|||
func (cli *Client) Sync() { |
|||
// Mark the client as syncing.
|
|||
// We will keep syncing until the syncing state changes. Either because
|
|||
// Sync is called or StopSync is called.
|
|||
syncingID := cli.incrementSyncingID() |
|||
logger := log.WithFields(log.Fields{ |
|||
"syncing": syncingID, |
|||
"user_id": cli.UserID, |
|||
}) |
|||
|
|||
// TODO: Store the filter ID in the database
|
|||
filterID, err := cli.createFilter() |
|||
if err != nil { |
|||
logger.WithError(err).Fatal("Failed to create filter") |
|||
// TODO: Maybe do some sort of error handling here?
|
|||
} |
|||
cli.filterID = filterID |
|||
logger.WithField("filter", filterID).Print("Got filter ID") |
|||
nextToken := cli.NextBatchStorer.Load(cli.UserID) |
|||
|
|||
logger.WithField("next_batch", nextToken).Print("Starting sync") |
|||
|
|||
channel := make(chan syncHTTPResponse, 5) |
|||
|
|||
go func() { |
|||
for response := range channel { |
|||
cli.Worker.onSyncHTTPResponse(response) |
|||
} |
|||
}() |
|||
defer close(channel) |
|||
|
|||
for { |
|||
// Do a /sync
|
|||
syncBytes, err := cli.doSync(30000, nextToken) |
|||
if err != nil { |
|||
logger.WithError(err).Warn("doSync failed") |
|||
time.Sleep(5 * time.Second) |
|||
continue |
|||
} |
|||
|
|||
// Decode sync response into syncHTTPResponse
|
|||
var syncResponse syncHTTPResponse |
|||
if err = json.Unmarshal(syncBytes, &syncResponse); err != nil { |
|||
logger.WithError(err).Warn("Failed to decode sync data") |
|||
time.Sleep(5 * time.Second) |
|||
continue |
|||
} |
|||
// MarshalJSON converts this message into actual event content JSON.
|
|||
func (m StarterLinkMessage) MarshalJSON() ([]byte, error) { |
|||
var data map[string]string |
|||
|
|||
// Check that the syncing state hasn't changed
|
|||
// Either because we've stopped syncing or another sync has been started.
|
|||
// We discard the response from our sync.
|
|||
if cli.getSyncingID() != syncingID { |
|||
logger.Print("Stopping sync") |
|||
return |
|||
} |
|||
|
|||
processResponse := cli.shouldProcessResponse(nextToken, &syncResponse) |
|||
nextToken = syncResponse.NextBatch |
|||
|
|||
// Save the token now *before* passing it through to the worker. This means it's possible
|
|||
// to not process some events, but it means that we won't get constantly stuck processing
|
|||
// a malformed/buggy event which keeps making us panic.
|
|||
cli.NextBatchStorer.Save(cli.UserID, nextToken) |
|||
|
|||
if processResponse { |
|||
// Update client state
|
|||
channel <- syncResponse |
|||
if m.Link != "" { |
|||
data = map[string]string{ |
|||
"org.matrix.neb.starter_link": m.Link, |
|||
} |
|||
} |
|||
} |
|||
|
|||
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
|
|||
// stuff that shouldn't be processed.
|
|||
func (cli *Client) shouldProcessResponse(tokenOnSync string, syncResponse *syncHTTPResponse) bool { |
|||
if tokenOnSync == "" { |
|||
return false |
|||
} |
|||
// This is a horrible hack because /sync will return the most recent messages for a room
|
|||
// as soon as you /join it. We do NOT want to process those events in that particular room
|
|||
// because they may have already been processed (if you toggle the bot in/out of the room).
|
|||
//
|
|||
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
|
|||
// exists and is "join" and then discard processing that room entirely if so.
|
|||
// TODO: We probably want to process the !commands from after the last join event in the timeline.
|
|||
for roomID, roomData := range syncResponse.Rooms.Join { |
|||
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { |
|||
e := roomData.Timeline.Events[i] |
|||
if e.Type == "m.room.member" && e.StateKey == cli.UserID { |
|||
m := e.Content["membership"] |
|||
mship, ok := m.(string) |
|||
if !ok { |
|||
continue |
|||
} |
|||
if mship == "join" { |
|||
log.WithFields(log.Fields{ |
|||
"room_id": roomID, |
|||
"user_id": cli.UserID, |
|||
"start_token": tokenOnSync, |
|||
}).Info("Discarding /sync events in room: just joined it.") |
|||
_, ok := syncResponse.Rooms.Join[roomID] |
|||
if !ok { |
|||
panic("room " + roomID + " does not exist in Join?!") |
|||
} |
|||
delete(syncResponse.Rooms.Join, roomID) // don't re-process !commands
|
|||
delete(syncResponse.Rooms.Invite, roomID) // don't re-process invites
|
|||
break |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return true |
|||
} |
|||
|
|||
func (cli *Client) incrementSyncingID() uint32 { |
|||
cli.syncingMutex.Lock() |
|||
defer cli.syncingMutex.Unlock() |
|||
cli.syncingID++ |
|||
return cli.syncingID |
|||
} |
|||
|
|||
func (cli *Client) getSyncingID() uint32 { |
|||
cli.syncingMutex.Lock() |
|||
defer cli.syncingMutex.Unlock() |
|||
return cli.syncingID |
|||
} |
|||
|
|||
// StopSync stops the ongoing sync started by Sync.
|
|||
func (cli *Client) StopSync() { |
|||
// Advance the syncing state so that any running Syncs will terminate.
|
|||
cli.incrementSyncingID() |
|||
} |
|||
|
|||
// This should only be called by the worker goroutine
|
|||
func (cli *Client) getOrCreateRoom(roomID string) *Room { |
|||
room := cli.Rooms[roomID] |
|||
if room == nil { // create a new Room
|
|||
room = NewRoom(roomID) |
|||
cli.Rooms[roomID] = room |
|||
} |
|||
return room |
|||
} |
|||
|
|||
func (cli *Client) sendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { |
|||
jsonStr, err := json.Marshal(contentJSON) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
req.Header.Set("Content-Type", "application/json") |
|||
logger := log.WithFields(log.Fields{ |
|||
"method": method, |
|||
"url": httpURL, |
|||
"json": string(jsonStr), |
|||
}) |
|||
logger.Print("Sending JSON request") |
|||
res, err := cli.httpClient.Do(req) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
logger.WithError(err).Warn("Failed to send JSON request") |
|||
return nil, err |
|||
msg := struct { |
|||
MsgType string `json:"msgtype"` |
|||
Body string `json:"body"` |
|||
Data map[string]string `json:"data,omitempty"` |
|||
}{ |
|||
"m.notice", m.Body, data, |
|||
} |
|||
contents, err := ioutil.ReadAll(res.Body) |
|||
if res.StatusCode >= 300 { |
|||
logger.WithFields(log.Fields{ |
|||
"code": res.StatusCode, |
|||
"body": string(contents), |
|||
}).Warn("Failed to send JSON request") |
|||
return nil, errors.HTTPError{ |
|||
Code: res.StatusCode, |
|||
Message: "Failed to " + method + " JSON: HTTP " + strconv.Itoa(res.StatusCode), |
|||
} |
|||
} |
|||
if err != nil { |
|||
logger.WithError(err).Warn("Failed to read response") |
|||
return nil, err |
|||
} |
|||
return contents, nil |
|||
} |
|||
|
|||
func (cli *Client) createFilter() (string, error) { |
|||
urlPath := cli.buildURL("user", cli.UserID, "filter") |
|||
resBytes, err := cli.sendJSON("POST", urlPath, &filterJSON) |
|||
if err != nil { |
|||
return "", err |
|||
} |
|||
var filterResponse filterHTTPResponse |
|||
if err = json.Unmarshal(resBytes, &filterResponse); err != nil { |
|||
return "", err |
|||
} |
|||
return filterResponse.FilterID, nil |
|||
} |
|||
|
|||
func (cli *Client) doSync(timeout int, since string) ([]byte, error) { |
|||
query := map[string]string{ |
|||
"timeout": strconv.Itoa(timeout), |
|||
} |
|||
if since != "" { |
|||
query["since"] = since |
|||
} |
|||
if cli.filterID != "" { |
|||
query["filter"] = cli.filterID |
|||
} |
|||
urlPath := cli.buildURLWithQuery([]string{"sync"}, query) |
|||
req, err := http.NewRequest("GET", urlPath, nil) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
res, err := cli.httpClient.Do(req) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
defer res.Body.Close() |
|||
contents, err := ioutil.ReadAll(res.Body) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return contents, nil |
|||
} |
|||
|
|||
// NewClient creates a new Matrix Client ready for syncing
|
|||
func NewClient(httpClient *http.Client, homeserverURL *url.URL, accessToken, userID string) *Client { |
|||
cli := Client{ |
|||
AccessToken: accessToken, |
|||
HomeserverURL: homeserverURL, |
|||
UserID: userID, |
|||
Prefix: "/_matrix/client/r0", |
|||
} |
|||
cli.Worker = newWorker(&cli) |
|||
// By default, use a no-op next_batch storer which will never save tokens and always
|
|||
// "load" the empty string as a token. The client will work with this storer: it just won't
|
|||
// remember the token across restarts. In practice, a database backend should be used.
|
|||
cli.NextBatchStorer = noopNextBatchStore{} |
|||
cli.Rooms = make(map[string]*Room) |
|||
cli.httpClient = httpClient |
|||
|
|||
return &cli |
|||
return json.Marshal(msg) |
|||
} |
@ -1,40 +0,0 @@ |
|||
package matrix |
|||
|
|||
type filterHTTPResponse struct { |
|||
FilterID string `json:"filter_id"` |
|||
} |
|||
|
|||
type joinRoomHTTPResponse struct { |
|||
RoomID string `json:"room_id"` |
|||
} |
|||
|
|||
type sendEventHTTPResponse struct { |
|||
EventID string `json:"event_id"` |
|||
} |
|||
|
|||
type syncHTTPResponse struct { |
|||
NextBatch string `json:"next_batch"` |
|||
AccountData struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"account_data"` |
|||
Presence struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"presence"` |
|||
Rooms struct { |
|||
Join map[string]struct { |
|||
State struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"state"` |
|||
Timeline struct { |
|||
Events []Event `json:"events"` |
|||
Limited bool `json:"limited"` |
|||
PrevBatch string `json:"prev_batch"` |
|||
} `json:"timeline"` |
|||
} `json:"join"` |
|||
Invite map[string]struct { |
|||
State struct { |
|||
Events []Event |
|||
} `json:"invite_state"` |
|||
} `json:"invite"` |
|||
} `json:"rooms"` |
|||
} |
@ -1,81 +0,0 @@ |
|||
package matrix |
|||
|
|||
import ( |
|||
log "github.com/Sirupsen/logrus" |
|||
"runtime/debug" |
|||
) |
|||
|
|||
// Worker processes incoming events and updates the Matrix client's data structures. It also informs
|
|||
// any attached listeners of the new events.
|
|||
type Worker struct { |
|||
client *Client |
|||
listeners map[string][]OnEventListener // event type to listeners array
|
|||
} |
|||
|
|||
// OnEventListener can be used with Worker.OnEventType to be informed of incoming events.
|
|||
type OnEventListener func(*Event) |
|||
|
|||
func newWorker(client *Client) *Worker { |
|||
return &Worker{ |
|||
client, |
|||
make(map[string][]OnEventListener), |
|||
} |
|||
} |
|||
|
|||
// OnEventType allows callers to be notified when there are new events for the given event type.
|
|||
// There are no duplicate checks.
|
|||
func (worker *Worker) OnEventType(eventType string, callback OnEventListener) { |
|||
_, exists := worker.listeners[eventType] |
|||
if !exists { |
|||
worker.listeners[eventType] = []OnEventListener{} |
|||
} |
|||
worker.listeners[eventType] = append(worker.listeners[eventType], callback) |
|||
} |
|||
|
|||
func (worker *Worker) notifyListeners(event *Event) { |
|||
listeners, exists := worker.listeners[event.Type] |
|||
if !exists { |
|||
return |
|||
} |
|||
for _, fn := range listeners { |
|||
fn(event) |
|||
} |
|||
} |
|||
|
|||
func (worker *Worker) onSyncHTTPResponse(res syncHTTPResponse) { |
|||
defer func() { |
|||
if r := recover(); r != nil { |
|||
userID := "" |
|||
if worker.client != nil { |
|||
userID = worker.client.UserID |
|||
} |
|||
log.WithFields(log.Fields{ |
|||
"panic": r, |
|||
"user_id": userID, |
|||
}).Errorf( |
|||
"onSyncHTTPResponse panicked!\n%s", debug.Stack(), |
|||
) |
|||
} |
|||
}() |
|||
|
|||
for roomID, roomData := range res.Rooms.Join { |
|||
room := worker.client.getOrCreateRoom(roomID) |
|||
for _, event := range roomData.State.Events { |
|||
event.RoomID = roomID |
|||
room.UpdateState(&event) |
|||
worker.notifyListeners(&event) |
|||
} |
|||
for _, event := range roomData.Timeline.Events { |
|||
event.RoomID = roomID |
|||
worker.notifyListeners(&event) |
|||
} |
|||
} |
|||
for roomID, roomData := range res.Rooms.Invite { |
|||
room := worker.client.getOrCreateRoom(roomID) |
|||
for _, event := range roomData.State.Events { |
|||
event.RoomID = roomID |
|||
room.UpdateState(&event) |
|||
worker.notifyListeners(&event) |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,201 @@ |
|||
Apache License |
|||
Version 2.0, January 2004 |
|||
http://www.apache.org/licenses/ |
|||
|
|||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION |
|||
|
|||
1. Definitions. |
|||
|
|||
"License" shall mean the terms and conditions for use, reproduction, |
|||
and distribution as defined by Sections 1 through 9 of this document. |
|||
|
|||
"Licensor" shall mean the copyright owner or entity authorized by |
|||
the copyright owner that is granting the License. |
|||
|
|||
"Legal Entity" shall mean the union of the acting entity and all |
|||
other entities that control, are controlled by, or are under common |
|||
control with that entity. For the purposes of this definition, |
|||
"control" means (i) the power, direct or indirect, to cause the |
|||
direction or management of such entity, whether by contract or |
|||
otherwise, or (ii) ownership of fifty percent (50%) or more of the |
|||
outstanding shares, or (iii) beneficial ownership of such entity. |
|||
|
|||
"You" (or "Your") shall mean an individual or Legal Entity |
|||
exercising permissions granted by this License. |
|||
|
|||
"Source" form shall mean the preferred form for making modifications, |
|||
including but not limited to software source code, documentation |
|||
source, and configuration files. |
|||
|
|||
"Object" form shall mean any form resulting from mechanical |
|||
transformation or translation of a Source form, including but |
|||
not limited to compiled object code, generated documentation, |
|||
and conversions to other media types. |
|||
|
|||
"Work" shall mean the work of authorship, whether in Source or |
|||
Object form, made available under the License, as indicated by a |
|||
copyright notice that is included in or attached to the work |
|||
(an example is provided in the Appendix below). |
|||
|
|||
"Derivative Works" shall mean any work, whether in Source or Object |
|||
form, that is based on (or derived from) the Work and for which the |
|||
editorial revisions, annotations, elaborations, or other modifications |
|||
represent, as a whole, an original work of authorship. For the purposes |
|||
of this License, Derivative Works shall not include works that remain |
|||
separable from, or merely link (or bind by name) to the interfaces of, |
|||
the Work and Derivative Works thereof. |
|||
|
|||
"Contribution" shall mean any work of authorship, including |
|||
the original version of the Work and any modifications or additions |
|||
to that Work or Derivative Works thereof, that is intentionally |
|||
submitted to Licensor for inclusion in the Work by the copyright owner |
|||
or by an individual or Legal Entity authorized to submit on behalf of |
|||
the copyright owner. For the purposes of this definition, "submitted" |
|||
means any form of electronic, verbal, or written communication sent |
|||
to the Licensor or its representatives, including but not limited to |
|||
communication on electronic mailing lists, source code control systems, |
|||
and issue tracking systems that are managed by, or on behalf of, the |
|||
Licensor for the purpose of discussing and improving the Work, but |
|||
excluding communication that is conspicuously marked or otherwise |
|||
designated in writing by the copyright owner as "Not a Contribution." |
|||
|
|||
"Contributor" shall mean Licensor and any individual or Legal Entity |
|||
on behalf of whom a Contribution has been received by Licensor and |
|||
subsequently incorporated within the Work. |
|||
|
|||
2. Grant of Copyright License. Subject to the terms and conditions of |
|||
this License, each Contributor hereby grants to You a perpetual, |
|||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
|||
copyright license to reproduce, prepare Derivative Works of, |
|||
publicly display, publicly perform, sublicense, and distribute the |
|||
Work and such Derivative Works in Source or Object form. |
|||
|
|||
3. Grant of Patent License. Subject to the terms and conditions of |
|||
this License, each Contributor hereby grants to You a perpetual, |
|||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
|||
(except as stated in this section) patent license to make, have made, |
|||
use, offer to sell, sell, import, and otherwise transfer the Work, |
|||
where such license applies only to those patent claims licensable |
|||
by such Contributor that are necessarily infringed by their |
|||
Contribution(s) alone or by combination of their Contribution(s) |
|||
with the Work to which such Contribution(s) was submitted. If You |
|||
institute patent litigation against any entity (including a |
|||
cross-claim or counterclaim in a lawsuit) alleging that the Work |
|||
or a Contribution incorporated within the Work constitutes direct |
|||
or contributory patent infringement, then any patent licenses |
|||
granted to You under this License for that Work shall terminate |
|||
as of the date such litigation is filed. |
|||
|
|||
4. Redistribution. You may reproduce and distribute copies of the |
|||
Work or Derivative Works thereof in any medium, with or without |
|||
modifications, and in Source or Object form, provided that You |
|||
meet the following conditions: |
|||
|
|||
(a) You must give any other recipients of the Work or |
|||
Derivative Works a copy of this License; and |
|||
|
|||
(b) You must cause any modified files to carry prominent notices |
|||
stating that You changed the files; and |
|||
|
|||
(c) You must retain, in the Source form of any Derivative Works |
|||
that You distribute, all copyright, patent, trademark, and |
|||
attribution notices from the Source form of the Work, |
|||
excluding those notices that do not pertain to any part of |
|||
the Derivative Works; and |
|||
|
|||
(d) If the Work includes a "NOTICE" text file as part of its |
|||
distribution, then any Derivative Works that You distribute must |
|||
include a readable copy of the attribution notices contained |
|||
within such NOTICE file, excluding those notices that do not |
|||
pertain to any part of the Derivative Works, in at least one |
|||
of the following places: within a NOTICE text file distributed |
|||
as part of the Derivative Works; within the Source form or |
|||
documentation, if provided along with the Derivative Works; or, |
|||
within a display generated by the Derivative Works, if and |
|||
wherever such third-party notices normally appear. The contents |
|||
of the NOTICE file are for informational purposes only and |
|||
do not modify the License. You may add Your own attribution |
|||
notices within Derivative Works that You distribute, alongside |
|||
or as an addendum to the NOTICE text from the Work, provided |
|||
that such additional attribution notices cannot be construed |
|||
as modifying the License. |
|||
|
|||
You may add Your own copyright statement to Your modifications and |
|||
may provide additional or different license terms and conditions |
|||
for use, reproduction, or distribution of Your modifications, or |
|||
for any such Derivative Works as a whole, provided Your use, |
|||
reproduction, and distribution of the Work otherwise complies with |
|||
the conditions stated in this License. |
|||
|
|||
5. Submission of Contributions. Unless You explicitly state otherwise, |
|||
any Contribution intentionally submitted for inclusion in the Work |
|||
by You to the Licensor shall be under the terms and conditions of |
|||
this License, without any additional terms or conditions. |
|||
Notwithstanding the above, nothing herein shall supersede or modify |
|||
the terms of any separate license agreement you may have executed |
|||
with Licensor regarding such Contributions. |
|||
|
|||
6. Trademarks. This License does not grant permission to use the trade |
|||
names, trademarks, service marks, or product names of the Licensor, |
|||
except as required for reasonable and customary use in describing the |
|||
origin of the Work and reproducing the content of the NOTICE file. |
|||
|
|||
7. Disclaimer of Warranty. Unless required by applicable law or |
|||
agreed to in writing, Licensor provides the Work (and each |
|||
Contributor provides its Contributions) on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
|||
implied, including, without limitation, any warranties or conditions |
|||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A |
|||
PARTICULAR PURPOSE. You are solely responsible for determining the |
|||
appropriateness of using or redistributing the Work and assume any |
|||
risks associated with Your exercise of permissions under this License. |
|||
|
|||
8. Limitation of Liability. In no event and under no legal theory, |
|||
whether in tort (including negligence), contract, or otherwise, |
|||
unless required by applicable law (such as deliberate and grossly |
|||
negligent acts) or agreed to in writing, shall any Contributor be |
|||
liable to You for damages, including any direct, indirect, special, |
|||
incidental, or consequential damages of any character arising as a |
|||
result of this License or out of the use or inability to use the |
|||
Work (including but not limited to damages for loss of goodwill, |
|||
work stoppage, computer failure or malfunction, or any and all |
|||
other commercial damages or losses), even if such Contributor |
|||
has been advised of the possibility of such damages. |
|||
|
|||
9. Accepting Warranty or Additional Liability. While redistributing |
|||
the Work or Derivative Works thereof, You may choose to offer, |
|||
and charge a fee for, acceptance of support, warranty, indemnity, |
|||
or other liability obligations and/or rights consistent with this |
|||
License. However, in accepting such obligations, You may act only |
|||
on Your own behalf and on Your sole responsibility, not on behalf |
|||
of any other Contributor, and only if You agree to indemnify, |
|||
defend, and hold each Contributor harmless for any liability |
|||
incurred by, or claims asserted against, such Contributor by reason |
|||
of your accepting any such warranty or additional liability. |
|||
|
|||
END OF TERMS AND CONDITIONS |
|||
|
|||
APPENDIX: How to apply the Apache License to your work. |
|||
|
|||
To apply the Apache License to your work, attach the following |
|||
boilerplate notice, with the fields enclosed by brackets "{}" |
|||
replaced with your own identifying information. (Don't include |
|||
the brackets!) The text should be enclosed in the appropriate |
|||
comment syntax for the file format. We also recommend that a |
|||
file or class name and description of purpose be included on the |
|||
same "printed page" as the copyright notice for easier |
|||
identification within third-party archives. |
|||
|
|||
Copyright {yyyy} {name of copyright owner} |
|||
|
|||
Licensed under the Apache License, Version 2.0 (the "License"); |
|||
you may not use this file except in compliance with the License. |
|||
You may obtain a copy of the License at |
|||
|
|||
http://www.apache.org/licenses/LICENSE-2.0 |
|||
|
|||
Unless required by applicable law or agreed to in writing, software |
|||
distributed under the License is distributed on an "AS IS" BASIS, |
|||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
See the License for the specific language governing permissions and |
|||
limitations under the License. |
@ -0,0 +1,4 @@ |
|||
# gomatrix |
|||
[![GoDoc](https://godoc.org/github.com/matrix-org/gomatrix?status.svg)](https://godoc.org/github.com/matrix-org/gomatrix) |
|||
|
|||
A Golang Matrix client |
@ -0,0 +1,381 @@ |
|||
// Package gomatrix implements the Matrix Client-Server API.
|
|||
//
|
|||
// Specification can be found at http://matrix.org/docs/spec/client_server/r0.2.0.html
|
|||
//
|
|||
// Example usage of this library: (blocking version)
|
|||
// cli, _ := gomatrix.NewClient("https://matrix.org", "@example:matrix.org", "MDAefhiuwehfuiwe")
|
|||
// syncer := cli.Syncer.(*gomatrix.DefaultSyncer)
|
|||
// syncer.OnEventType("m.room.message", func(ev *gomatrix.Event) {
|
|||
// fmt.Println("Message: ", ev)
|
|||
// })
|
|||
// if err := cli.Sync(); err != nil {
|
|||
// fmt.Println("Sync() returned ", err)
|
|||
// }
|
|||
//
|
|||
// To make the example non-blocking, call Sync() in a goroutine.
|
|||
package gomatrix |
|||
|
|||
import ( |
|||
"bytes" |
|||
"encoding/json" |
|||
"fmt" |
|||
"io" |
|||
"io/ioutil" |
|||
"net/http" |
|||
"net/url" |
|||
"path" |
|||
"strconv" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
// Client represents a Matrix client.
|
|||
type Client struct { |
|||
HomeserverURL *url.URL // The base homeserver URL
|
|||
Prefix string // The API prefix eg '/_matrix/client/r0'
|
|||
UserID string // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
|
|||
AccessToken string // The access_token for the client.
|
|||
syncingMutex sync.Mutex // protects syncingID
|
|||
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
|||
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
|
|||
Syncer Syncer // The thing which can process /sync responses
|
|||
Store Storer // The thing which can store rooms/tokens/ids
|
|||
} |
|||
|
|||
// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
|
|||
type HTTPError struct { |
|||
WrappedError error |
|||
Message string |
|||
Code int |
|||
} |
|||
|
|||
func (e HTTPError) Error() string { |
|||
var wrappedErrMsg string |
|||
if e.WrappedError != nil { |
|||
wrappedErrMsg = e.WrappedError.Error() |
|||
} |
|||
return fmt.Sprintf("msg=%s code=%d wrapped=%s", e.Message, e.Code, wrappedErrMsg) |
|||
} |
|||
|
|||
// BuildURL builds a URL with the Client's homserver/prefix/access_token set already.
|
|||
func (cli *Client) BuildURL(urlPath ...string) string { |
|||
ps := []string{cli.Prefix} |
|||
for _, p := range urlPath { |
|||
ps = append(ps, p) |
|||
} |
|||
return cli.BuildBaseURL(ps...) |
|||
} |
|||
|
|||
// BuildBaseURL builds a URL with the Client's homeserver/access_token set already. You must
|
|||
// supply the prefix in the path.
|
|||
func (cli *Client) BuildBaseURL(urlPath ...string) string { |
|||
// copy the URL. Purposefully ignore error as the input is from a valid URL already
|
|||
hsURL, _ := url.Parse(cli.HomeserverURL.String()) |
|||
parts := []string{hsURL.Path} |
|||
parts = append(parts, urlPath...) |
|||
hsURL.Path = path.Join(parts...) |
|||
query := hsURL.Query() |
|||
query.Set("access_token", cli.AccessToken) |
|||
hsURL.RawQuery = query.Encode() |
|||
return hsURL.String() |
|||
} |
|||
|
|||
// BuildURLWithQuery builds a URL with query paramters in addition to the Client's homeserver/prefix/access_token set already.
|
|||
func (cli *Client) BuildURLWithQuery(urlPath []string, urlQuery map[string]string) string { |
|||
u, _ := url.Parse(cli.BuildURL(urlPath...)) |
|||
q := u.Query() |
|||
for k, v := range urlQuery { |
|||
q.Set(k, v) |
|||
} |
|||
u.RawQuery = q.Encode() |
|||
return u.String() |
|||
} |
|||
|
|||
// Sync starts syncing with the provided Homeserver. This function will block until a fatal /sync error occurs, so should
|
|||
// almost always be started as a new goroutine. If Sync() is called twice then the first sync will be stopped.
|
|||
func (cli *Client) Sync() error { |
|||
// Mark the client as syncing.
|
|||
// We will keep syncing until the syncing state changes. Either because
|
|||
// Sync is called or StopSync is called.
|
|||
syncingID := cli.incrementSyncingID() |
|||
nextBatch := cli.Store.LoadNextBatch(cli.UserID) |
|||
filterID := cli.Store.LoadFilterID(cli.UserID) |
|||
if filterID == "" { |
|||
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID) |
|||
resFilter, err := cli.CreateFilter(filterJSON) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
filterID = resFilter.FilterID |
|||
cli.Store.SaveFilterID(cli.UserID, filterID) |
|||
} |
|||
|
|||
for { |
|||
resSync, err := cli.SyncRequest(30000, nextBatch, filterID, false, "") |
|||
if err != nil { |
|||
duration, err2 := cli.Syncer.OnFailedSync(resSync, err) |
|||
if err2 != nil { |
|||
return err2 |
|||
} |
|||
time.Sleep(duration) |
|||
continue |
|||
} |
|||
|
|||
// Check that the syncing state hasn't changed
|
|||
// Either because we've stopped syncing or another sync has been started.
|
|||
// We discard the response from our sync.
|
|||
if cli.getSyncingID() != syncingID { |
|||
return nil |
|||
} |
|||
|
|||
// Save the token now *before* processing it. This means it's possible
|
|||
// to not process some events, but it means that we won't get constantly stuck processing
|
|||
// a malformed/buggy event which keeps making us panic.
|
|||
cli.Store.SaveNextBatch(cli.UserID, resSync.NextBatch) |
|||
if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil { |
|||
return err |
|||
} |
|||
|
|||
nextBatch = resSync.NextBatch |
|||
} |
|||
} |
|||
|
|||
func (cli *Client) incrementSyncingID() uint32 { |
|||
cli.syncingMutex.Lock() |
|||
defer cli.syncingMutex.Unlock() |
|||
cli.syncingID++ |
|||
return cli.syncingID |
|||
} |
|||
|
|||
func (cli *Client) getSyncingID() uint32 { |
|||
cli.syncingMutex.Lock() |
|||
defer cli.syncingMutex.Unlock() |
|||
return cli.syncingID |
|||
} |
|||
|
|||
// StopSync stops the ongoing sync started by Sync.
|
|||
func (cli *Client) StopSync() { |
|||
// Advance the syncing state so that any running Syncs will terminate.
|
|||
cli.incrementSyncingID() |
|||
} |
|||
|
|||
// SendJSON sends JSON to the given URL.
|
|||
//
|
|||
// Returns the HTTP body as bytes on 2xx. Returns an error if the response is not 2xx. This error
|
|||
// is an HTTPError which includes the returned HTTP status code and possibly a RespError as the
|
|||
// WrappedError, if the HTTP body could be decoded as a RespError.
|
|||
func (cli *Client) SendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { |
|||
jsonStr, err := json.Marshal(contentJSON) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
req.Header.Set("Content-Type", "application/json") |
|||
res, err := cli.Client.Do(req) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
contents, err := ioutil.ReadAll(res.Body) |
|||
if res.StatusCode >= 300 || res.StatusCode < 200 { |
|||
var wrap error |
|||
var respErr RespError |
|||
if _ = json.Unmarshal(contents, respErr); respErr.ErrCode != "" { |
|||
wrap = respErr |
|||
} |
|||
|
|||
// If we failed to decode as RespError, don't just drop the HTTP body, include it in the
|
|||
// HTTP error instead (e.g proxy errors which return HTML).
|
|||
msg := "Failed to " + method + " JSON" |
|||
if wrap == nil { |
|||
msg = msg + ": " + string(contents) |
|||
} |
|||
|
|||
return nil, HTTPError{ |
|||
Code: res.StatusCode, |
|||
Message: msg, |
|||
WrappedError: wrap, |
|||
} |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return contents, nil |
|||
} |
|||
|
|||
// CreateFilter makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
|
|||
func (cli *Client) CreateFilter(filter json.RawMessage) (*RespCreateFilter, error) { |
|||
urlPath := cli.BuildURL("user", cli.UserID, "filter") |
|||
resBytes, err := cli.SendJSON("POST", urlPath, &filter) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
var filterResponse RespCreateFilter |
|||
if err = json.Unmarshal(resBytes, &filterResponse); err != nil { |
|||
return nil, err |
|||
} |
|||
return &filterResponse, nil |
|||
} |
|||
|
|||
// SyncRequest makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
|||
func (cli *Client) SyncRequest(timeout int, since, filterID string, fullState bool, setPresence string) (*RespSync, error) { |
|||
query := map[string]string{ |
|||
"timeout": strconv.Itoa(timeout), |
|||
} |
|||
if since != "" { |
|||
query["since"] = since |
|||
} |
|||
if filterID != "" { |
|||
query["filter"] = filterID |
|||
} |
|||
if setPresence != "" { |
|||
query["set_presence"] = setPresence |
|||
} |
|||
if fullState { |
|||
query["full_state"] = "true" |
|||
} |
|||
urlPath := cli.BuildURLWithQuery([]string{"sync"}, query) |
|||
req, err := http.NewRequest("GET", urlPath, nil) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
res, err := cli.Client.Do(req) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
var syncResponse RespSync |
|||
err = json.NewDecoder(res.Body).Decode(&syncResponse) |
|||
return &syncResponse, err |
|||
} |
|||
|
|||
// JoinRoom joins the client to a room ID or alias. See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-join-roomidoralias
|
|||
//
|
|||
// If serverName is specified, this will be added as a query param to instruct the homeserver to join via that server. If content is specified, it will
|
|||
// be JSON encoded and used as the request body.
|
|||
func (cli *Client) JoinRoom(roomIDorAlias, serverName string, content interface{}) (*RespJoinRoom, error) { |
|||
var urlPath string |
|||
if serverName != "" { |
|||
urlPath = cli.BuildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ |
|||
"server_name": serverName, |
|||
}) |
|||
} else { |
|||
urlPath = cli.BuildURL("join", roomIDorAlias) |
|||
} |
|||
|
|||
resBytes, err := cli.SendJSON("POST", urlPath, content) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
var joinRoomResponse RespJoinRoom |
|||
if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { |
|||
return nil, err |
|||
} |
|||
return &joinRoomResponse, nil |
|||
} |
|||
|
|||
// SetDisplayName sets the user's profile display name. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-profile-userid-displayname
|
|||
func (cli *Client) SetDisplayName(displayName string) error { |
|||
urlPath := cli.BuildURL("profile", cli.UserID, "displayname") |
|||
s := struct { |
|||
DisplayName string `json:"displayname"` |
|||
}{displayName} |
|||
_, err := cli.SendJSON("PUT", urlPath, &s) |
|||
return err |
|||
} |
|||
|
|||
// SendMessageEvent sends a message event into a room. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
|
|||
// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal.
|
|||
func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (*RespSendEvent, error) { |
|||
txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) |
|||
urlPath := cli.BuildURL("rooms", roomID, "send", eventType, txnID) |
|||
resBytes, err := cli.SendJSON("PUT", urlPath, contentJSON) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
var sendEventResponse RespSendEvent |
|||
if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { |
|||
return nil, err |
|||
} |
|||
return &sendEventResponse, nil |
|||
} |
|||
|
|||
// SendText sends an m.room.message event into the given room with a msgtype of m.text
|
|||
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-text
|
|||
func (cli *Client) SendText(roomID, text string) (*RespSendEvent, error) { |
|||
return cli.SendMessageEvent(roomID, "m.room.message", |
|||
TextMessage{"m.text", text}) |
|||
} |
|||
|
|||
// UploadLink uploads an HTTP URL and then returns an MXC URI.
|
|||
func (cli *Client) UploadLink(link string) (*RespMediaUpload, error) { |
|||
res, err := cli.Client.Get(link) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) |
|||
} |
|||
|
|||
// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI.
|
|||
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
|
|||
func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (*RespMediaUpload, error) { |
|||
req, err := http.NewRequest("POST", cli.BuildBaseURL("_matrix/media/r0/upload"), content) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
req.Header.Set("Content-Type", contentType) |
|||
req.ContentLength = contentLength |
|||
res, err := cli.Client.Do(req) |
|||
if res != nil { |
|||
defer res.Body.Close() |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if res.StatusCode != 200 { |
|||
return nil, HTTPError{ |
|||
Message: "Upload request failed", |
|||
Code: res.StatusCode, |
|||
} |
|||
} |
|||
var m RespMediaUpload |
|||
if err := json.NewDecoder(res.Body).Decode(&m); err != nil { |
|||
return nil, err |
|||
} |
|||
return &m, nil |
|||
} |
|||
|
|||
// NewClient creates a new Matrix Client ready for syncing
|
|||
func NewClient(homeserverURL, userID, accessToken string) (*Client, error) { |
|||
hsURL, err := url.Parse(homeserverURL) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
// By default, use an in-memory store which will never save filter ids / next batch tokens to disk.
|
|||
// The client will work with this storer: it just won't remember across restarts.
|
|||
// In practice, a database backend should be used.
|
|||
store := NewInMemoryStore() |
|||
cli := Client{ |
|||
AccessToken: accessToken, |
|||
HomeserverURL: hsURL, |
|||
UserID: userID, |
|||
Prefix: "/_matrix/client/r0", |
|||
Syncer: NewDefaultSyncer(userID, store), |
|||
Store: store, |
|||
} |
|||
// By default, use the default HTTP client.
|
|||
cli.Client = http.DefaultClient |
|||
|
|||
return &cli, nil |
|||
} |
@ -0,0 +1,28 @@ |
|||
package gomatrix |
|||
|
|||
import "fmt" |
|||
|
|||
func ExampleClient_BuildURLWithQuery() { |
|||
cli, _ := NewClient("https://matrix.org", "@example:matrix.org", "abcdef123456") |
|||
out := cli.BuildURLWithQuery([]string{"sync"}, map[string]string{ |
|||
"filter_id": "5", |
|||
}) |
|||
fmt.Println(out) |
|||
// Output: https://matrix.org/_matrix/client/r0/sync?access_token=abcdef123456&filter_id=5
|
|||
} |
|||
|
|||
func ExampleClient_BuildURL() { |
|||
userID := "@example:matrix.org" |
|||
cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") |
|||
out := cli.BuildURL("user", userID, "filter") |
|||
fmt.Println(out) |
|||
// Output: https://matrix.org/_matrix/client/r0/user/@example:matrix.org/filter?access_token=abcdef123456
|
|||
} |
|||
|
|||
func ExampleClient_BuildBaseURL() { |
|||
userID := "@example:matrix.org" |
|||
cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") |
|||
out := cli.BuildBaseURL("_matrix", "client", "r0", "directory", "room", "#matrix:matrix.org") |
|||
fmt.Println(out) |
|||
// Output: https://matrix.org/_matrix/client/r0/directory/room/%23matrix:matrix.org?access_token=abcdef123456
|
|||
} |
@ -0,0 +1,5 @@ |
|||
#! /bin/bash |
|||
|
|||
DOT_GIT="$(dirname $0)/../.git" |
|||
|
|||
ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit" |
@ -0,0 +1,9 @@ |
|||
#! /bin/bash |
|||
|
|||
set -eu |
|||
|
|||
golint |
|||
go fmt |
|||
go tool vet --shadow . |
|||
gocyclo -over 12 . |
|||
go test -timeout 5s -test.v |
@ -0,0 +1,61 @@ |
|||
package gomatrix |
|||
|
|||
// RespError is the standard JSON error response from Homeservers. It also implements the Golang "error" interface.
|
|||
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#api-standards
|
|||
type RespError struct { |
|||
ErrCode string `json:"errcode"` |
|||
Err string `json:"error"` |
|||
} |
|||
|
|||
// Error returns the errcode and error message.
|
|||
func (e RespError) Error() string { |
|||
return e.ErrCode + ": " + e.Err |
|||
} |
|||
|
|||
// RespCreateFilter is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
|
|||
type RespCreateFilter struct { |
|||
FilterID string `json:"filter_id"` |
|||
} |
|||
|
|||
// RespJoinRoom is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-rooms-roomid-join
|
|||
type RespJoinRoom struct { |
|||
RoomID string `json:"room_id"` |
|||
} |
|||
|
|||
// RespSendEvent is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
|
|||
type RespSendEvent struct { |
|||
EventID string `json:"event_id"` |
|||
} |
|||
|
|||
// RespMediaUpload is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
|
|||
type RespMediaUpload struct { |
|||
ContentURI string `json:"content_uri"` |
|||
} |
|||
|
|||
// RespSync is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
|||
type RespSync struct { |
|||
NextBatch string `json:"next_batch"` |
|||
AccountData struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"account_data"` |
|||
Presence struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"presence"` |
|||
Rooms struct { |
|||
Join map[string]struct { |
|||
State struct { |
|||
Events []Event `json:"events"` |
|||
} `json:"state"` |
|||
Timeline struct { |
|||
Events []Event `json:"events"` |
|||
Limited bool `json:"limited"` |
|||
PrevBatch string `json:"prev_batch"` |
|||
} `json:"timeline"` |
|||
} `json:"join"` |
|||
Invite map[string]struct { |
|||
State struct { |
|||
Events []Event |
|||
} `json:"invite_state"` |
|||
} `json:"invite"` |
|||
} `json:"rooms"` |
|||
} |
@ -0,0 +1,50 @@ |
|||
package gomatrix |
|||
|
|||
// Room represents a single Matrix room.
|
|||
type Room struct { |
|||
ID string |
|||
State map[string]map[string]*Event |
|||
} |
|||
|
|||
// UpdateState updates the room's current state with the given Event. This will clobber events based
|
|||
// on the type/state_key combination.
|
|||
func (room Room) UpdateState(event *Event) { |
|||
_, exists := room.State[event.Type] |
|||
if !exists { |
|||
room.State[event.Type] = make(map[string]*Event) |
|||
} |
|||
room.State[event.Type][event.StateKey] = event |
|||
} |
|||
|
|||
// GetStateEvent returns the state event for the given type/state_key combo, or nil.
|
|||
func (room Room) GetStateEvent(eventType string, stateKey string) *Event { |
|||
stateEventMap, _ := room.State[eventType] |
|||
event, _ := stateEventMap[stateKey] |
|||
return event |
|||
} |
|||
|
|||
// GetMembershipState returns the membership state of the given user ID in this room. If there is
|
|||
// no entry for this member, 'leave' is returned for consistency with left users.
|
|||
func (room Room) GetMembershipState(userID string) string { |
|||
state := "leave" |
|||
event := room.GetStateEvent("m.room.member", userID) |
|||
if event != nil { |
|||
membershipState, found := event.Content["membership"] |
|||
if found { |
|||
mState, isString := membershipState.(string) |
|||
if isString { |
|||
state = mState |
|||
} |
|||
} |
|||
} |
|||
return state |
|||
} |
|||
|
|||
// NewRoom creates a new Room with the given ID
|
|||
func NewRoom(roomID string) *Room { |
|||
// Init the State map and return a pointer to the Room
|
|||
return &Room{ |
|||
ID: roomID, |
|||
State: make(map[string]map[string]*Event), |
|||
} |
|||
} |
@ -0,0 +1,65 @@ |
|||
package gomatrix |
|||
|
|||
// Storer is an interface which must be satisfied to store client data.
|
|||
//
|
|||
// You can either write a struct which persists this data to disk, or you can use the
|
|||
// provided "InMemoryStore" which just keeps data around in-memory which is lost on
|
|||
// restarts.
|
|||
type Storer interface { |
|||
SaveFilterID(userID, filterID string) |
|||
LoadFilterID(userID string) string |
|||
SaveNextBatch(userID, nextBatchToken string) |
|||
LoadNextBatch(userID string) string |
|||
SaveRoom(room *Room) |
|||
LoadRoom(roomID string) *Room |
|||
} |
|||
|
|||
// InMemoryStore implements the Storer interface.
|
|||
//
|
|||
// Everything is persisted in-memory as maps. It is not safe to load/save filter IDs
|
|||
// or next batch tokens on any goroutine other than the syncing goroutine: the one
|
|||
// which called Client.Sync().
|
|||
type InMemoryStore struct { |
|||
Filters map[string]string |
|||
NextBatch map[string]string |
|||
Rooms map[string]*Room |
|||
} |
|||
|
|||
// SaveFilterID to memory.
|
|||
func (s *InMemoryStore) SaveFilterID(userID, filterID string) { |
|||
s.Filters[userID] = filterID |
|||
} |
|||
|
|||
// LoadFilterID from memory.
|
|||
func (s *InMemoryStore) LoadFilterID(userID string) string { |
|||
return s.Filters[userID] |
|||
} |
|||
|
|||
// SaveNextBatch to memory.
|
|||
func (s *InMemoryStore) SaveNextBatch(userID, nextBatchToken string) { |
|||
s.NextBatch[userID] = nextBatchToken |
|||
} |
|||
|
|||
// LoadNextBatch from memory.
|
|||
func (s *InMemoryStore) LoadNextBatch(userID string) string { |
|||
return s.NextBatch[userID] |
|||
} |
|||
|
|||
// SaveRoom to memory.
|
|||
func (s *InMemoryStore) SaveRoom(room *Room) { |
|||
s.Rooms[room.ID] = room |
|||
} |
|||
|
|||
// LoadRoom from memory.
|
|||
func (s *InMemoryStore) LoadRoom(roomID string) *Room { |
|||
return s.Rooms[roomID] |
|||
} |
|||
|
|||
// NewInMemoryStore constructs a new InMemoryStore.
|
|||
func NewInMemoryStore() *InMemoryStore { |
|||
return &InMemoryStore{ |
|||
Filters: make(map[string]string), |
|||
NextBatch: make(map[string]string), |
|||
Rooms: make(map[string]*Room), |
|||
} |
|||
} |
@ -0,0 +1,154 @@ |
|||
package gomatrix |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"fmt" |
|||
"runtime/debug" |
|||
"time" |
|||
) |
|||
|
|||
// Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
|
|||
type Syncer interface { |
|||
// Process the /sync response. The since parameter is the since= value that was used to produce the response.
|
|||
// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
|
|||
// permanently.
|
|||
ProcessResponse(resp *RespSync, since string) error |
|||
// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
|
|||
OnFailedSync(res *RespSync, err error) (time.Duration, error) |
|||
// GetFilterJSON for the given user ID. NOT the filter ID.
|
|||
GetFilterJSON(userID string) json.RawMessage |
|||
} |
|||
|
|||
// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
|
|||
// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
|
|||
// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information.
|
|||
type DefaultSyncer struct { |
|||
UserID string |
|||
Store Storer |
|||
listeners map[string][]OnEventListener // event type to listeners array
|
|||
} |
|||
|
|||
// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
|
|||
type OnEventListener func(*Event) |
|||
|
|||
// NewDefaultSyncer returns an instantiated DefaultSyncer
|
|||
func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer { |
|||
return &DefaultSyncer{ |
|||
UserID: userID, |
|||
Store: store, |
|||
listeners: make(map[string][]OnEventListener), |
|||
} |
|||
} |
|||
|
|||
// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
|
|||
// unrepeating events. Returns a fatal error if a listener panics.
|
|||
func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) { |
|||
if !s.shouldProcessResponse(res, since) { |
|||
return |
|||
} |
|||
|
|||
defer func() { |
|||
if r := recover(); r != nil { |
|||
err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack()) |
|||
} |
|||
}() |
|||
|
|||
for roomID, roomData := range res.Rooms.Join { |
|||
room := s.getOrCreateRoom(roomID) |
|||
for _, event := range roomData.State.Events { |
|||
event.RoomID = roomID |
|||
room.UpdateState(&event) |
|||
s.notifyListeners(&event) |
|||
} |
|||
for _, event := range roomData.Timeline.Events { |
|||
event.RoomID = roomID |
|||
s.notifyListeners(&event) |
|||
} |
|||
} |
|||
for roomID, roomData := range res.Rooms.Invite { |
|||
room := s.getOrCreateRoom(roomID) |
|||
for _, event := range roomData.State.Events { |
|||
event.RoomID = roomID |
|||
room.UpdateState(&event) |
|||
s.notifyListeners(&event) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
// OnEventType allows callers to be notified when there are new events for the given event type.
|
|||
// There are no duplicate checks.
|
|||
func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) { |
|||
_, exists := s.listeners[eventType] |
|||
if !exists { |
|||
s.listeners[eventType] = []OnEventListener{} |
|||
} |
|||
s.listeners[eventType] = append(s.listeners[eventType], callback) |
|||
} |
|||
|
|||
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
|
|||
// stuff that shouldn't be processed.
|
|||
func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool { |
|||
if since == "" { |
|||
return false |
|||
} |
|||
// This is a horrible hack because /sync will return the most recent messages for a room
|
|||
// as soon as you /join it. We do NOT want to process those events in that particular room
|
|||
// because they may have already been processed (if you toggle the bot in/out of the room).
|
|||
//
|
|||
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
|
|||
// exists and is "join" and then discard processing that room entirely if so.
|
|||
// TODO: We probably want to process messages from after the last join event in the timeline.
|
|||
for roomID, roomData := range resp.Rooms.Join { |
|||
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { |
|||
e := roomData.Timeline.Events[i] |
|||
if e.Type == "m.room.member" && e.StateKey == s.UserID { |
|||
m := e.Content["membership"] |
|||
mship, ok := m.(string) |
|||
if !ok { |
|||
continue |
|||
} |
|||
if mship == "join" { |
|||
_, ok := resp.Rooms.Join[roomID] |
|||
if !ok { |
|||
continue |
|||
} |
|||
delete(resp.Rooms.Join, roomID) // don't re-process messages
|
|||
delete(resp.Rooms.Invite, roomID) // don't re-process invites
|
|||
break |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return true |
|||
} |
|||
|
|||
// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
|
|||
func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room { |
|||
room := s.Store.LoadRoom(roomID) |
|||
if room == nil { // create a new Room
|
|||
room = NewRoom(roomID) |
|||
s.Store.SaveRoom(room) |
|||
} |
|||
return room |
|||
} |
|||
|
|||
func (s *DefaultSyncer) notifyListeners(event *Event) { |
|||
listeners, exists := s.listeners[event.Type] |
|||
if !exists { |
|||
return |
|||
} |
|||
for _, fn := range listeners { |
|||
fn(event) |
|||
} |
|||
} |
|||
|
|||
// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error.
|
|||
func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { |
|||
return 10 * time.Second, nil |
|||
} |
|||
|
|||
// GetFilterJSON returns a filter with a timeline limit of 50.
|
|||
func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage { |
|||
return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue