mirror of https://github.com/matrix-org/go-neb.git
Browse Source
Merge branch 'master' into feature-slack-api
Merge branch 'master' into feature-slack-api
Conflicts: src/github.com/matrix-org/go-neb/goneb.gopull/116/head
Luke Barnard
8 years ago
42 changed files with 2244 additions and 847 deletions
-
18.travis.yml
-
51README.md
-
2gendoc.sh
-
6src/github.com/matrix-org/go-neb/api/handlers/service.go
-
114src/github.com/matrix-org/go-neb/clients/clients.go
-
95src/github.com/matrix-org/go-neb/clients/clients_test.go
-
3src/github.com/matrix-org/go-neb/database/schema.go
-
8src/github.com/matrix-org/go-neb/goneb.go
-
460src/github.com/matrix-org/go-neb/matrix/matrix.go
-
40src/github.com/matrix-org/go-neb/matrix/responses.go
-
81src/github.com/matrix-org/go-neb/matrix/worker.go
-
8src/github.com/matrix-org/go-neb/polling/polling.go
-
3src/github.com/matrix-org/go-neb/server/server_test.go
-
6src/github.com/matrix-org/go-neb/services/echo/echo.go
-
34src/github.com/matrix-org/go-neb/services/giphy/giphy.go
-
30src/github.com/matrix-org/go-neb/services/github/github.go
-
19src/github.com/matrix-org/go-neb/services/github/github_webhook.go
-
67src/github.com/matrix-org/go-neb/services/github/webhook/webhook.go
-
22src/github.com/matrix-org/go-neb/services/github/webhook/webhook_test.go
-
22src/github.com/matrix-org/go-neb/services/guggy/guggy.go
-
102src/github.com/matrix-org/go-neb/services/guggy/guggy_test.go
-
15src/github.com/matrix-org/go-neb/services/jira/jira.go
-
146src/github.com/matrix-org/go-neb/services/rssbot/rssbot.go
-
27src/github.com/matrix-org/go-neb/services/rssbot/rssbot_test.go
-
295src/github.com/matrix-org/go-neb/services/travisci/travisci.go
-
206src/github.com/matrix-org/go-neb/services/travisci/travisci_test.go
-
109src/github.com/matrix-org/go-neb/services/travisci/verify.go
-
28src/github.com/matrix-org/go-neb/testutils/testutils.go
-
7src/github.com/matrix-org/go-neb/types/actions.go
-
20src/github.com/matrix-org/go-neb/types/service.go
-
6vendor/manifest
-
201vendor/src/github.com/matrix-org/gomatrix/LICENSE
-
4vendor/src/github.com/matrix-org/gomatrix/README.md
-
381vendor/src/github.com/matrix-org/gomatrix/client.go
-
28vendor/src/github.com/matrix-org/gomatrix/client_test.go
-
79vendor/src/github.com/matrix-org/gomatrix/events.go
-
5vendor/src/github.com/matrix-org/gomatrix/hooks/install.sh
-
9vendor/src/github.com/matrix-org/gomatrix/hooks/pre-commit
-
61vendor/src/github.com/matrix-org/gomatrix/responses.go
-
50vendor/src/github.com/matrix-org/gomatrix/room.go
-
65vendor/src/github.com/matrix-org/gomatrix/store.go
-
154vendor/src/github.com/matrix-org/gomatrix/sync.go
@ -0,0 +1,18 @@ |
|||||
|
language: go |
||||
|
go: |
||||
|
- 1.6 |
||||
|
install: |
||||
|
- go get github.com/constabulary/gb/... |
||||
|
- go get github.com/golang/lint/golint |
||||
|
- go get github.com/fzipp/gocyclo |
||||
|
|
||||
|
script: gb build github.com/matrix-org/go-neb && ./hooks/pre-commit |
||||
|
|
||||
|
notifications: |
||||
|
webhooks: |
||||
|
urls: |
||||
|
- "https://scalar.vector.im/api/neb/services/hooks/dHJhdmlzLWNpLyU0MGtlZ2FuJTNBbWF0cml4Lm9yZy8lMjFhWmthbkFuV0VkeGNSSVFrV24lM0FtYXRyaXgub3Jn" |
||||
|
on_success: change # always|never|change |
||||
|
on_failure: always |
||||
|
on_start: never |
||||
|
|
@ -0,0 +1,95 @@ |
|||||
|
package clients |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"net/http" |
||||
|
"reflect" |
||||
|
"testing" |
||||
|
|
||||
|
"github.com/matrix-org/go-neb/database" |
||||
|
"github.com/matrix-org/go-neb/types" |
||||
|
"github.com/matrix-org/gomatrix" |
||||
|
) |
||||
|
|
||||
|
var commandParseTests = []struct { |
||||
|
body string |
||||
|
expectArgs []string |
||||
|
}{ |
||||
|
{"!test word", []string{"word"}}, |
||||
|
{"!test two words", []string{"two", "words"}}, |
||||
|
{`!test "words with double quotes"`, []string{"words with double quotes"}}, |
||||
|
{"!test 'words with single quotes'", []string{"words with single quotes"}}, |
||||
|
{`!test 'single quotes' "double quotes"`, []string{"single quotes", "double quotes"}}, |
||||
|
{`!test ‘smart single quotes’ “smart double quotes”`, []string{"smart single quotes", "smart double quotes"}}, |
||||
|
} |
||||
|
|
||||
|
type MockService struct { |
||||
|
types.DefaultService |
||||
|
commands []types.Command |
||||
|
} |
||||
|
|
||||
|
func (s *MockService) Commands(cli *gomatrix.Client) []types.Command { |
||||
|
return s.commands |
||||
|
} |
||||
|
|
||||
|
type MockStore struct { |
||||
|
database.NopStorage |
||||
|
service types.Service |
||||
|
} |
||||
|
|
||||
|
func (d *MockStore) LoadServicesForUser(userID string) ([]types.Service, error) { |
||||
|
return []types.Service{d.service}, nil |
||||
|
} |
||||
|
|
||||
|
type MockTransport struct { |
||||
|
roundTrip func(*http.Request) (*http.Response, error) |
||||
|
} |
||||
|
|
||||
|
func (t MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { |
||||
|
return t.roundTrip(req) |
||||
|
} |
||||
|
|
||||
|
func TestCommandParsing(t *testing.T) { |
||||
|
var executedCmdArgs []string |
||||
|
cmds := []types.Command{ |
||||
|
types.Command{ |
||||
|
Path: []string{"test"}, |
||||
|
Command: func(roomID, userID string, args []string) (interface{}, error) { |
||||
|
executedCmdArgs = args |
||||
|
return nil, nil |
||||
|
}, |
||||
|
}, |
||||
|
} |
||||
|
s := MockService{commands: cmds} |
||||
|
store := MockStore{service: &s} |
||||
|
database.SetServiceDB(&store) |
||||
|
|
||||
|
trans := struct{ MockTransport }{} |
||||
|
trans.roundTrip = func(*http.Request) (*http.Response, error) { |
||||
|
return nil, fmt.Errorf("unhandled test path") |
||||
|
} |
||||
|
cli := &http.Client{ |
||||
|
Transport: trans, |
||||
|
} |
||||
|
clients := New(&store, cli) |
||||
|
mxCli, _ := gomatrix.NewClient("https://someplace.somewhere", "@service:user", "token") |
||||
|
mxCli.Client = cli |
||||
|
|
||||
|
for _, input := range commandParseTests { |
||||
|
executedCmdArgs = []string{} |
||||
|
event := gomatrix.Event{ |
||||
|
Type: "m.room.message", |
||||
|
Sender: "@someone:somewhere", |
||||
|
RoomID: "!foo:bar", |
||||
|
Content: map[string]interface{}{ |
||||
|
"body": input.body, |
||||
|
"msgtype": "m.text", |
||||
|
}, |
||||
|
} |
||||
|
clients.onMessageEvent(mxCli, &event) |
||||
|
if !reflect.DeepEqual(executedCmdArgs, input.expectArgs) { |
||||
|
t.Errorf("TestCommandParsing want %s, got %s", input.expectArgs, executedCmdArgs) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
@ -1,443 +1,69 @@ |
|||||
// Package matrix provides an HTTP client that can interact with a Homeserver via r0 APIs (/sync).
|
|
||||
//
|
|
||||
// It is NOT safe to access the field (or any sub-fields of) 'Rooms' concurrently. In essence, this
|
|
||||
// structure MUST be treated as read-only. The matrix client will update this structure as new events
|
|
||||
// arrive from the homeserver.
|
|
||||
//
|
|
||||
// Internally, the client has 1 goroutine for polling the server, and 1 goroutine for processing data
|
|
||||
// returned. The polling goroutine communicates to the processing goroutine by a buffered channel
|
|
||||
// which feedback loops if processing takes a while as it will delay more data from being pulled down
|
|
||||
// if the buffer gets full. Modification of the 'Rooms' field of the client is done EXCLUSIVELY on the
|
|
||||
// processing goroutine.
|
|
||||
package matrix |
package matrix |
||||
|
|
||||
import ( |
import ( |
||||
"bytes" |
|
||||
"encoding/json" |
"encoding/json" |
||||
"fmt" |
|
||||
"io" |
|
||||
"io/ioutil" |
|
||||
"net/http" |
|
||||
"net/url" |
|
||||
"path" |
|
||||
"strconv" |
|
||||
"sync" |
|
||||
"time" |
|
||||
|
|
||||
log "github.com/Sirupsen/logrus" |
log "github.com/Sirupsen/logrus" |
||||
"github.com/matrix-org/go-neb/api" |
"github.com/matrix-org/go-neb/api" |
||||
"github.com/matrix-org/go-neb/errors" |
|
||||
) |
|
||||
|
|
||||
var ( |
|
||||
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) |
|
||||
|
"github.com/matrix-org/go-neb/database" |
||||
|
"github.com/matrix-org/gomatrix" |
||||
) |
) |
||||
|
|
||||
// NextBatchStorer controls loading/saving of next_batch tokens for users
|
|
||||
type NextBatchStorer interface { |
|
||||
// Save a next_batch token for a given user. Best effort.
|
|
||||
Save(userID, nextBatch string) |
|
||||
// Load a next_batch token for a given user. Return an empty string if no token exists.
|
|
||||
Load(userID string) string |
|
||||
} |
|
||||
|
|
||||
// noopNextBatchStore does not load or save next_batch tokens.
|
|
||||
type noopNextBatchStore struct{} |
|
||||
|
|
||||
func (s noopNextBatchStore) Save(userID, nextBatch string) {} |
|
||||
func (s noopNextBatchStore) Load(userID string) string { return "" } |
|
||||
|
|
||||
// Client represents a Matrix client.
|
|
||||
type Client struct { |
|
||||
HomeserverURL *url.URL |
|
||||
Prefix string |
|
||||
UserID string |
|
||||
AccessToken string |
|
||||
Rooms map[string]*Room |
|
||||
Worker *Worker |
|
||||
syncingMutex sync.Mutex |
|
||||
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
|
||||
httpClient *http.Client |
|
||||
filterID string |
|
||||
NextBatchStorer NextBatchStorer |
|
||||
ClientConfig api.ClientConfig |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) buildURL(urlPath ...string) string { |
|
||||
ps := []string{cli.Prefix} |
|
||||
for _, p := range urlPath { |
|
||||
ps = append(ps, p) |
|
||||
} |
|
||||
return cli.buildBaseURL(ps...) |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) buildBaseURL(urlPath ...string) string { |
|
||||
// copy the URL. Purposefully ignore error as the input is from a valid URL already
|
|
||||
hsURL, _ := url.Parse(cli.HomeserverURL.String()) |
|
||||
parts := []string{hsURL.Path} |
|
||||
parts = append(parts, urlPath...) |
|
||||
hsURL.Path = path.Join(parts...) |
|
||||
query := hsURL.Query() |
|
||||
query.Set("access_token", cli.AccessToken) |
|
||||
hsURL.RawQuery = query.Encode() |
|
||||
return hsURL.String() |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) buildURLWithQuery(urlPath []string, urlQuery map[string]string) string { |
|
||||
u, _ := url.Parse(cli.buildURL(urlPath...)) |
|
||||
q := u.Query() |
|
||||
for k, v := range urlQuery { |
|
||||
q.Set(k, v) |
|
||||
} |
|
||||
u.RawQuery = q.Encode() |
|
||||
return u.String() |
|
||||
} |
|
||||
|
|
||||
// JoinRoom joins the client to a room ID or alias. If serverName is specified, this will be added as a query param
|
|
||||
// to instruct the homeserver to join via that server. If invitingUserID is specified, the inviting user ID will be
|
|
||||
// inserted into the content of the join request. Returns a room ID.
|
|
||||
func (cli *Client) JoinRoom(roomIDorAlias, serverName, invitingUserID string) (string, error) { |
|
||||
var urlPath string |
|
||||
if serverName != "" { |
|
||||
urlPath = cli.buildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ |
|
||||
"server_name": serverName, |
|
||||
}) |
|
||||
} else { |
|
||||
urlPath = cli.buildURL("join", roomIDorAlias) |
|
||||
} |
|
||||
|
|
||||
content := struct { |
|
||||
Inviter string `json:"inviter,omitempty"` |
|
||||
}{} |
|
||||
content.Inviter = invitingUserID |
|
||||
|
|
||||
resBytes, err := cli.sendJSON("POST", urlPath, content) |
|
||||
if err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
var joinRoomResponse joinRoomHTTPResponse |
|
||||
if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
return joinRoomResponse.RoomID, nil |
|
||||
} |
|
||||
|
|
||||
// SetDisplayName sets the user's profile display name
|
|
||||
func (cli *Client) SetDisplayName(displayName string) error { |
|
||||
urlPath := cli.buildURL("profile", cli.UserID, "displayname") |
|
||||
s := struct { |
|
||||
DisplayName string `json:"displayname"` |
|
||||
}{displayName} |
|
||||
_, err := cli.sendJSON("PUT", urlPath, &s) |
|
||||
return err |
|
||||
|
// NEBStore implements the gomatrix.Storer interface.
|
||||
|
//
|
||||
|
// It persists the next batch token in the database, and includes a ClientConfig for the client.
|
||||
|
type NEBStore struct { |
||||
|
gomatrix.InMemoryStore |
||||
|
Database database.Storer |
||||
|
ClientConfig api.ClientConfig |
||||
} |
} |
||||
|
|
||||
// SendMessageEvent sends a message event into a room, returning the event_id on success.
|
|
||||
// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal.
|
|
||||
func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (string, error) { |
|
||||
txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) |
|
||||
urlPath := cli.buildURL("rooms", roomID, "send", eventType, txnID) |
|
||||
resBytes, err := cli.sendJSON("PUT", urlPath, contentJSON) |
|
||||
if err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
var sendEventResponse sendEventHTTPResponse |
|
||||
if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { |
|
||||
return "", err |
|
||||
|
// SaveNextBatch saves to the database.
|
||||
|
func (s *NEBStore) SaveNextBatch(userID, nextBatch string) { |
||||
|
if err := s.Database.UpdateNextBatch(userID, nextBatch); err != nil { |
||||
|
log.WithFields(log.Fields{ |
||||
|
log.ErrorKey: err, |
||||
|
"user_id": userID, |
||||
|
"next_batch": nextBatch, |
||||
|
}).Error("Failed to persist next_batch token") |
||||
} |
} |
||||
return sendEventResponse.EventID, nil |
|
||||
} |
} |
||||
|
|
||||
// SendText sends an m.room.message event into the given room with a msgtype of m.text
|
|
||||
func (cli *Client) SendText(roomID, text string) (string, error) { |
|
||||
return cli.SendMessageEvent(roomID, "m.room.message", |
|
||||
TextMessage{"m.text", text}) |
|
||||
} |
|
||||
|
|
||||
// UploadLink uploads an HTTP URL and then returns an MXC URI.
|
|
||||
func (cli *Client) UploadLink(link string) (string, error) { |
|
||||
res, err := http.Get(link) |
|
||||
if res != nil { |
|
||||
defer res.Body.Close() |
|
||||
} |
|
||||
|
// LoadNextBatch loads from the database.
|
||||
|
func (s *NEBStore) LoadNextBatch(userID string) string { |
||||
|
token, err := s.Database.LoadNextBatch(userID) |
||||
if err != nil { |
if err != nil { |
||||
return "", err |
|
||||
|
log.WithFields(log.Fields{ |
||||
|
log.ErrorKey: err, |
||||
|
"user_id": userID, |
||||
|
}).Error("Failed to load next_batch token") |
||||
|
return "" |
||||
} |
} |
||||
return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) |
|
||||
|
return token |
||||
} |
} |
||||
|
|
||||
// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI.
|
|
||||
func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (string, error) { |
|
||||
req, err := http.NewRequest("POST", cli.buildBaseURL("_matrix/media/r0/upload"), content) |
|
||||
if err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
req.Header.Set("Content-Type", contentType) |
|
||||
req.ContentLength = contentLength |
|
||||
log.WithFields(log.Fields{ |
|
||||
"content_type": contentType, |
|
||||
"content_length": contentLength, |
|
||||
}).Print("Uploading to content repo") |
|
||||
res, err := cli.httpClient.Do(req) |
|
||||
if res != nil { |
|
||||
defer res.Body.Close() |
|
||||
} |
|
||||
if err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
if res.StatusCode != 200 { |
|
||||
return "", fmt.Errorf("Upload request returned HTTP %d", res.StatusCode) |
|
||||
} |
|
||||
m := struct { |
|
||||
ContentURI string `json:"content_uri"` |
|
||||
}{} |
|
||||
if err := json.NewDecoder(res.Body).Decode(&m); err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
return m.ContentURI, nil |
|
||||
|
// StarterLinkMessage represents a message with a starter_link custom data.
|
||||
|
type StarterLinkMessage struct { |
||||
|
Body string |
||||
|
Link string |
||||
} |
} |
||||
|
|
||||
// Sync starts syncing with the provided Homeserver. This function will be invoked continually.
|
|
||||
// If Sync is called twice then the first sync will be stopped.
|
|
||||
func (cli *Client) Sync() { |
|
||||
// Mark the client as syncing.
|
|
||||
// We will keep syncing until the syncing state changes. Either because
|
|
||||
// Sync is called or StopSync is called.
|
|
||||
syncingID := cli.incrementSyncingID() |
|
||||
logger := log.WithFields(log.Fields{ |
|
||||
"syncing": syncingID, |
|
||||
"user_id": cli.UserID, |
|
||||
}) |
|
||||
|
|
||||
// TODO: Store the filter ID in the database
|
|
||||
filterID, err := cli.createFilter() |
|
||||
if err != nil { |
|
||||
logger.WithError(err).Fatal("Failed to create filter") |
|
||||
// TODO: Maybe do some sort of error handling here?
|
|
||||
} |
|
||||
cli.filterID = filterID |
|
||||
logger.WithField("filter", filterID).Print("Got filter ID") |
|
||||
nextToken := cli.NextBatchStorer.Load(cli.UserID) |
|
||||
|
|
||||
logger.WithField("next_batch", nextToken).Print("Starting sync") |
|
||||
|
|
||||
channel := make(chan syncHTTPResponse, 5) |
|
||||
|
|
||||
go func() { |
|
||||
for response := range channel { |
|
||||
cli.Worker.onSyncHTTPResponse(response) |
|
||||
} |
|
||||
}() |
|
||||
defer close(channel) |
|
||||
|
|
||||
for { |
|
||||
// Do a /sync
|
|
||||
syncBytes, err := cli.doSync(30000, nextToken) |
|
||||
if err != nil { |
|
||||
logger.WithError(err).Warn("doSync failed") |
|
||||
time.Sleep(5 * time.Second) |
|
||||
continue |
|
||||
} |
|
||||
|
|
||||
// Decode sync response into syncHTTPResponse
|
|
||||
var syncResponse syncHTTPResponse |
|
||||
if err = json.Unmarshal(syncBytes, &syncResponse); err != nil { |
|
||||
logger.WithError(err).Warn("Failed to decode sync data") |
|
||||
time.Sleep(5 * time.Second) |
|
||||
continue |
|
||||
} |
|
||||
|
// MarshalJSON converts this message into actual event content JSON.
|
||||
|
func (m StarterLinkMessage) MarshalJSON() ([]byte, error) { |
||||
|
var data map[string]string |
||||
|
|
||||
// Check that the syncing state hasn't changed
|
|
||||
// Either because we've stopped syncing or another sync has been started.
|
|
||||
// We discard the response from our sync.
|
|
||||
if cli.getSyncingID() != syncingID { |
|
||||
logger.Print("Stopping sync") |
|
||||
return |
|
||||
} |
|
||||
|
|
||||
processResponse := cli.shouldProcessResponse(nextToken, &syncResponse) |
|
||||
nextToken = syncResponse.NextBatch |
|
||||
|
|
||||
// Save the token now *before* passing it through to the worker. This means it's possible
|
|
||||
// to not process some events, but it means that we won't get constantly stuck processing
|
|
||||
// a malformed/buggy event which keeps making us panic.
|
|
||||
cli.NextBatchStorer.Save(cli.UserID, nextToken) |
|
||||
|
|
||||
if processResponse { |
|
||||
// Update client state
|
|
||||
channel <- syncResponse |
|
||||
|
if m.Link != "" { |
||||
|
data = map[string]string{ |
||||
|
"org.matrix.neb.starter_link": m.Link, |
||||
} |
} |
||||
} |
} |
||||
} |
|
||||
|
|
||||
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
|
|
||||
// stuff that shouldn't be processed.
|
|
||||
func (cli *Client) shouldProcessResponse(tokenOnSync string, syncResponse *syncHTTPResponse) bool { |
|
||||
if tokenOnSync == "" { |
|
||||
return false |
|
||||
} |
|
||||
// This is a horrible hack because /sync will return the most recent messages for a room
|
|
||||
// as soon as you /join it. We do NOT want to process those events in that particular room
|
|
||||
// because they may have already been processed (if you toggle the bot in/out of the room).
|
|
||||
//
|
|
||||
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
|
|
||||
// exists and is "join" and then discard processing that room entirely if so.
|
|
||||
// TODO: We probably want to process the !commands from after the last join event in the timeline.
|
|
||||
for roomID, roomData := range syncResponse.Rooms.Join { |
|
||||
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { |
|
||||
e := roomData.Timeline.Events[i] |
|
||||
if e.Type == "m.room.member" && e.StateKey == cli.UserID { |
|
||||
m := e.Content["membership"] |
|
||||
mship, ok := m.(string) |
|
||||
if !ok { |
|
||||
continue |
|
||||
} |
|
||||
if mship == "join" { |
|
||||
log.WithFields(log.Fields{ |
|
||||
"room_id": roomID, |
|
||||
"user_id": cli.UserID, |
|
||||
"start_token": tokenOnSync, |
|
||||
}).Info("Discarding /sync events in room: just joined it.") |
|
||||
_, ok := syncResponse.Rooms.Join[roomID] |
|
||||
if !ok { |
|
||||
panic("room " + roomID + " does not exist in Join?!") |
|
||||
} |
|
||||
delete(syncResponse.Rooms.Join, roomID) // don't re-process !commands
|
|
||||
delete(syncResponse.Rooms.Invite, roomID) // don't re-process invites
|
|
||||
break |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
return true |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) incrementSyncingID() uint32 { |
|
||||
cli.syncingMutex.Lock() |
|
||||
defer cli.syncingMutex.Unlock() |
|
||||
cli.syncingID++ |
|
||||
return cli.syncingID |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) getSyncingID() uint32 { |
|
||||
cli.syncingMutex.Lock() |
|
||||
defer cli.syncingMutex.Unlock() |
|
||||
return cli.syncingID |
|
||||
} |
|
||||
|
|
||||
// StopSync stops the ongoing sync started by Sync.
|
|
||||
func (cli *Client) StopSync() { |
|
||||
// Advance the syncing state so that any running Syncs will terminate.
|
|
||||
cli.incrementSyncingID() |
|
||||
} |
|
||||
|
|
||||
// This should only be called by the worker goroutine
|
|
||||
func (cli *Client) getOrCreateRoom(roomID string) *Room { |
|
||||
room := cli.Rooms[roomID] |
|
||||
if room == nil { // create a new Room
|
|
||||
room = NewRoom(roomID) |
|
||||
cli.Rooms[roomID] = room |
|
||||
} |
|
||||
return room |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) sendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { |
|
||||
jsonStr, err := json.Marshal(contentJSON) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
req.Header.Set("Content-Type", "application/json") |
|
||||
logger := log.WithFields(log.Fields{ |
|
||||
"method": method, |
|
||||
"url": httpURL, |
|
||||
"json": string(jsonStr), |
|
||||
}) |
|
||||
logger.Print("Sending JSON request") |
|
||||
res, err := cli.httpClient.Do(req) |
|
||||
if res != nil { |
|
||||
defer res.Body.Close() |
|
||||
} |
|
||||
if err != nil { |
|
||||
logger.WithError(err).Warn("Failed to send JSON request") |
|
||||
return nil, err |
|
||||
|
msg := struct { |
||||
|
MsgType string `json:"msgtype"` |
||||
|
Body string `json:"body"` |
||||
|
Data map[string]string `json:"data,omitempty"` |
||||
|
}{ |
||||
|
"m.notice", m.Body, data, |
||||
} |
} |
||||
contents, err := ioutil.ReadAll(res.Body) |
|
||||
if res.StatusCode >= 300 { |
|
||||
logger.WithFields(log.Fields{ |
|
||||
"code": res.StatusCode, |
|
||||
"body": string(contents), |
|
||||
}).Warn("Failed to send JSON request") |
|
||||
return nil, errors.HTTPError{ |
|
||||
Code: res.StatusCode, |
|
||||
Message: "Failed to " + method + " JSON: HTTP " + strconv.Itoa(res.StatusCode), |
|
||||
} |
|
||||
} |
|
||||
if err != nil { |
|
||||
logger.WithError(err).Warn("Failed to read response") |
|
||||
return nil, err |
|
||||
} |
|
||||
return contents, nil |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) createFilter() (string, error) { |
|
||||
urlPath := cli.buildURL("user", cli.UserID, "filter") |
|
||||
resBytes, err := cli.sendJSON("POST", urlPath, &filterJSON) |
|
||||
if err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
var filterResponse filterHTTPResponse |
|
||||
if err = json.Unmarshal(resBytes, &filterResponse); err != nil { |
|
||||
return "", err |
|
||||
} |
|
||||
return filterResponse.FilterID, nil |
|
||||
} |
|
||||
|
|
||||
func (cli *Client) doSync(timeout int, since string) ([]byte, error) { |
|
||||
query := map[string]string{ |
|
||||
"timeout": strconv.Itoa(timeout), |
|
||||
} |
|
||||
if since != "" { |
|
||||
query["since"] = since |
|
||||
} |
|
||||
if cli.filterID != "" { |
|
||||
query["filter"] = cli.filterID |
|
||||
} |
|
||||
urlPath := cli.buildURLWithQuery([]string{"sync"}, query) |
|
||||
req, err := http.NewRequest("GET", urlPath, nil) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
res, err := cli.httpClient.Do(req) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
defer res.Body.Close() |
|
||||
contents, err := ioutil.ReadAll(res.Body) |
|
||||
if err != nil { |
|
||||
return nil, err |
|
||||
} |
|
||||
return contents, nil |
|
||||
} |
|
||||
|
|
||||
// NewClient creates a new Matrix Client ready for syncing
|
|
||||
func NewClient(httpClient *http.Client, homeserverURL *url.URL, accessToken, userID string) *Client { |
|
||||
cli := Client{ |
|
||||
AccessToken: accessToken, |
|
||||
HomeserverURL: homeserverURL, |
|
||||
UserID: userID, |
|
||||
Prefix: "/_matrix/client/r0", |
|
||||
} |
|
||||
cli.Worker = newWorker(&cli) |
|
||||
// By default, use a no-op next_batch storer which will never save tokens and always
|
|
||||
// "load" the empty string as a token. The client will work with this storer: it just won't
|
|
||||
// remember the token across restarts. In practice, a database backend should be used.
|
|
||||
cli.NextBatchStorer = noopNextBatchStore{} |
|
||||
cli.Rooms = make(map[string]*Room) |
|
||||
cli.httpClient = httpClient |
|
||||
|
|
||||
return &cli |
|
||||
|
return json.Marshal(msg) |
||||
} |
} |
@ -1,40 +0,0 @@ |
|||||
package matrix |
|
||||
|
|
||||
type filterHTTPResponse struct { |
|
||||
FilterID string `json:"filter_id"` |
|
||||
} |
|
||||
|
|
||||
type joinRoomHTTPResponse struct { |
|
||||
RoomID string `json:"room_id"` |
|
||||
} |
|
||||
|
|
||||
type sendEventHTTPResponse struct { |
|
||||
EventID string `json:"event_id"` |
|
||||
} |
|
||||
|
|
||||
type syncHTTPResponse struct { |
|
||||
NextBatch string `json:"next_batch"` |
|
||||
AccountData struct { |
|
||||
Events []Event `json:"events"` |
|
||||
} `json:"account_data"` |
|
||||
Presence struct { |
|
||||
Events []Event `json:"events"` |
|
||||
} `json:"presence"` |
|
||||
Rooms struct { |
|
||||
Join map[string]struct { |
|
||||
State struct { |
|
||||
Events []Event `json:"events"` |
|
||||
} `json:"state"` |
|
||||
Timeline struct { |
|
||||
Events []Event `json:"events"` |
|
||||
Limited bool `json:"limited"` |
|
||||
PrevBatch string `json:"prev_batch"` |
|
||||
} `json:"timeline"` |
|
||||
} `json:"join"` |
|
||||
Invite map[string]struct { |
|
||||
State struct { |
|
||||
Events []Event |
|
||||
} `json:"invite_state"` |
|
||||
} `json:"invite"` |
|
||||
} `json:"rooms"` |
|
||||
} |
|
@ -1,81 +0,0 @@ |
|||||
package matrix |
|
||||
|
|
||||
import ( |
|
||||
log "github.com/Sirupsen/logrus" |
|
||||
"runtime/debug" |
|
||||
) |
|
||||
|
|
||||
// Worker processes incoming events and updates the Matrix client's data structures. It also informs
|
|
||||
// any attached listeners of the new events.
|
|
||||
type Worker struct { |
|
||||
client *Client |
|
||||
listeners map[string][]OnEventListener // event type to listeners array
|
|
||||
} |
|
||||
|
|
||||
// OnEventListener can be used with Worker.OnEventType to be informed of incoming events.
|
|
||||
type OnEventListener func(*Event) |
|
||||
|
|
||||
func newWorker(client *Client) *Worker { |
|
||||
return &Worker{ |
|
||||
client, |
|
||||
make(map[string][]OnEventListener), |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
// OnEventType allows callers to be notified when there are new events for the given event type.
|
|
||||
// There are no duplicate checks.
|
|
||||
func (worker *Worker) OnEventType(eventType string, callback OnEventListener) { |
|
||||
_, exists := worker.listeners[eventType] |
|
||||
if !exists { |
|
||||
worker.listeners[eventType] = []OnEventListener{} |
|
||||
} |
|
||||
worker.listeners[eventType] = append(worker.listeners[eventType], callback) |
|
||||
} |
|
||||
|
|
||||
func (worker *Worker) notifyListeners(event *Event) { |
|
||||
listeners, exists := worker.listeners[event.Type] |
|
||||
if !exists { |
|
||||
return |
|
||||
} |
|
||||
for _, fn := range listeners { |
|
||||
fn(event) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
func (worker *Worker) onSyncHTTPResponse(res syncHTTPResponse) { |
|
||||
defer func() { |
|
||||
if r := recover(); r != nil { |
|
||||
userID := "" |
|
||||
if worker.client != nil { |
|
||||
userID = worker.client.UserID |
|
||||
} |
|
||||
log.WithFields(log.Fields{ |
|
||||
"panic": r, |
|
||||
"user_id": userID, |
|
||||
}).Errorf( |
|
||||
"onSyncHTTPResponse panicked!\n%s", debug.Stack(), |
|
||||
) |
|
||||
} |
|
||||
}() |
|
||||
|
|
||||
for roomID, roomData := range res.Rooms.Join { |
|
||||
room := worker.client.getOrCreateRoom(roomID) |
|
||||
for _, event := range roomData.State.Events { |
|
||||
event.RoomID = roomID |
|
||||
room.UpdateState(&event) |
|
||||
worker.notifyListeners(&event) |
|
||||
} |
|
||||
for _, event := range roomData.Timeline.Events { |
|
||||
event.RoomID = roomID |
|
||||
worker.notifyListeners(&event) |
|
||||
} |
|
||||
} |
|
||||
for roomID, roomData := range res.Rooms.Invite { |
|
||||
room := worker.client.getOrCreateRoom(roomID) |
|
||||
for _, event := range roomData.State.Events { |
|
||||
event.RoomID = roomID |
|
||||
room.UpdateState(&event) |
|
||||
worker.notifyListeners(&event) |
|
||||
} |
|
||||
} |
|
||||
} |
|
@ -0,0 +1,102 @@ |
|||||
|
package guggy |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"io/ioutil" |
||||
|
"net/http" |
||||
|
"strings" |
||||
|
"testing" |
||||
|
|
||||
|
"github.com/matrix-org/go-neb/database" |
||||
|
"github.com/matrix-org/go-neb/testutils" |
||||
|
"github.com/matrix-org/go-neb/types" |
||||
|
"github.com/matrix-org/gomatrix" |
||||
|
) |
||||
|
|
||||
|
// TODO: It would be nice to tabularise this test so we can try failing different combinations of responses to make
|
||||
|
// sure all cases are handled, rather than just the general case as is here.
|
||||
|
func TestCommand(t *testing.T) { |
||||
|
database.SetServiceDB(&database.NopStorage{}) |
||||
|
apiKey := "secret" |
||||
|
guggyImageURL := "https://guggy.com/gifs/23ryf872fg" |
||||
|
|
||||
|
// Mock the response from Guggy
|
||||
|
guggyTrans := testutils.NewRoundTripper(func(req *http.Request) (*http.Response, error) { |
||||
|
guggyURL := "https://text2gif.guggy.com/guggify" |
||||
|
if req.URL.String() != guggyURL { |
||||
|
t.Fatalf("Bad URL: got %s want %s", req.URL.String(), guggyURL) |
||||
|
} |
||||
|
if req.Method != "POST" { |
||||
|
t.Fatalf("Bad method: got %s want POST", req.Method) |
||||
|
} |
||||
|
if req.Header.Get("apiKey") != apiKey { |
||||
|
t.Fatalf("Bad apiKey: got %s want %s", req.Header.Get("apiKey"), apiKey) |
||||
|
} |
||||
|
// check the query is in the request body
|
||||
|
var reqBody guggyQuery |
||||
|
if err := json.NewDecoder(req.Body).Decode(&reqBody); err != nil { |
||||
|
t.Fatalf("Failed to read request body: %s", err) |
||||
|
} |
||||
|
if reqBody.Sentence != "hey listen!" { |
||||
|
t.Fatalf("Bad query: got '%s' want '%s'", reqBody.Sentence, "hey listen!") |
||||
|
} |
||||
|
|
||||
|
res := guggyGifResult{ |
||||
|
Width: 64, |
||||
|
Height: 64, |
||||
|
ReqID: "12345", |
||||
|
GIF: guggyImageURL, |
||||
|
} |
||||
|
b, err := json.Marshal(res) |
||||
|
if err != nil { |
||||
|
t.Fatalf("Failed to marshal guggy response", err) |
||||
|
} |
||||
|
return &http.Response{ |
||||
|
StatusCode: 200, |
||||
|
Body: ioutil.NopCloser(bytes.NewBuffer(b)), |
||||
|
}, nil |
||||
|
}) |
||||
|
// clobber the guggy service http client instance
|
||||
|
httpClient = &http.Client{Transport: guggyTrans} |
||||
|
|
||||
|
// Create the Guggy service
|
||||
|
srv, err := types.CreateService("id", ServiceType, "@guggybot:hyrule", []byte( |
||||
|
`{"api_key":"`+apiKey+`"}`, |
||||
|
)) |
||||
|
if err != nil { |
||||
|
t.Fatal("Failed to create Guggy service: ", err) |
||||
|
} |
||||
|
guggy := srv.(*Service) |
||||
|
|
||||
|
// Mock the response from Matrix
|
||||
|
matrixTrans := struct{ testutils.MockTransport }{} |
||||
|
matrixTrans.RT = func(req *http.Request) (*http.Response, error) { |
||||
|
if req.URL.String() == guggyImageURL { // getting the guggy image
|
||||
|
return &http.Response{ |
||||
|
StatusCode: 200, |
||||
|
Body: ioutil.NopCloser(bytes.NewBufferString("some image data")), |
||||
|
}, nil |
||||
|
} else if strings.Contains(req.URL.String(), "_matrix/media/r0/upload") { // uploading the image to matrix
|
||||
|
return &http.Response{ |
||||
|
StatusCode: 200, |
||||
|
Body: ioutil.NopCloser(bytes.NewBufferString(`{"content_uri":"mxc://foo/bar"}`)), |
||||
|
}, nil |
||||
|
} |
||||
|
return nil, fmt.Errorf("Unknown URL: %s", req.URL.String()) |
||||
|
} |
||||
|
matrixCli, _ := gomatrix.NewClient("https://hyrule", "@guggybot:hyrule", "its_a_secret") |
||||
|
matrixCli.Client = &http.Client{Transport: matrixTrans} |
||||
|
|
||||
|
// Execute the matrix !command
|
||||
|
cmds := guggy.Commands(matrixCli) |
||||
|
if len(cmds) != 1 { |
||||
|
t.Fatalf("Unexpected number of commands: %d", len(cmds)) |
||||
|
} |
||||
|
cmd := cmds[0] |
||||
|
_, err = cmd.Command("!someroom:hyrule", "@navi:hyrule", []string{"hey", "listen!"}) |
||||
|
if err != nil { |
||||
|
t.Fatalf("Failed to process command: ", err.Error()) |
||||
|
} |
||||
|
} |
@ -0,0 +1,295 @@ |
|||||
|
// Package travisci implements a Service capable of processing webhooks from Travis-CI.
|
||||
|
package travisci |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"net/http" |
||||
|
"regexp" |
||||
|
"strconv" |
||||
|
"strings" |
||||
|
"time" |
||||
|
|
||||
|
log "github.com/Sirupsen/logrus" |
||||
|
"github.com/matrix-org/go-neb/database" |
||||
|
"github.com/matrix-org/go-neb/types" |
||||
|
"github.com/matrix-org/gomatrix" |
||||
|
) |
||||
|
|
||||
|
// ServiceType of the Travis-CI service.
|
||||
|
const ServiceType = "travis-ci" |
||||
|
|
||||
|
// DefaultTemplate contains the template that will be used if none is supplied.
|
||||
|
// This matches the default mentioned at: https://docs.travis-ci.com/user/notifications#Customizing-slack-notifications
|
||||
|
const DefaultTemplate = (`%{repository}#%{build_number} (%{branch} - %{commit} : %{author}): %{message} |
||||
|
Change view : %{compare_url} |
||||
|
Build details : %{build_url}`) |
||||
|
|
||||
|
// Matches 'owner/repo'
|
||||
|
var ownerRepoRegex = regexp.MustCompile(`^([A-z0-9-_.]+)/([A-z0-9-_.]+)$`) |
||||
|
|
||||
|
var httpClient = &http.Client{} |
||||
|
|
||||
|
// Service contains the Config fields for the Travis-CI service.
|
||||
|
//
|
||||
|
// This service will send notifications into a Matrix room when Travis-CI sends
|
||||
|
// webhook events to it. It requires a public domain which Travis-CI can reach.
|
||||
|
// Notices will be sent as the service user ID.
|
||||
|
//
|
||||
|
// Example JSON request:
|
||||
|
// {
|
||||
|
// rooms: {
|
||||
|
// "!ewfug483gsfe:localhost": {
|
||||
|
// repos: {
|
||||
|
// "matrix-org/go-neb": {
|
||||
|
// template: "%{repository}#%{build_number} (%{branch} - %{commit} : %{author}): %{message}\nBuild details : %{build_url}"
|
||||
|
// }
|
||||
|
// }
|
||||
|
// }
|
||||
|
// }
|
||||
|
// }
|
||||
|
type Service struct { |
||||
|
types.DefaultService |
||||
|
webhookEndpointURL string |
||||
|
// The URL which should be added to .travis.yml - Populated by Go-NEB after Service registration.
|
||||
|
WebhookURL string `json:"webhook_url"` |
||||
|
// A map from Matrix room ID to Github-style owner/repo repositories.
|
||||
|
Rooms map[string]struct { |
||||
|
// A map of "owner/repo" to configuration information
|
||||
|
Repos map[string]struct { |
||||
|
// The template string to use when creating notifications.
|
||||
|
//
|
||||
|
// This is identical to the format of Slack Notifications for Travis-CI:
|
||||
|
// https://docs.travis-ci.com/user/notifications#Customizing-slack-notifications
|
||||
|
//
|
||||
|
// The following variables are available:
|
||||
|
// repository_slug: your GitHub repo identifier (like svenfuchs/minimal)
|
||||
|
// repository_name: the slug without the username
|
||||
|
// build_number: build number
|
||||
|
// build_id: build id
|
||||
|
// branch: branch build name
|
||||
|
// commit: shortened commit SHA
|
||||
|
// author: commit author name
|
||||
|
// commit_message: commit message of build
|
||||
|
// commit_subject: first line of the commit message
|
||||
|
// result: result of build
|
||||
|
// message: Travis CI message to the build
|
||||
|
// duration: total duration of all builds in the matrix
|
||||
|
// elapsed_time: time between build start and finish
|
||||
|
// compare_url: commit change view URL
|
||||
|
// build_url: URL of the build detail
|
||||
|
Template string `json:"template"` |
||||
|
} `json:"repos"` |
||||
|
} `json:"rooms"` |
||||
|
} |
||||
|
|
||||
|
// The payload from Travis-CI
|
||||
|
type webhookNotification struct { |
||||
|
ID int `json:"id"` |
||||
|
Number string `json:"number"` |
||||
|
Status *int `json:"status"` // 0 (success) or 1 (incomplete/fail).
|
||||
|
StartedAt *string `json:"started_at"` |
||||
|
FinishedAt *string `json:"finished_at"` |
||||
|
StatusMessage string `json:"status_message"` |
||||
|
Commit string `json:"commit"` |
||||
|
Branch string `json:"branch"` |
||||
|
Message string `json:"message"` |
||||
|
CompareURL string `json:"compare_url"` |
||||
|
CommittedAt string `json:"committed_at"` |
||||
|
CommitterName string `json:"committer_name"` |
||||
|
CommitterEmail string `json:"committer_email"` |
||||
|
AuthorName string `json:"author_name"` |
||||
|
AuthorEmail string `json:"author_email"` |
||||
|
Type string `json:"type"` |
||||
|
BuildURL string `json:"build_url"` |
||||
|
Repository struct { |
||||
|
Name string `json:"name"` |
||||
|
OwnerName string `json:"owner_name"` |
||||
|
URL string `json:"url"` |
||||
|
} `json:"repository"` |
||||
|
} |
||||
|
|
||||
|
// Converts a webhook notification into a map of template var name to value
|
||||
|
func notifToTemplate(n webhookNotification) map[string]string { |
||||
|
t := make(map[string]string) |
||||
|
t["repository_slug"] = n.Repository.OwnerName + "/" + n.Repository.Name |
||||
|
t["repository"] = t["repository_slug"] // Deprecated form but still used everywhere in people's templates
|
||||
|
t["repository_name"] = n.Repository.Name |
||||
|
t["build_number"] = n.Number |
||||
|
t["build_id"] = strconv.Itoa(n.ID) |
||||
|
t["branch"] = n.Branch |
||||
|
shaLength := len(n.Commit) |
||||
|
if shaLength > 10 { |
||||
|
shaLength = 10 |
||||
|
} |
||||
|
t["commit"] = n.Commit[:shaLength] // shortened commit SHA
|
||||
|
t["author"] = n.CommitterName // author: commit author name
|
||||
|
// commit_message: commit message of build
|
||||
|
// commit_subject: first line of the commit message
|
||||
|
t["commit_message"] = n.Message |
||||
|
subjAndMsg := strings.SplitN(n.Message, "\n", 2) |
||||
|
t["commit_subject"] = subjAndMsg[0] |
||||
|
if n.Status != nil { |
||||
|
t["result"] = strconv.Itoa(*n.Status) |
||||
|
} |
||||
|
t["message"] = n.StatusMessage // message: Travis CI message to the build
|
||||
|
|
||||
|
if n.StartedAt != nil && n.FinishedAt != nil { |
||||
|
// duration: total duration of all builds in the matrix -- TODO
|
||||
|
// elapsed_time: time between build start and finish
|
||||
|
// Example from docs: "2011-11-11T11:11:11Z"
|
||||
|
start, err := time.Parse("2006-01-02T15:04:05Z", *n.StartedAt) |
||||
|
finish, err2 := time.Parse("2006-01-02T15:04:05Z", *n.FinishedAt) |
||||
|
if err != nil || err2 != nil { |
||||
|
log.WithFields(log.Fields{ |
||||
|
"started_at": *n.StartedAt, |
||||
|
"finished_at": *n.FinishedAt, |
||||
|
}).Warn("Failed to parse Travis-CI start/finish times.") |
||||
|
} else { |
||||
|
t["duration"] = finish.Sub(start).String() |
||||
|
t["elapsed_time"] = t["duration"] |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
t["compare_url"] = n.CompareURL |
||||
|
t["build_url"] = n.BuildURL |
||||
|
return t |
||||
|
} |
||||
|
|
||||
|
func outputForTemplate(travisTmpl string, tmpl map[string]string) (out string) { |
||||
|
if travisTmpl == "" { |
||||
|
travisTmpl = DefaultTemplate |
||||
|
} |
||||
|
out = travisTmpl |
||||
|
for tmplVar, tmplValue := range tmpl { |
||||
|
out = strings.Replace(out, "%{"+tmplVar+"}", tmplValue, -1) |
||||
|
} |
||||
|
return out |
||||
|
} |
||||
|
|
||||
|
// OnReceiveWebhook receives requests from Travis-CI and possibly sends requests to Matrix as a result.
|
||||
|
//
|
||||
|
// If the repository matches a known Github repository, a notification will be formed from the
|
||||
|
// template for that repository and a notice will be sent to Matrix.
|
||||
|
//
|
||||
|
// Go-NEB cannot register with Travis-CI for webhooks automatically. The user must manually add the
|
||||
|
// webhook endpoint URL to their .travis.yml file:
|
||||
|
// notifications:
|
||||
|
// webhooks: http://go-neb-endpoint.com/notifications
|
||||
|
//
|
||||
|
// See https://docs.travis-ci.com/user/notifications#Webhook-notifications for more information.
|
||||
|
func (s *Service) OnReceiveWebhook(w http.ResponseWriter, req *http.Request, cli *gomatrix.Client) { |
||||
|
if err := req.ParseForm(); err != nil { |
||||
|
log.WithError(err).Error("Failed to read incoming Travis-CI webhook form") |
||||
|
w.WriteHeader(400) |
||||
|
return |
||||
|
} |
||||
|
payload := req.PostFormValue("payload") |
||||
|
if payload == "" { |
||||
|
log.Error("Travis-CI webhook is missing payload= form value") |
||||
|
w.WriteHeader(400) |
||||
|
return |
||||
|
} |
||||
|
if err := verifyOrigin([]byte(payload), req.Header.Get("Signature")); err != nil { |
||||
|
log.WithFields(log.Fields{ |
||||
|
"Signature": req.Header.Get("Signature"), |
||||
|
log.ErrorKey: err, |
||||
|
}).Warn("Received unauthorised Travis-CI webhook request.") |
||||
|
w.WriteHeader(403) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
var notif webhookNotification |
||||
|
if err := json.Unmarshal([]byte(payload), ¬if); err != nil { |
||||
|
log.WithError(err).Error("Travis-CI webhook received an invalid JSON payload=") |
||||
|
w.WriteHeader(400) |
||||
|
return |
||||
|
} |
||||
|
if notif.Repository.OwnerName == "" || notif.Repository.Name == "" { |
||||
|
log.WithField("repo", notif.Repository).Error("Travis-CI webhook missing repository fields") |
||||
|
w.WriteHeader(400) |
||||
|
return |
||||
|
} |
||||
|
whForRepo := notif.Repository.OwnerName + "/" + notif.Repository.Name |
||||
|
tmplData := notifToTemplate(notif) |
||||
|
|
||||
|
logger := log.WithFields(log.Fields{ |
||||
|
"repo": whForRepo, |
||||
|
}) |
||||
|
|
||||
|
for roomID, roomData := range s.Rooms { |
||||
|
for ownerRepo, repoData := range roomData.Repos { |
||||
|
if ownerRepo != whForRepo { |
||||
|
continue |
||||
|
} |
||||
|
msg := gomatrix.TextMessage{ |
||||
|
Body: outputForTemplate(repoData.Template, tmplData), |
||||
|
MsgType: "m.notice", |
||||
|
} |
||||
|
|
||||
|
logger.WithFields(log.Fields{ |
||||
|
"msg": msg, |
||||
|
"room_id": roomID, |
||||
|
}).Print("Sending Travis-CI notification to room") |
||||
|
if _, e := cli.SendMessageEvent(roomID, "m.room.message", msg); e != nil { |
||||
|
logger.WithError(e).WithField("room_id", roomID).Print( |
||||
|
"Failed to send Travis-CI notification to room.") |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
w.WriteHeader(200) |
||||
|
} |
||||
|
|
||||
|
// Register makes sure the Config information supplied is valid.
|
||||
|
func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error { |
||||
|
s.WebhookURL = s.webhookEndpointURL |
||||
|
for _, roomData := range s.Rooms { |
||||
|
for repo := range roomData.Repos { |
||||
|
match := ownerRepoRegex.FindStringSubmatch(repo) |
||||
|
if len(match) == 0 { |
||||
|
return fmt.Errorf("Repository '%s' is not a valid repository name.", repo) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
s.joinRooms(client) |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// PostRegister deletes this service if there are no registered repos.
|
||||
|
func (s *Service) PostRegister(oldService types.Service) { |
||||
|
for _, roomData := range s.Rooms { |
||||
|
for _ = range roomData.Repos { |
||||
|
return // at least 1 repo exists
|
||||
|
} |
||||
|
} |
||||
|
// Delete this service since no repos are configured
|
||||
|
logger := log.WithFields(log.Fields{ |
||||
|
"service_type": s.ServiceType(), |
||||
|
"service_id": s.ServiceID(), |
||||
|
}) |
||||
|
logger.Info("Removing service as no repositories are registered.") |
||||
|
if err := database.GetServiceDB().DeleteService(s.ServiceID()); err != nil { |
||||
|
logger.WithError(err).Error("Failed to delete service") |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (s *Service) joinRooms(client *gomatrix.Client) { |
||||
|
for roomID := range s.Rooms { |
||||
|
if _, err := client.JoinRoom(roomID, "", nil); err != nil { |
||||
|
log.WithFields(log.Fields{ |
||||
|
log.ErrorKey: err, |
||||
|
"room_id": roomID, |
||||
|
"user_id": client.UserID, |
||||
|
}).Error("Failed to join room") |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func init() { |
||||
|
types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service { |
||||
|
return &Service{ |
||||
|
DefaultService: types.NewDefaultService(serviceID, serviceUserID, ServiceType), |
||||
|
webhookEndpointURL: webhookEndpointURL, |
||||
|
} |
||||
|
}) |
||||
|
} |
@ -0,0 +1,206 @@ |
|||||
|
package travisci |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"io/ioutil" |
||||
|
"net/http" |
||||
|
"net/http/httptest" |
||||
|
"strings" |
||||
|
"testing" |
||||
|
|
||||
|
"github.com/matrix-org/go-neb/database" |
||||
|
"github.com/matrix-org/go-neb/testutils" |
||||
|
"github.com/matrix-org/go-neb/types" |
||||
|
"github.com/matrix-org/gomatrix" |
||||
|
) |
||||
|
|
||||
|
const travisOrgPEMPublicKey = (`-----BEGIN PUBLIC KEY----- |
||||
|
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvtjdLkS+FP+0fPC09j25 |
||||
|
y/PiuYDDivIT86COVedvlElk99BBYTrqNaJybxjXbIZ1Q6xFNhOY+iTcBr4E1zJu |
||||
|
tizF3Xi0V9tOuP/M8Wn4Y/1lCWbQKlWrNQuqNBmhovF4K3mDCYswVbpgTmp+JQYu |
||||
|
Bm9QMdieZMNry5s6aiMA9aSjDlNyedvSENYo18F+NYg1J0C0JiPYTxheCb4optr1 |
||||
|
5xNzFKhAkuGs4XTOA5C7Q06GCKtDNf44s/CVE30KODUxBi0MCKaxiXw/yy55zxX2 |
||||
|
/YdGphIyQiA5iO1986ZmZCLLW8udz9uhW5jUr3Jlp9LbmphAC61bVSf4ou2YsJaN |
||||
|
0QIDAQAB |
||||
|
-----END PUBLIC KEY-----`) |
||||
|
|
||||
|
const travisComPEMPublicKey = (`-----BEGIN PUBLIC KEY----- |
||||
|
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnQU2j9lnRtyuW36arNOc |
||||
|
dzCzyKVirLUi3/aLh6UfnTVXzTnx8eHUnBn1ZeQl7Eh3J3qqdbIKl6npS27ONzCy |
||||
|
3PIcfjpLPaVyGagIL8c8XgDEvB45AesC0osVP5gkXQkPUM3B2rrUmp1AZzG+Fuo0 |
||||
|
SAeNnS71gN63U3brL9fN/MTCXJJ6TvMt3GrcJUq5uq56qNeJTsiowK6eiiWFUSfh |
||||
|
e1qapOdMFmcEs9J/R1XQ/scxbAnLcWfl8lqH/MjMdCMe0j3X2ZYMTqOHsb3cQGSS |
||||
|
dMPwZGeLWV+OaxjJ7TrJ+riqMANOgqCBGpvWUnUfo046ACOx7p6u4fFc3aRiuqYK |
||||
|
VQIDAQAB |
||||
|
-----END PUBLIC KEY-----`) |
||||
|
|
||||
|
const exampleSignature = ("pW0CDpmcAeWw3qf2Ufx8UvzfrZRUpYx30HBl9nJcDkh2v9FrF1GjJVsrcqx7ly0FPjb7dkfMJ/d0Q3JpDb1EL4p509cN4Vy8+HpfINw35Wg6JqzOQqTa" + |
||||
|
"TidwoDLXo0NgL78zfiL3dra7ZwOGTA+LmnLSuNp38ROxn70u26uqJzWprGSdVNbRu1LkF1QKLa61NZegfxK7RZn1PlIsznWIyTS0qj81mg2sXMDLH1J4" + |
||||
|
"pHxjEpzydjSb5b8tCjrN+vFLDdAtP5RjU8NwvQM4LRRGbLDIlRsO77HDwfXrPgUE3DjPIqVpHhMcCusygp0ClH2b1J1O3LkhxSS9ol5w99Hkpg==") |
||||
|
const exampleBody = ("payload=%7B%22id%22%3A176075135%2C%22repository%22%3A%7B%22id%22%3A6461770%2C%22name%22%3A%22flow-jsdoc%22%2C%22owner_" + |
||||
|
"name%22%3A%22Kegsay%22%2C%22url%22%3Anull%7D%2C%22number%22%3A%2218%22%2C%22config%22%3A%7B%22notifications%22%3A%7B%22web" + |
||||
|
"hooks%22%3A%5B%22http%3A%2F%2F7abbe705.ngrok.io%22%5D%7D%2C%22language%22%3A%22node_js%22%2C%22node_js%22%3A%5B%224.1%22%5D%2C%22.resu" + |
||||
|
"lt%22%3A%22configured%22%2C%22group%22%3A%22stable%22%2C%22dist%22%3A%22precise%22%7D%2C%22status%22%3A0%2C%22result%22%3A0%2C%22status_" + |
||||
|
"message%22%3A%22Passed%22%2C%22result_message%22%3A%22Passed%22%2C%22started_at%22%3A%222016-11-15T15%3A10%3A22Z%22%2C%22finished_" + |
||||
|
"at%22%3A%222016-11-15T15%3A10%3A54Z%22%2C%22duration%22%3A32%2C%22build_url%22%3A%22https%3A%2F%2Ftravis-ci.org%2FKegsay%2Fflow-js" + |
||||
|
"doc%2Fbuilds%2F176075135%22%2C%22commit_id%22%3A50222535%2C%22commit%22%3A%223a092c3a6032ebb50384c99b445f947e9ce86e2a%22%2C%22base_com" + |
||||
|
"mit%22%3Anull%2C%22head_commit%22%3Anull%2C%22branch%22%3A%22master%22%2C%22message%22%3A%22Test+Travis+webhook+support%22%2C%22compare_" + |
||||
|
"url%22%3A%22https%3A%2F%2Fgithub.com%2FKegsay%2Fflow-jsdoc%2Fcompare%2F9f9d459ba082...3a092c3a6032%22%2C%22committed_at%22%3A%222016-1" + |
||||
|
"1-15T15%3A08%3A16Z%22%2C%22author_name%22%3A%22Kegan+Dougal%22%2C%22author_email%22%3A%22kegan%40matrix.org%22%2C%22committer_" + |
||||
|
"name%22%3A%22Kegan+Dougal%22%2C%22committer_email%22%3A%22kegan%40matrix.org%22%2C%22matrix%22%3A%5B%7B%22id%22%3A176075137%2C%22reposit" + |
||||
|
"ory_id%22%3A6461770%2C%22parent_id%22%3A176075135%2C%22number%22%3A%2218.1%22%2C%22state%22%3A%22finished%22%2C%22config%22%3A%7B%22notifi" + |
||||
|
"cations%22%3A%7B%22webhooks%22%3A%5B%22http%3A%2F%2F7abbe705.ngrok.io%22%5D%7D%2C%22language%22%3A%22node_js%22%2C%22node_" + |
||||
|
"js%22%3A%224.1%22%2C%22.result%22%3A%22configured%22%2C%22group%22%3A%22stable%22%2C%22dist%22%3A%22precise%22%2C%22os%22%3A%22li" + |
||||
|
"nux%22%7D%2C%22status%22%3A0%2C%22result%22%3A0%2C%22commit%22%3A%223a092c3a6032ebb50384c99b445f947e9ce86e2a%22%2C%22branch%22%3A%22mas" + |
||||
|
"ter%22%2C%22message%22%3A%22Test+Travis+webhook+support%22%2C%22compare_url%22%3A%22https%3A%2F%2Fgithub.com%2FKegsay%2Fflow-jsdoc%2Fcomp" + |
||||
|
"are%2F9f9d459ba082...3a092c3a6032%22%2C%22started_at%22%3A%222016-11-15T15%3A10%3A22Z%22%2C%22finished_at%22%3A%222016-11-" + |
||||
|
"15T15%3A10%3A54Z%22%2C%22committed_at%22%3A%222016-11-15T15%3A08%3A16Z%22%2C%22author_name%22%3A%22Kegan+Dougal%22%2C%22author_ema" + |
||||
|
"il%22%3A%22kegan%40matrix.org%22%2C%22committer_name%22%3A%22Kegan+Dougal%22%2C%22committer_email%22%3A%22kegan%40matrix.org%22%2C%22allow_f" + |
||||
|
"ailure%22%3Afalse%7D%5D%2C%22type%22%3A%22push%22%2C%22state%22%3A%22passed%22%2C%22pull_request%22%3Afalse%2C%22pull_request_number%22%3Anu" + |
||||
|
"ll%2C%22pull_request_title%22%3Anull%2C%22tag%22%3Anull%7D") |
||||
|
|
||||
|
var travisTests = []struct { |
||||
|
Signature string |
||||
|
ValidSignature bool |
||||
|
Body string |
||||
|
Template string |
||||
|
ExpectedOutput string |
||||
|
}{ |
||||
|
{ |
||||
|
exampleSignature, true, exampleBody, |
||||
|
"%{repository}#%{build_number} (%{branch} - %{commit} : %{author}): %{message}", |
||||
|
"Kegsay/flow-jsdoc#18 (master - 3a092c3a60 : Kegan Dougal): Passed", |
||||
|
}, |
||||
|
{ |
||||
|
"obviously_invalid_signature", false, exampleBody, |
||||
|
"%{repository}#%{build_number} (%{branch} - %{commit} : %{author}): %{message}", |
||||
|
"Kegsay/flow-jsdoc#18 (master - 3a092c3a60 : Kegan Dougal): Passed", |
||||
|
}, |
||||
|
{ |
||||
|
// Payload is valid but doesn't match signature now
|
||||
|
exampleSignature, false, strings.TrimSuffix(exampleBody, "%7D") + "%2C%22EXTRA_KEY%22%3Anull%7D", |
||||
|
"%{repository}#%{build_number} (%{branch} - %{commit} : %{author}): %{message}", |
||||
|
"Kegsay/flow-jsdoc#18 (master - 3a092c3a60 : Kegan Dougal): Passed", |
||||
|
}, |
||||
|
{ |
||||
|
exampleSignature, true, exampleBody, |
||||
|
"%{repository}#%{build_number} %{duration}", |
||||
|
"Kegsay/flow-jsdoc#18 32s", |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
func TestTravisCI(t *testing.T) { |
||||
|
database.SetServiceDB(&database.NopStorage{}) |
||||
|
|
||||
|
// When the service tries to get Travis' public key, return the constant
|
||||
|
urlToKey := make(map[string]string) |
||||
|
urlToKey["https://api.travis-ci.org/config"] = travisOrgPEMPublicKey |
||||
|
urlToKey["https://api.travis-ci.com/config"] = travisComPEMPublicKey |
||||
|
travisTransport := testutils.NewRoundTripper(func(req *http.Request) (*http.Response, error) { |
||||
|
if key := urlToKey[req.URL.String()]; key != "" { |
||||
|
escKey, _ := json.Marshal(key) |
||||
|
return &http.Response{ |
||||
|
StatusCode: 200, |
||||
|
Body: ioutil.NopCloser(bytes.NewBufferString( |
||||
|
`{"config":{"notifications":{"webhook":{"public_key":` + string(escKey) + `}}}}`, |
||||
|
)), |
||||
|
}, nil |
||||
|
} |
||||
|
return nil, fmt.Errorf("Unhandled URL %s", req.URL.String()) |
||||
|
}) |
||||
|
// clobber the http client that the service uses to talk to Travis
|
||||
|
httpClient = &http.Client{Transport: travisTransport} |
||||
|
|
||||
|
// Intercept message sending to Matrix and mock responses
|
||||
|
msgs := []gomatrix.TextMessage{} |
||||
|
matrixTrans := struct{ testutils.MockTransport }{} |
||||
|
matrixTrans.RT = func(req *http.Request) (*http.Response, error) { |
||||
|
if !strings.Contains(req.URL.String(), "/send/m.room.message") { |
||||
|
return nil, fmt.Errorf("Unhandled URL: %s", req.URL.String()) |
||||
|
} |
||||
|
var msg gomatrix.TextMessage |
||||
|
if err := json.NewDecoder(req.Body).Decode(&msg); err != nil { |
||||
|
return nil, fmt.Errorf("Failed to decode request JSON: %s", err) |
||||
|
} |
||||
|
msgs = append(msgs, msg) |
||||
|
return &http.Response{ |
||||
|
StatusCode: 200, |
||||
|
Body: ioutil.NopCloser(bytes.NewBufferString(`{"event_id":"$yup:event"}`)), |
||||
|
}, nil |
||||
|
} |
||||
|
matrixCli, _ := gomatrix.NewClient("https://hyrule", "@travisci:hyrule", "its_a_secret") |
||||
|
matrixCli.Client = &http.Client{Transport: matrixTrans} |
||||
|
|
||||
|
// BEGIN running the Travis-CI table tests
|
||||
|
// ---------------------------------------
|
||||
|
for _, test := range travisTests { |
||||
|
msgs = []gomatrix.TextMessage{} // reset sent messages
|
||||
|
mockWriter := httptest.NewRecorder() |
||||
|
travis := makeService(t, test.Template) |
||||
|
if travis == nil { |
||||
|
t.Error("TestTravisCI Failed to create service") |
||||
|
continue |
||||
|
} |
||||
|
if err := travis.Register(nil, matrixCli); err != nil { |
||||
|
t.Errorf("TestTravisCI Failed to Register(): %s", err) |
||||
|
continue |
||||
|
} |
||||
|
req, err := http.NewRequest( |
||||
|
"POST", "https://neb.endpoint/travis-ci-service", bytes.NewBufferString(test.Body), |
||||
|
) |
||||
|
if err != nil { |
||||
|
t.Errorf("TestTravisCI Failed to create webhook request: %s", err) |
||||
|
continue |
||||
|
} |
||||
|
req.Header.Set("Signature", test.Signature) |
||||
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") |
||||
|
travis.OnReceiveWebhook(mockWriter, req, matrixCli) |
||||
|
|
||||
|
if test.ValidSignature { |
||||
|
if !assertResponse(t, mockWriter, msgs, 200, 1) { |
||||
|
continue |
||||
|
} |
||||
|
|
||||
|
if msgs[0].Body != test.ExpectedOutput { |
||||
|
t.Errorf("TestTravisCI want matrix body '%s', got '%s'", test.ExpectedOutput, msgs[0].Body) |
||||
|
} |
||||
|
} else { |
||||
|
assertResponse(t, mockWriter, msgs, 403, 0) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func assertResponse(t *testing.T, w *httptest.ResponseRecorder, msgs []gomatrix.TextMessage, expectCode int, expectMsgLength int) bool { |
||||
|
if w.Code != expectCode { |
||||
|
t.Errorf("TestTravisCI OnReceiveWebhook want HTTP code %d, got %d", expectCode, w.Code) |
||||
|
return false |
||||
|
} |
||||
|
if len(msgs) != expectMsgLength { |
||||
|
t.Errorf("TestTravisCI want %d sent messages, got %d ", expectMsgLength, len(msgs)) |
||||
|
return false |
||||
|
} |
||||
|
return true |
||||
|
} |
||||
|
|
||||
|
func makeService(t *testing.T, template string) *Service { |
||||
|
srv, err := types.CreateService("id", ServiceType, "@travisci:hyrule", []byte( |
||||
|
`{ |
||||
|
"rooms":{ |
||||
|
"!ewfug483gsfe:localhost": { |
||||
|
"repos": { |
||||
|
"Kegsay/flow-jsdoc": { |
||||
|
"template": "`+template+`" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}`, |
||||
|
)) |
||||
|
if err != nil { |
||||
|
t.Error("Failed to create Travis-CI service: ", err) |
||||
|
return nil |
||||
|
} |
||||
|
return srv.(*Service) |
||||
|
} |
@ -0,0 +1,109 @@ |
|||||
|
package travisci |
||||
|
|
||||
|
import ( |
||||
|
"crypto" |
||||
|
"crypto/rsa" |
||||
|
"crypto/sha1" |
||||
|
"crypto/x509" |
||||
|
"encoding/base64" |
||||
|
"encoding/json" |
||||
|
"encoding/pem" |
||||
|
"fmt" |
||||
|
"net/http" |
||||
|
"strings" |
||||
|
) |
||||
|
|
||||
|
// Host => Public Key.
|
||||
|
// Travis has a .com and .org with different public keys.
|
||||
|
// .org is the public one and is one we will try first, then .com
|
||||
|
var travisPublicKeyMap = map[string]*rsa.PublicKey{ |
||||
|
"api.travis-ci.org": nil, |
||||
|
"api.travis-ci.com": nil, |
||||
|
} |
||||
|
|
||||
|
func verifyOrigin(payload []byte, sigHeader string) error { |
||||
|
/* |
||||
|
From: https://docs.travis-ci.com/user/notifications#Verifying-Webhook-requests
|
||||
|
1. Pick up the payload data from the HTTP request’s body. |
||||
|
2. Obtain the Signature header value, and base64-decode it. |
||||
|
3. Obtain the public key corresponding to the private key that signed the payload. |
||||
|
This is available at the /config endpoint’s config.notifications.webhook.public_key on |
||||
|
the relevant API server. (e.g., https://api.travis-ci.org/config)
|
||||
|
4. Verify the signature using the public key and SHA1 digest. |
||||
|
*/ |
||||
|
sig, err := base64.StdEncoding.DecodeString(sigHeader) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("verifyOrigin: Failed to decode signature as base64: %s", err) |
||||
|
} |
||||
|
|
||||
|
if err := loadPublicKeys(); err != nil { |
||||
|
return fmt.Errorf("verifyOrigin: Failed to cache Travis public keys: %s", err) |
||||
|
} |
||||
|
|
||||
|
// 4. Verify with SHA1
|
||||
|
// NB: We don't know who sent this request (no Referer header or anything) so we need to try
|
||||
|
// both public keys at both endpoints. We use the .org one first since it's more popular.
|
||||
|
var verifyErr error |
||||
|
for _, host := range []string{"api.travis-ci.org", "api.travis-ci.com"} { |
||||
|
h := sha1.New() |
||||
|
h.Write(payload) |
||||
|
digest := h.Sum(nil) |
||||
|
verifyErr = rsa.VerifyPKCS1v15(travisPublicKeyMap[host], crypto.SHA1, digest, sig) |
||||
|
if verifyErr == nil { |
||||
|
return nil // Valid for this key
|
||||
|
} |
||||
|
} |
||||
|
return fmt.Errorf("verifyOrigin: Signature verification failed: %s", verifyErr) |
||||
|
} |
||||
|
|
||||
|
func loadPublicKeys() error { |
||||
|
for _, host := range []string{"api.travis-ci.com", "api.travis-ci.org"} { |
||||
|
pubKey := travisPublicKeyMap[host] |
||||
|
if pubKey == nil { |
||||
|
pemPubKey, err := fetchPEMPublicKey("https://" + host + "/config") |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
block, _ := pem.Decode([]byte(pemPubKey)) |
||||
|
if block == nil { |
||||
|
return fmt.Errorf("public_key at %s doesn't have a valid PEM block", host) |
||||
|
} |
||||
|
|
||||
|
k, err := x509.ParsePKIXPublicKey(block.Bytes) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
pubKey = k.(*rsa.PublicKey) |
||||
|
travisPublicKeyMap[host] = pubKey |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func fetchPEMPublicKey(travisURL string) (key string, err error) { |
||||
|
var res *http.Response |
||||
|
res, err = httpClient.Get(travisURL) |
||||
|
if res != nil { |
||||
|
defer res.Body.Close() |
||||
|
} |
||||
|
if err != nil { |
||||
|
return |
||||
|
} |
||||
|
configStruct := struct { |
||||
|
Config struct { |
||||
|
Notifications struct { |
||||
|
Webhook struct { |
||||
|
PublicKey string `json:"public_key"` |
||||
|
} `json:"webhook"` |
||||
|
} `json:"notifications"` |
||||
|
} `json:"config"` |
||||
|
}{} |
||||
|
if err = json.NewDecoder(res.Body).Decode(&configStruct); err != nil { |
||||
|
return |
||||
|
} |
||||
|
key = configStruct.Config.Notifications.Webhook.PublicKey |
||||
|
if key == "" || !strings.HasPrefix(key, "-----BEGIN PUBLIC KEY-----") { |
||||
|
err = fmt.Errorf("Couldn't fetch Travis-CI public key. Missing or malformed key: %s", key) |
||||
|
} |
||||
|
return |
||||
|
} |
@ -0,0 +1,28 @@ |
|||||
|
package testutils |
||||
|
|
||||
|
import ( |
||||
|
"net/http" |
||||
|
) |
||||
|
|
||||
|
// MockTransport implements RoundTripper
|
||||
|
type MockTransport struct { |
||||
|
// RT is the RoundTrip function. Replace this function with your test function.
|
||||
|
// For example:
|
||||
|
// t := MockTransport{}
|
||||
|
// t.RT = func(req *http.Request) (*http.Response, error) {
|
||||
|
// // assert req args, return res or error
|
||||
|
// }
|
||||
|
RT func(*http.Request) (*http.Response, error) |
||||
|
} |
||||
|
|
||||
|
// RoundTrip is a RoundTripper
|
||||
|
func (t MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { |
||||
|
return t.RT(req) |
||||
|
} |
||||
|
|
||||
|
// NewRoundTripper returns a new RoundTripper which will call the provided function.
|
||||
|
func NewRoundTripper(roundTrip func(*http.Request) (*http.Response, error)) http.RoundTripper { |
||||
|
rt := MockTransport{} |
||||
|
rt.RT = roundTrip |
||||
|
return rt |
||||
|
} |
@ -0,0 +1,201 @@ |
|||||
|
Apache License |
||||
|
Version 2.0, January 2004 |
||||
|
http://www.apache.org/licenses/ |
||||
|
|
||||
|
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION |
||||
|
|
||||
|
1. Definitions. |
||||
|
|
||||
|
"License" shall mean the terms and conditions for use, reproduction, |
||||
|
and distribution as defined by Sections 1 through 9 of this document. |
||||
|
|
||||
|
"Licensor" shall mean the copyright owner or entity authorized by |
||||
|
the copyright owner that is granting the License. |
||||
|
|
||||
|
"Legal Entity" shall mean the union of the acting entity and all |
||||
|
other entities that control, are controlled by, or are under common |
||||
|
control with that entity. For the purposes of this definition, |
||||
|
"control" means (i) the power, direct or indirect, to cause the |
||||
|
direction or management of such entity, whether by contract or |
||||
|
otherwise, or (ii) ownership of fifty percent (50%) or more of the |
||||
|
outstanding shares, or (iii) beneficial ownership of such entity. |
||||
|
|
||||
|
"You" (or "Your") shall mean an individual or Legal Entity |
||||
|
exercising permissions granted by this License. |
||||
|
|
||||
|
"Source" form shall mean the preferred form for making modifications, |
||||
|
including but not limited to software source code, documentation |
||||
|
source, and configuration files. |
||||
|
|
||||
|
"Object" form shall mean any form resulting from mechanical |
||||
|
transformation or translation of a Source form, including but |
||||
|
not limited to compiled object code, generated documentation, |
||||
|
and conversions to other media types. |
||||
|
|
||||
|
"Work" shall mean the work of authorship, whether in Source or |
||||
|
Object form, made available under the License, as indicated by a |
||||
|
copyright notice that is included in or attached to the work |
||||
|
(an example is provided in the Appendix below). |
||||
|
|
||||
|
"Derivative Works" shall mean any work, whether in Source or Object |
||||
|
form, that is based on (or derived from) the Work and for which the |
||||
|
editorial revisions, annotations, elaborations, or other modifications |
||||
|
represent, as a whole, an original work of authorship. For the purposes |
||||
|
of this License, Derivative Works shall not include works that remain |
||||
|
separable from, or merely link (or bind by name) to the interfaces of, |
||||
|
the Work and Derivative Works thereof. |
||||
|
|
||||
|
"Contribution" shall mean any work of authorship, including |
||||
|
the original version of the Work and any modifications or additions |
||||
|
to that Work or Derivative Works thereof, that is intentionally |
||||
|
submitted to Licensor for inclusion in the Work by the copyright owner |
||||
|
or by an individual or Legal Entity authorized to submit on behalf of |
||||
|
the copyright owner. For the purposes of this definition, "submitted" |
||||
|
means any form of electronic, verbal, or written communication sent |
||||
|
to the Licensor or its representatives, including but not limited to |
||||
|
communication on electronic mailing lists, source code control systems, |
||||
|
and issue tracking systems that are managed by, or on behalf of, the |
||||
|
Licensor for the purpose of discussing and improving the Work, but |
||||
|
excluding communication that is conspicuously marked or otherwise |
||||
|
designated in writing by the copyright owner as "Not a Contribution." |
||||
|
|
||||
|
"Contributor" shall mean Licensor and any individual or Legal Entity |
||||
|
on behalf of whom a Contribution has been received by Licensor and |
||||
|
subsequently incorporated within the Work. |
||||
|
|
||||
|
2. Grant of Copyright License. Subject to the terms and conditions of |
||||
|
this License, each Contributor hereby grants to You a perpetual, |
||||
|
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
||||
|
copyright license to reproduce, prepare Derivative Works of, |
||||
|
publicly display, publicly perform, sublicense, and distribute the |
||||
|
Work and such Derivative Works in Source or Object form. |
||||
|
|
||||
|
3. Grant of Patent License. Subject to the terms and conditions of |
||||
|
this License, each Contributor hereby grants to You a perpetual, |
||||
|
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
||||
|
(except as stated in this section) patent license to make, have made, |
||||
|
use, offer to sell, sell, import, and otherwise transfer the Work, |
||||
|
where such license applies only to those patent claims licensable |
||||
|
by such Contributor that are necessarily infringed by their |
||||
|
Contribution(s) alone or by combination of their Contribution(s) |
||||
|
with the Work to which such Contribution(s) was submitted. If You |
||||
|
institute patent litigation against any entity (including a |
||||
|
cross-claim or counterclaim in a lawsuit) alleging that the Work |
||||
|
or a Contribution incorporated within the Work constitutes direct |
||||
|
or contributory patent infringement, then any patent licenses |
||||
|
granted to You under this License for that Work shall terminate |
||||
|
as of the date such litigation is filed. |
||||
|
|
||||
|
4. Redistribution. You may reproduce and distribute copies of the |
||||
|
Work or Derivative Works thereof in any medium, with or without |
||||
|
modifications, and in Source or Object form, provided that You |
||||
|
meet the following conditions: |
||||
|
|
||||
|
(a) You must give any other recipients of the Work or |
||||
|
Derivative Works a copy of this License; and |
||||
|
|
||||
|
(b) You must cause any modified files to carry prominent notices |
||||
|
stating that You changed the files; and |
||||
|
|
||||
|
(c) You must retain, in the Source form of any Derivative Works |
||||
|
that You distribute, all copyright, patent, trademark, and |
||||
|
attribution notices from the Source form of the Work, |
||||
|
excluding those notices that do not pertain to any part of |
||||
|
the Derivative Works; and |
||||
|
|
||||
|
(d) If the Work includes a "NOTICE" text file as part of its |
||||
|
distribution, then any Derivative Works that You distribute must |
||||
|
include a readable copy of the attribution notices contained |
||||
|
within such NOTICE file, excluding those notices that do not |
||||
|
pertain to any part of the Derivative Works, in at least one |
||||
|
of the following places: within a NOTICE text file distributed |
||||
|
as part of the Derivative Works; within the Source form or |
||||
|
documentation, if provided along with the Derivative Works; or, |
||||
|
within a display generated by the Derivative Works, if and |
||||
|
wherever such third-party notices normally appear. The contents |
||||
|
of the NOTICE file are for informational purposes only and |
||||
|
do not modify the License. You may add Your own attribution |
||||
|
notices within Derivative Works that You distribute, alongside |
||||
|
or as an addendum to the NOTICE text from the Work, provided |
||||
|
that such additional attribution notices cannot be construed |
||||
|
as modifying the License. |
||||
|
|
||||
|
You may add Your own copyright statement to Your modifications and |
||||
|
may provide additional or different license terms and conditions |
||||
|
for use, reproduction, or distribution of Your modifications, or |
||||
|
for any such Derivative Works as a whole, provided Your use, |
||||
|
reproduction, and distribution of the Work otherwise complies with |
||||
|
the conditions stated in this License. |
||||
|
|
||||
|
5. Submission of Contributions. Unless You explicitly state otherwise, |
||||
|
any Contribution intentionally submitted for inclusion in the Work |
||||
|
by You to the Licensor shall be under the terms and conditions of |
||||
|
this License, without any additional terms or conditions. |
||||
|
Notwithstanding the above, nothing herein shall supersede or modify |
||||
|
the terms of any separate license agreement you may have executed |
||||
|
with Licensor regarding such Contributions. |
||||
|
|
||||
|
6. Trademarks. This License does not grant permission to use the trade |
||||
|
names, trademarks, service marks, or product names of the Licensor, |
||||
|
except as required for reasonable and customary use in describing the |
||||
|
origin of the Work and reproducing the content of the NOTICE file. |
||||
|
|
||||
|
7. Disclaimer of Warranty. Unless required by applicable law or |
||||
|
agreed to in writing, Licensor provides the Work (and each |
||||
|
Contributor provides its Contributions) on an "AS IS" BASIS, |
||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
||||
|
implied, including, without limitation, any warranties or conditions |
||||
|
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A |
||||
|
PARTICULAR PURPOSE. You are solely responsible for determining the |
||||
|
appropriateness of using or redistributing the Work and assume any |
||||
|
risks associated with Your exercise of permissions under this License. |
||||
|
|
||||
|
8. Limitation of Liability. In no event and under no legal theory, |
||||
|
whether in tort (including negligence), contract, or otherwise, |
||||
|
unless required by applicable law (such as deliberate and grossly |
||||
|
negligent acts) or agreed to in writing, shall any Contributor be |
||||
|
liable to You for damages, including any direct, indirect, special, |
||||
|
incidental, or consequential damages of any character arising as a |
||||
|
result of this License or out of the use or inability to use the |
||||
|
Work (including but not limited to damages for loss of goodwill, |
||||
|
work stoppage, computer failure or malfunction, or any and all |
||||
|
other commercial damages or losses), even if such Contributor |
||||
|
has been advised of the possibility of such damages. |
||||
|
|
||||
|
9. Accepting Warranty or Additional Liability. While redistributing |
||||
|
the Work or Derivative Works thereof, You may choose to offer, |
||||
|
and charge a fee for, acceptance of support, warranty, indemnity, |
||||
|
or other liability obligations and/or rights consistent with this |
||||
|
License. However, in accepting such obligations, You may act only |
||||
|
on Your own behalf and on Your sole responsibility, not on behalf |
||||
|
of any other Contributor, and only if You agree to indemnify, |
||||
|
defend, and hold each Contributor harmless for any liability |
||||
|
incurred by, or claims asserted against, such Contributor by reason |
||||
|
of your accepting any such warranty or additional liability. |
||||
|
|
||||
|
END OF TERMS AND CONDITIONS |
||||
|
|
||||
|
APPENDIX: How to apply the Apache License to your work. |
||||
|
|
||||
|
To apply the Apache License to your work, attach the following |
||||
|
boilerplate notice, with the fields enclosed by brackets "{}" |
||||
|
replaced with your own identifying information. (Don't include |
||||
|
the brackets!) The text should be enclosed in the appropriate |
||||
|
comment syntax for the file format. We also recommend that a |
||||
|
file or class name and description of purpose be included on the |
||||
|
same "printed page" as the copyright notice for easier |
||||
|
identification within third-party archives. |
||||
|
|
||||
|
Copyright {yyyy} {name of copyright owner} |
||||
|
|
||||
|
Licensed under the Apache License, Version 2.0 (the "License"); |
||||
|
you may not use this file except in compliance with the License. |
||||
|
You may obtain a copy of the License at |
||||
|
|
||||
|
http://www.apache.org/licenses/LICENSE-2.0 |
||||
|
|
||||
|
Unless required by applicable law or agreed to in writing, software |
||||
|
distributed under the License is distributed on an "AS IS" BASIS, |
||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
|
See the License for the specific language governing permissions and |
||||
|
limitations under the License. |
@ -0,0 +1,4 @@ |
|||||
|
# gomatrix |
||||
|
[![GoDoc](https://godoc.org/github.com/matrix-org/gomatrix?status.svg)](https://godoc.org/github.com/matrix-org/gomatrix) |
||||
|
|
||||
|
A Golang Matrix client |
@ -0,0 +1,381 @@ |
|||||
|
// Package gomatrix implements the Matrix Client-Server API.
|
||||
|
//
|
||||
|
// Specification can be found at http://matrix.org/docs/spec/client_server/r0.2.0.html
|
||||
|
//
|
||||
|
// Example usage of this library: (blocking version)
|
||||
|
// cli, _ := gomatrix.NewClient("https://matrix.org", "@example:matrix.org", "MDAefhiuwehfuiwe")
|
||||
|
// syncer := cli.Syncer.(*gomatrix.DefaultSyncer)
|
||||
|
// syncer.OnEventType("m.room.message", func(ev *gomatrix.Event) {
|
||||
|
// fmt.Println("Message: ", ev)
|
||||
|
// })
|
||||
|
// if err := cli.Sync(); err != nil {
|
||||
|
// fmt.Println("Sync() returned ", err)
|
||||
|
// }
|
||||
|
//
|
||||
|
// To make the example non-blocking, call Sync() in a goroutine.
|
||||
|
package gomatrix |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"io" |
||||
|
"io/ioutil" |
||||
|
"net/http" |
||||
|
"net/url" |
||||
|
"path" |
||||
|
"strconv" |
||||
|
"sync" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Client represents a Matrix client.
|
||||
|
type Client struct { |
||||
|
HomeserverURL *url.URL // The base homeserver URL
|
||||
|
Prefix string // The API prefix eg '/_matrix/client/r0'
|
||||
|
UserID string // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
|
||||
|
AccessToken string // The access_token for the client.
|
||||
|
syncingMutex sync.Mutex // protects syncingID
|
||||
|
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
||||
|
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
|
||||
|
Syncer Syncer // The thing which can process /sync responses
|
||||
|
Store Storer // The thing which can store rooms/tokens/ids
|
||||
|
} |
||||
|
|
||||
|
// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
|
||||
|
type HTTPError struct { |
||||
|
WrappedError error |
||||
|
Message string |
||||
|
Code int |
||||
|
} |
||||
|
|
||||
|
func (e HTTPError) Error() string { |
||||
|
var wrappedErrMsg string |
||||
|
if e.WrappedError != nil { |
||||
|
wrappedErrMsg = e.WrappedError.Error() |
||||
|
} |
||||
|
return fmt.Sprintf("msg=%s code=%d wrapped=%s", e.Message, e.Code, wrappedErrMsg) |
||||
|
} |
||||
|
|
||||
|
// BuildURL builds a URL with the Client's homserver/prefix/access_token set already.
|
||||
|
func (cli *Client) BuildURL(urlPath ...string) string { |
||||
|
ps := []string{cli.Prefix} |
||||
|
for _, p := range urlPath { |
||||
|
ps = append(ps, p) |
||||
|
} |
||||
|
return cli.BuildBaseURL(ps...) |
||||
|
} |
||||
|
|
||||
|
// BuildBaseURL builds a URL with the Client's homeserver/access_token set already. You must
|
||||
|
// supply the prefix in the path.
|
||||
|
func (cli *Client) BuildBaseURL(urlPath ...string) string { |
||||
|
// copy the URL. Purposefully ignore error as the input is from a valid URL already
|
||||
|
hsURL, _ := url.Parse(cli.HomeserverURL.String()) |
||||
|
parts := []string{hsURL.Path} |
||||
|
parts = append(parts, urlPath...) |
||||
|
hsURL.Path = path.Join(parts...) |
||||
|
query := hsURL.Query() |
||||
|
query.Set("access_token", cli.AccessToken) |
||||
|
hsURL.RawQuery = query.Encode() |
||||
|
return hsURL.String() |
||||
|
} |
||||
|
|
||||
|
// BuildURLWithQuery builds a URL with query paramters in addition to the Client's homeserver/prefix/access_token set already.
|
||||
|
func (cli *Client) BuildURLWithQuery(urlPath []string, urlQuery map[string]string) string { |
||||
|
u, _ := url.Parse(cli.BuildURL(urlPath...)) |
||||
|
q := u.Query() |
||||
|
for k, v := range urlQuery { |
||||
|
q.Set(k, v) |
||||
|
} |
||||
|
u.RawQuery = q.Encode() |
||||
|
return u.String() |
||||
|
} |
||||
|
|
||||
|
// Sync starts syncing with the provided Homeserver. This function will block until a fatal /sync error occurs, so should
|
||||
|
// almost always be started as a new goroutine. If Sync() is called twice then the first sync will be stopped.
|
||||
|
func (cli *Client) Sync() error { |
||||
|
// Mark the client as syncing.
|
||||
|
// We will keep syncing until the syncing state changes. Either because
|
||||
|
// Sync is called or StopSync is called.
|
||||
|
syncingID := cli.incrementSyncingID() |
||||
|
nextBatch := cli.Store.LoadNextBatch(cli.UserID) |
||||
|
filterID := cli.Store.LoadFilterID(cli.UserID) |
||||
|
if filterID == "" { |
||||
|
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID) |
||||
|
resFilter, err := cli.CreateFilter(filterJSON) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
filterID = resFilter.FilterID |
||||
|
cli.Store.SaveFilterID(cli.UserID, filterID) |
||||
|
} |
||||
|
|
||||
|
for { |
||||
|
resSync, err := cli.SyncRequest(30000, nextBatch, filterID, false, "") |
||||
|
if err != nil { |
||||
|
duration, err2 := cli.Syncer.OnFailedSync(resSync, err) |
||||
|
if err2 != nil { |
||||
|
return err2 |
||||
|
} |
||||
|
time.Sleep(duration) |
||||
|
continue |
||||
|
} |
||||
|
|
||||
|
// Check that the syncing state hasn't changed
|
||||
|
// Either because we've stopped syncing or another sync has been started.
|
||||
|
// We discard the response from our sync.
|
||||
|
if cli.getSyncingID() != syncingID { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// Save the token now *before* processing it. This means it's possible
|
||||
|
// to not process some events, but it means that we won't get constantly stuck processing
|
||||
|
// a malformed/buggy event which keeps making us panic.
|
||||
|
cli.Store.SaveNextBatch(cli.UserID, resSync.NextBatch) |
||||
|
if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
nextBatch = resSync.NextBatch |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
func (cli *Client) incrementSyncingID() uint32 { |
||||
|
cli.syncingMutex.Lock() |
||||
|
defer cli.syncingMutex.Unlock() |
||||
|
cli.syncingID++ |
||||
|
return cli.syncingID |
||||
|
} |
||||
|
|
||||
|
func (cli *Client) getSyncingID() uint32 { |
||||
|
cli.syncingMutex.Lock() |
||||
|
defer cli.syncingMutex.Unlock() |
||||
|
return cli.syncingID |
||||
|
} |
||||
|
|
||||
|
// StopSync stops the ongoing sync started by Sync.
|
||||
|
func (cli *Client) StopSync() { |
||||
|
// Advance the syncing state so that any running Syncs will terminate.
|
||||
|
cli.incrementSyncingID() |
||||
|
} |
||||
|
|
||||
|
// SendJSON sends JSON to the given URL.
|
||||
|
//
|
||||
|
// Returns the HTTP body as bytes on 2xx. Returns an error if the response is not 2xx. This error
|
||||
|
// is an HTTPError which includes the returned HTTP status code and possibly a RespError as the
|
||||
|
// WrappedError, if the HTTP body could be decoded as a RespError.
|
||||
|
func (cli *Client) SendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) { |
||||
|
jsonStr, err := json.Marshal(contentJSON) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr)) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
req.Header.Set("Content-Type", "application/json") |
||||
|
res, err := cli.Client.Do(req) |
||||
|
if res != nil { |
||||
|
defer res.Body.Close() |
||||
|
} |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
contents, err := ioutil.ReadAll(res.Body) |
||||
|
if res.StatusCode >= 300 || res.StatusCode < 200 { |
||||
|
var wrap error |
||||
|
var respErr RespError |
||||
|
if _ = json.Unmarshal(contents, respErr); respErr.ErrCode != "" { |
||||
|
wrap = respErr |
||||
|
} |
||||
|
|
||||
|
// If we failed to decode as RespError, don't just drop the HTTP body, include it in the
|
||||
|
// HTTP error instead (e.g proxy errors which return HTML).
|
||||
|
msg := "Failed to " + method + " JSON" |
||||
|
if wrap == nil { |
||||
|
msg = msg + ": " + string(contents) |
||||
|
} |
||||
|
|
||||
|
return nil, HTTPError{ |
||||
|
Code: res.StatusCode, |
||||
|
Message: msg, |
||||
|
WrappedError: wrap, |
||||
|
} |
||||
|
} |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return contents, nil |
||||
|
} |
||||
|
|
||||
|
// CreateFilter makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
|
||||
|
func (cli *Client) CreateFilter(filter json.RawMessage) (*RespCreateFilter, error) { |
||||
|
urlPath := cli.BuildURL("user", cli.UserID, "filter") |
||||
|
resBytes, err := cli.SendJSON("POST", urlPath, &filter) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
var filterResponse RespCreateFilter |
||||
|
if err = json.Unmarshal(resBytes, &filterResponse); err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return &filterResponse, nil |
||||
|
} |
||||
|
|
||||
|
// SyncRequest makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||
|
func (cli *Client) SyncRequest(timeout int, since, filterID string, fullState bool, setPresence string) (*RespSync, error) { |
||||
|
query := map[string]string{ |
||||
|
"timeout": strconv.Itoa(timeout), |
||||
|
} |
||||
|
if since != "" { |
||||
|
query["since"] = since |
||||
|
} |
||||
|
if filterID != "" { |
||||
|
query["filter"] = filterID |
||||
|
} |
||||
|
if setPresence != "" { |
||||
|
query["set_presence"] = setPresence |
||||
|
} |
||||
|
if fullState { |
||||
|
query["full_state"] = "true" |
||||
|
} |
||||
|
urlPath := cli.BuildURLWithQuery([]string{"sync"}, query) |
||||
|
req, err := http.NewRequest("GET", urlPath, nil) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
res, err := cli.Client.Do(req) |
||||
|
if res != nil { |
||||
|
defer res.Body.Close() |
||||
|
} |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
var syncResponse RespSync |
||||
|
err = json.NewDecoder(res.Body).Decode(&syncResponse) |
||||
|
return &syncResponse, err |
||||
|
} |
||||
|
|
||||
|
// JoinRoom joins the client to a room ID or alias. See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-join-roomidoralias
|
||||
|
//
|
||||
|
// If serverName is specified, this will be added as a query param to instruct the homeserver to join via that server. If content is specified, it will
|
||||
|
// be JSON encoded and used as the request body.
|
||||
|
func (cli *Client) JoinRoom(roomIDorAlias, serverName string, content interface{}) (*RespJoinRoom, error) { |
||||
|
var urlPath string |
||||
|
if serverName != "" { |
||||
|
urlPath = cli.BuildURLWithQuery([]string{"join", roomIDorAlias}, map[string]string{ |
||||
|
"server_name": serverName, |
||||
|
}) |
||||
|
} else { |
||||
|
urlPath = cli.BuildURL("join", roomIDorAlias) |
||||
|
} |
||||
|
|
||||
|
resBytes, err := cli.SendJSON("POST", urlPath, content) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
var joinRoomResponse RespJoinRoom |
||||
|
if err = json.Unmarshal(resBytes, &joinRoomResponse); err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return &joinRoomResponse, nil |
||||
|
} |
||||
|
|
||||
|
// SetDisplayName sets the user's profile display name. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-profile-userid-displayname
|
||||
|
func (cli *Client) SetDisplayName(displayName string) error { |
||||
|
urlPath := cli.BuildURL("profile", cli.UserID, "displayname") |
||||
|
s := struct { |
||||
|
DisplayName string `json:"displayname"` |
||||
|
}{displayName} |
||||
|
_, err := cli.SendJSON("PUT", urlPath, &s) |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// SendMessageEvent sends a message event into a room. See http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
|
||||
|
// contentJSON should be a pointer to something that can be encoded as JSON using json.Marshal.
|
||||
|
func (cli *Client) SendMessageEvent(roomID string, eventType string, contentJSON interface{}) (*RespSendEvent, error) { |
||||
|
txnID := "go" + strconv.FormatInt(time.Now().UnixNano(), 10) |
||||
|
urlPath := cli.BuildURL("rooms", roomID, "send", eventType, txnID) |
||||
|
resBytes, err := cli.SendJSON("PUT", urlPath, contentJSON) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
var sendEventResponse RespSendEvent |
||||
|
if err = json.Unmarshal(resBytes, &sendEventResponse); err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return &sendEventResponse, nil |
||||
|
} |
||||
|
|
||||
|
// SendText sends an m.room.message event into the given room with a msgtype of m.text
|
||||
|
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-text
|
||||
|
func (cli *Client) SendText(roomID, text string) (*RespSendEvent, error) { |
||||
|
return cli.SendMessageEvent(roomID, "m.room.message", |
||||
|
TextMessage{"m.text", text}) |
||||
|
} |
||||
|
|
||||
|
// UploadLink uploads an HTTP URL and then returns an MXC URI.
|
||||
|
func (cli *Client) UploadLink(link string) (*RespMediaUpload, error) { |
||||
|
res, err := cli.Client.Get(link) |
||||
|
if res != nil { |
||||
|
defer res.Body.Close() |
||||
|
} |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return cli.UploadToContentRepo(res.Body, res.Header.Get("Content-Type"), res.ContentLength) |
||||
|
} |
||||
|
|
||||
|
// UploadToContentRepo uploads the given bytes to the content repository and returns an MXC URI.
|
||||
|
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
|
||||
|
func (cli *Client) UploadToContentRepo(content io.Reader, contentType string, contentLength int64) (*RespMediaUpload, error) { |
||||
|
req, err := http.NewRequest("POST", cli.BuildBaseURL("_matrix/media/r0/upload"), content) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
req.Header.Set("Content-Type", contentType) |
||||
|
req.ContentLength = contentLength |
||||
|
res, err := cli.Client.Do(req) |
||||
|
if res != nil { |
||||
|
defer res.Body.Close() |
||||
|
} |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
if res.StatusCode != 200 { |
||||
|
return nil, HTTPError{ |
||||
|
Message: "Upload request failed", |
||||
|
Code: res.StatusCode, |
||||
|
} |
||||
|
} |
||||
|
var m RespMediaUpload |
||||
|
if err := json.NewDecoder(res.Body).Decode(&m); err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
return &m, nil |
||||
|
} |
||||
|
|
||||
|
// NewClient creates a new Matrix Client ready for syncing
|
||||
|
func NewClient(homeserverURL, userID, accessToken string) (*Client, error) { |
||||
|
hsURL, err := url.Parse(homeserverURL) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
// By default, use an in-memory store which will never save filter ids / next batch tokens to disk.
|
||||
|
// The client will work with this storer: it just won't remember across restarts.
|
||||
|
// In practice, a database backend should be used.
|
||||
|
store := NewInMemoryStore() |
||||
|
cli := Client{ |
||||
|
AccessToken: accessToken, |
||||
|
HomeserverURL: hsURL, |
||||
|
UserID: userID, |
||||
|
Prefix: "/_matrix/client/r0", |
||||
|
Syncer: NewDefaultSyncer(userID, store), |
||||
|
Store: store, |
||||
|
} |
||||
|
// By default, use the default HTTP client.
|
||||
|
cli.Client = http.DefaultClient |
||||
|
|
||||
|
return &cli, nil |
||||
|
} |
@ -0,0 +1,28 @@ |
|||||
|
package gomatrix |
||||
|
|
||||
|
import "fmt" |
||||
|
|
||||
|
func ExampleClient_BuildURLWithQuery() { |
||||
|
cli, _ := NewClient("https://matrix.org", "@example:matrix.org", "abcdef123456") |
||||
|
out := cli.BuildURLWithQuery([]string{"sync"}, map[string]string{ |
||||
|
"filter_id": "5", |
||||
|
}) |
||||
|
fmt.Println(out) |
||||
|
// Output: https://matrix.org/_matrix/client/r0/sync?access_token=abcdef123456&filter_id=5
|
||||
|
} |
||||
|
|
||||
|
func ExampleClient_BuildURL() { |
||||
|
userID := "@example:matrix.org" |
||||
|
cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") |
||||
|
out := cli.BuildURL("user", userID, "filter") |
||||
|
fmt.Println(out) |
||||
|
// Output: https://matrix.org/_matrix/client/r0/user/@example:matrix.org/filter?access_token=abcdef123456
|
||||
|
} |
||||
|
|
||||
|
func ExampleClient_BuildBaseURL() { |
||||
|
userID := "@example:matrix.org" |
||||
|
cli, _ := NewClient("https://matrix.org", userID, "abcdef123456") |
||||
|
out := cli.BuildBaseURL("_matrix", "client", "r0", "directory", "room", "#matrix:matrix.org") |
||||
|
fmt.Println(out) |
||||
|
// Output: https://matrix.org/_matrix/client/r0/directory/room/%23matrix:matrix.org?access_token=abcdef123456
|
||||
|
} |
@ -0,0 +1,5 @@ |
|||||
|
#! /bin/bash |
||||
|
|
||||
|
DOT_GIT="$(dirname $0)/../.git" |
||||
|
|
||||
|
ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit" |
@ -0,0 +1,9 @@ |
|||||
|
#! /bin/bash |
||||
|
|
||||
|
set -eu |
||||
|
|
||||
|
golint |
||||
|
go fmt |
||||
|
go tool vet --shadow . |
||||
|
gocyclo -over 12 . |
||||
|
go test -timeout 5s -test.v |
@ -0,0 +1,61 @@ |
|||||
|
package gomatrix |
||||
|
|
||||
|
// RespError is the standard JSON error response from Homeservers. It also implements the Golang "error" interface.
|
||||
|
// See http://matrix.org/docs/spec/client_server/r0.2.0.html#api-standards
|
||||
|
type RespError struct { |
||||
|
ErrCode string `json:"errcode"` |
||||
|
Err string `json:"error"` |
||||
|
} |
||||
|
|
||||
|
// Error returns the errcode and error message.
|
||||
|
func (e RespError) Error() string { |
||||
|
return e.ErrCode + ": " + e.Err |
||||
|
} |
||||
|
|
||||
|
// RespCreateFilter is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
|
||||
|
type RespCreateFilter struct { |
||||
|
FilterID string `json:"filter_id"` |
||||
|
} |
||||
|
|
||||
|
// RespJoinRoom is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-rooms-roomid-join
|
||||
|
type RespJoinRoom struct { |
||||
|
RoomID string `json:"room_id"` |
||||
|
} |
||||
|
|
||||
|
// RespSendEvent is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
|
||||
|
type RespSendEvent struct { |
||||
|
EventID string `json:"event_id"` |
||||
|
} |
||||
|
|
||||
|
// RespMediaUpload is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
|
||||
|
type RespMediaUpload struct { |
||||
|
ContentURI string `json:"content_uri"` |
||||
|
} |
||||
|
|
||||
|
// RespSync is the JSON response for http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||
|
type RespSync struct { |
||||
|
NextBatch string `json:"next_batch"` |
||||
|
AccountData struct { |
||||
|
Events []Event `json:"events"` |
||||
|
} `json:"account_data"` |
||||
|
Presence struct { |
||||
|
Events []Event `json:"events"` |
||||
|
} `json:"presence"` |
||||
|
Rooms struct { |
||||
|
Join map[string]struct { |
||||
|
State struct { |
||||
|
Events []Event `json:"events"` |
||||
|
} `json:"state"` |
||||
|
Timeline struct { |
||||
|
Events []Event `json:"events"` |
||||
|
Limited bool `json:"limited"` |
||||
|
PrevBatch string `json:"prev_batch"` |
||||
|
} `json:"timeline"` |
||||
|
} `json:"join"` |
||||
|
Invite map[string]struct { |
||||
|
State struct { |
||||
|
Events []Event |
||||
|
} `json:"invite_state"` |
||||
|
} `json:"invite"` |
||||
|
} `json:"rooms"` |
||||
|
} |
@ -0,0 +1,50 @@ |
|||||
|
package gomatrix |
||||
|
|
||||
|
// Room represents a single Matrix room.
|
||||
|
type Room struct { |
||||
|
ID string |
||||
|
State map[string]map[string]*Event |
||||
|
} |
||||
|
|
||||
|
// UpdateState updates the room's current state with the given Event. This will clobber events based
|
||||
|
// on the type/state_key combination.
|
||||
|
func (room Room) UpdateState(event *Event) { |
||||
|
_, exists := room.State[event.Type] |
||||
|
if !exists { |
||||
|
room.State[event.Type] = make(map[string]*Event) |
||||
|
} |
||||
|
room.State[event.Type][event.StateKey] = event |
||||
|
} |
||||
|
|
||||
|
// GetStateEvent returns the state event for the given type/state_key combo, or nil.
|
||||
|
func (room Room) GetStateEvent(eventType string, stateKey string) *Event { |
||||
|
stateEventMap, _ := room.State[eventType] |
||||
|
event, _ := stateEventMap[stateKey] |
||||
|
return event |
||||
|
} |
||||
|
|
||||
|
// GetMembershipState returns the membership state of the given user ID in this room. If there is
|
||||
|
// no entry for this member, 'leave' is returned for consistency with left users.
|
||||
|
func (room Room) GetMembershipState(userID string) string { |
||||
|
state := "leave" |
||||
|
event := room.GetStateEvent("m.room.member", userID) |
||||
|
if event != nil { |
||||
|
membershipState, found := event.Content["membership"] |
||||
|
if found { |
||||
|
mState, isString := membershipState.(string) |
||||
|
if isString { |
||||
|
state = mState |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return state |
||||
|
} |
||||
|
|
||||
|
// NewRoom creates a new Room with the given ID
|
||||
|
func NewRoom(roomID string) *Room { |
||||
|
// Init the State map and return a pointer to the Room
|
||||
|
return &Room{ |
||||
|
ID: roomID, |
||||
|
State: make(map[string]map[string]*Event), |
||||
|
} |
||||
|
} |
@ -0,0 +1,65 @@ |
|||||
|
package gomatrix |
||||
|
|
||||
|
// Storer is an interface which must be satisfied to store client data.
|
||||
|
//
|
||||
|
// You can either write a struct which persists this data to disk, or you can use the
|
||||
|
// provided "InMemoryStore" which just keeps data around in-memory which is lost on
|
||||
|
// restarts.
|
||||
|
type Storer interface { |
||||
|
SaveFilterID(userID, filterID string) |
||||
|
LoadFilterID(userID string) string |
||||
|
SaveNextBatch(userID, nextBatchToken string) |
||||
|
LoadNextBatch(userID string) string |
||||
|
SaveRoom(room *Room) |
||||
|
LoadRoom(roomID string) *Room |
||||
|
} |
||||
|
|
||||
|
// InMemoryStore implements the Storer interface.
|
||||
|
//
|
||||
|
// Everything is persisted in-memory as maps. It is not safe to load/save filter IDs
|
||||
|
// or next batch tokens on any goroutine other than the syncing goroutine: the one
|
||||
|
// which called Client.Sync().
|
||||
|
type InMemoryStore struct { |
||||
|
Filters map[string]string |
||||
|
NextBatch map[string]string |
||||
|
Rooms map[string]*Room |
||||
|
} |
||||
|
|
||||
|
// SaveFilterID to memory.
|
||||
|
func (s *InMemoryStore) SaveFilterID(userID, filterID string) { |
||||
|
s.Filters[userID] = filterID |
||||
|
} |
||||
|
|
||||
|
// LoadFilterID from memory.
|
||||
|
func (s *InMemoryStore) LoadFilterID(userID string) string { |
||||
|
return s.Filters[userID] |
||||
|
} |
||||
|
|
||||
|
// SaveNextBatch to memory.
|
||||
|
func (s *InMemoryStore) SaveNextBatch(userID, nextBatchToken string) { |
||||
|
s.NextBatch[userID] = nextBatchToken |
||||
|
} |
||||
|
|
||||
|
// LoadNextBatch from memory.
|
||||
|
func (s *InMemoryStore) LoadNextBatch(userID string) string { |
||||
|
return s.NextBatch[userID] |
||||
|
} |
||||
|
|
||||
|
// SaveRoom to memory.
|
||||
|
func (s *InMemoryStore) SaveRoom(room *Room) { |
||||
|
s.Rooms[room.ID] = room |
||||
|
} |
||||
|
|
||||
|
// LoadRoom from memory.
|
||||
|
func (s *InMemoryStore) LoadRoom(roomID string) *Room { |
||||
|
return s.Rooms[roomID] |
||||
|
} |
||||
|
|
||||
|
// NewInMemoryStore constructs a new InMemoryStore.
|
||||
|
func NewInMemoryStore() *InMemoryStore { |
||||
|
return &InMemoryStore{ |
||||
|
Filters: make(map[string]string), |
||||
|
NextBatch: make(map[string]string), |
||||
|
Rooms: make(map[string]*Room), |
||||
|
} |
||||
|
} |
@ -0,0 +1,154 @@ |
|||||
|
package gomatrix |
||||
|
|
||||
|
import ( |
||||
|
"encoding/json" |
||||
|
"fmt" |
||||
|
"runtime/debug" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
// Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
|
||||
|
type Syncer interface { |
||||
|
// Process the /sync response. The since parameter is the since= value that was used to produce the response.
|
||||
|
// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
|
||||
|
// permanently.
|
||||
|
ProcessResponse(resp *RespSync, since string) error |
||||
|
// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
|
||||
|
OnFailedSync(res *RespSync, err error) (time.Duration, error) |
||||
|
// GetFilterJSON for the given user ID. NOT the filter ID.
|
||||
|
GetFilterJSON(userID string) json.RawMessage |
||||
|
} |
||||
|
|
||||
|
// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
|
||||
|
// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
|
||||
|
// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information.
|
||||
|
type DefaultSyncer struct { |
||||
|
UserID string |
||||
|
Store Storer |
||||
|
listeners map[string][]OnEventListener // event type to listeners array
|
||||
|
} |
||||
|
|
||||
|
// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
|
||||
|
type OnEventListener func(*Event) |
||||
|
|
||||
|
// NewDefaultSyncer returns an instantiated DefaultSyncer
|
||||
|
func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer { |
||||
|
return &DefaultSyncer{ |
||||
|
UserID: userID, |
||||
|
Store: store, |
||||
|
listeners: make(map[string][]OnEventListener), |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
|
||||
|
// unrepeating events. Returns a fatal error if a listener panics.
|
||||
|
func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) { |
||||
|
if !s.shouldProcessResponse(res, since) { |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
defer func() { |
||||
|
if r := recover(); r != nil { |
||||
|
err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack()) |
||||
|
} |
||||
|
}() |
||||
|
|
||||
|
for roomID, roomData := range res.Rooms.Join { |
||||
|
room := s.getOrCreateRoom(roomID) |
||||
|
for _, event := range roomData.State.Events { |
||||
|
event.RoomID = roomID |
||||
|
room.UpdateState(&event) |
||||
|
s.notifyListeners(&event) |
||||
|
} |
||||
|
for _, event := range roomData.Timeline.Events { |
||||
|
event.RoomID = roomID |
||||
|
s.notifyListeners(&event) |
||||
|
} |
||||
|
} |
||||
|
for roomID, roomData := range res.Rooms.Invite { |
||||
|
room := s.getOrCreateRoom(roomID) |
||||
|
for _, event := range roomData.State.Events { |
||||
|
event.RoomID = roomID |
||||
|
room.UpdateState(&event) |
||||
|
s.notifyListeners(&event) |
||||
|
} |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
// OnEventType allows callers to be notified when there are new events for the given event type.
|
||||
|
// There are no duplicate checks.
|
||||
|
func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) { |
||||
|
_, exists := s.listeners[eventType] |
||||
|
if !exists { |
||||
|
s.listeners[eventType] = []OnEventListener{} |
||||
|
} |
||||
|
s.listeners[eventType] = append(s.listeners[eventType], callback) |
||||
|
} |
||||
|
|
||||
|
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
|
||||
|
// stuff that shouldn't be processed.
|
||||
|
func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool { |
||||
|
if since == "" { |
||||
|
return false |
||||
|
} |
||||
|
// This is a horrible hack because /sync will return the most recent messages for a room
|
||||
|
// as soon as you /join it. We do NOT want to process those events in that particular room
|
||||
|
// because they may have already been processed (if you toggle the bot in/out of the room).
|
||||
|
//
|
||||
|
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
|
||||
|
// exists and is "join" and then discard processing that room entirely if so.
|
||||
|
// TODO: We probably want to process messages from after the last join event in the timeline.
|
||||
|
for roomID, roomData := range resp.Rooms.Join { |
||||
|
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { |
||||
|
e := roomData.Timeline.Events[i] |
||||
|
if e.Type == "m.room.member" && e.StateKey == s.UserID { |
||||
|
m := e.Content["membership"] |
||||
|
mship, ok := m.(string) |
||||
|
if !ok { |
||||
|
continue |
||||
|
} |
||||
|
if mship == "join" { |
||||
|
_, ok := resp.Rooms.Join[roomID] |
||||
|
if !ok { |
||||
|
continue |
||||
|
} |
||||
|
delete(resp.Rooms.Join, roomID) // don't re-process messages
|
||||
|
delete(resp.Rooms.Invite, roomID) // don't re-process invites
|
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return true |
||||
|
} |
||||
|
|
||||
|
// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
|
||||
|
func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room { |
||||
|
room := s.Store.LoadRoom(roomID) |
||||
|
if room == nil { // create a new Room
|
||||
|
room = NewRoom(roomID) |
||||
|
s.Store.SaveRoom(room) |
||||
|
} |
||||
|
return room |
||||
|
} |
||||
|
|
||||
|
func (s *DefaultSyncer) notifyListeners(event *Event) { |
||||
|
listeners, exists := s.listeners[event.Type] |
||||
|
if !exists { |
||||
|
return |
||||
|
} |
||||
|
for _, fn := range listeners { |
||||
|
fn(event) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error.
|
||||
|
func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { |
||||
|
return 10 * time.Second, nil |
||||
|
} |
||||
|
|
||||
|
// GetFilterJSON returns a filter with a timeline limit of 50.
|
||||
|
func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage { |
||||
|
return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue