From 5f9a065d8393f2cad4af50100bb508da92e9d458 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 6 Sep 2016 14:38:34 +0100 Subject: [PATCH 1/4] Persist next_batch tokens Indirect this through a `NextBatchStore` type to get around circular dependencies which are formed if you try to inline it in the `Sync()` while loop. Fork out event handlers to separate functions in `Clients` to make gocyclo happy. Add `UPDATE` query for `next_batch`. --- .../matrix-org/go-neb/clients/clients.go | 133 ++++++++++-------- .../matrix-org/go-neb/database/db.go | 8 ++ .../matrix-org/go-neb/database/schema.go | 9 ++ .../matrix-org/go-neb/matrix/matrix.go | 43 ++++-- 4 files changed, 125 insertions(+), 68 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 79fce5c..268b603 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -141,6 +141,71 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr return } +func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) { + services, err := c.db.LoadServicesForUser(client.UserID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "service_user_id": client.UserID, + }).Warn("Error loading services") + } + var plugins []plugin.Plugin + for _, service := range services { + plugins = append(plugins, service.Plugin(client, event.RoomID)) + } + plugin.OnMessage(plugins, client, event) +} + +func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) { + // see if these options are for us. The state key is the user ID with a leading _ + // to get around restrictions in the HS about having user IDs as state keys. + targetUserID := strings.TrimPrefix(event.StateKey, "_") + if targetUserID != client.UserID { + return + } + // these options fully clobber what was there previously. + opts := types.BotOptions{ + UserID: client.UserID, + RoomID: event.RoomID, + SetByUserID: event.Sender, + Options: event.Content, + } + if _, err := c.db.StoreBotOptions(opts); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "bot_user_id": client.UserID, + "set_by_user_id": event.Sender, + }).Error("Failed to persist bot options") + } +} + +func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) { + if event.StateKey != client.UserID { + return // not our member event + } + m := event.Content["membership"] + membership, ok := m.(string) + if !ok { + return + } + if membership == "invite" { + logger := log.WithFields(log.Fields{ + "room_id": event.RoomID, + "service_user_id": client.UserID, + "inviter": event.Sender, + }) + logger.Print("Accepting invite from user") + + if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { + logger.WithError(err).Print("Failed to join room") + } else { + logger.Print("Joined room") + } + } +} + func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { homeserverURL, err := url.Parse(config.HomeserverURL) if err != nil { @@ -149,73 +214,29 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) + client.OnSaveNextBatch(func(nextBatch string) { + if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") + } + }) + // TODO: Check that the access token is valid for the userID by peforming // a request against the server. client.Worker.OnEventType("m.room.message", func(event *matrix.Event) { - services, err := c.db.LoadServicesForUser(client.UserID) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "service_user_id": client.UserID, - }).Warn("Error loading services") - } - var plugins []plugin.Plugin - for _, service := range services { - plugins = append(plugins, service.Plugin(client, event.RoomID)) - } - plugin.OnMessage(plugins, client, event) + c.onMessageEvent(client, event) }) client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) { - // see if these options are for us. The state key is the user ID with a leading _ - // to get around restrictions in the HS about having user IDs as state keys. - targetUserID := strings.TrimPrefix(event.StateKey, "_") - if targetUserID != client.UserID { - return - } - // these options fully clobber what was there previously. - opts := types.BotOptions{ - UserID: client.UserID, - RoomID: event.RoomID, - SetByUserID: event.Sender, - Options: event.Content, - } - if _, err := c.db.StoreBotOptions(opts); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "bot_user_id": client.UserID, - "set_by_user_id": event.Sender, - }).Error("Failed to persist bot options") - } + c.onBotOptionsEvent(client, event) }) if config.AutoJoinRooms { client.Worker.OnEventType("m.room.member", func(event *matrix.Event) { - if event.StateKey != config.UserID { - return // not our member event - } - m := event.Content["membership"] - membership, ok := m.(string) - if !ok { - return - } - if membership == "invite" { - logger := log.WithFields(log.Fields{ - "room_id": event.RoomID, - "service_user_id": config.UserID, - "inviter": event.Sender, - }) - logger.Print("Accepting invite from user") - - if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { - logger.WithError(err).Print("Failed to join room") - } else { - logger.Print("Joined room") - } - } + c.onRoomMemberEvent(client, event) }) } diff --git a/src/github.com/matrix-org/go-neb/database/db.go b/src/github.com/matrix-org/go-neb/database/db.go index e650471..1889e33 100644 --- a/src/github.com/matrix-org/go-neb/database/db.go +++ b/src/github.com/matrix-org/go-neb/database/db.go @@ -77,6 +77,14 @@ func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientCo return } +// UpdateNextBatch updates the next_batch token for the given user. +func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + return updateNextBatchTxn(txn, userID, nextBatch) + }) + return +} + // LoadService loads a service from the database. // Returns sql.ErrNoRows if the service isn't in the database. func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { diff --git a/src/github.com/matrix-org/go-neb/database/schema.go b/src/github.com/matrix-org/go-neb/database/schema.go index f17db68..22eb442 100644 --- a/src/github.com/matrix-org/go-neb/database/schema.go +++ b/src/github.com/matrix-org/go-neb/database/schema.go @@ -129,6 +129,15 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client return err } +const updateNextBatchSQL = ` +UPDATE matrix_clients SET next_batch = $1 WHERE user_id = $2 +` + +func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error { + _, err := txn.Exec(updateNextBatchSQL, nextBatch, userID) + return err +} + const selectServiceSQL = ` SELECT service_type, service_user_id, service_json FROM services WHERE service_id = $1 diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 14610ad..866f172 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -28,21 +28,22 @@ import ( ) var ( - filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":0}}}`) + filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) ) // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + nextBatchSaver func(string) } func (cli *Client) buildURL(urlPath ...string) string { @@ -239,13 +240,30 @@ func (cli *Client) Sync() { return } + isFirstSync := nextToken == "" + // Update client state nextToken = syncResponse.NextBatch logger.WithField("next_batch", nextToken).Print("Received sync response") - channel <- syncResponse + + // Save the token now *before* passing it through to the worker. This means it's possible + // to not process some events, but it means that we won't get constantly stuck processing + // a malformed/buggy event which keeps making us panic. + if cli.nextBatchSaver != nil { + cli.nextBatchSaver(nextToken) + } + + if !isFirstSync { + channel <- syncResponse + } } } +// OnSaveNextBatch is a function which can save the next_batch token passed to it +func (cli *Client) OnSaveNextBatch(fn func(string)) { + cli.nextBatchSaver = fn +} + func (cli *Client) incrementSyncingID() uint32 { cli.syncingMutex.Lock() defer cli.syncingMutex.Unlock() @@ -344,6 +362,7 @@ func (cli *Client) doSync(timeout int, since string) ([]byte, error) { log.WithFields(log.Fields{ "since": since, "timeout": timeout, + "user_id": cli.UserID, }).Print("Syncing") res, err := http.Get(urlPath) if err != nil { From b59d43b810304bb7b81181ca8024752d304a057e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 6 Sep 2016 15:10:56 +0100 Subject: [PATCH 2/4] Add a `NextBatchStorer` interface to load/save next_batch tokens Gets around cyclic dependencies. --- .../matrix-org/go-neb/clients/clients.go | 35 +++++++++---- .../matrix-org/go-neb/database/db.go | 9 ++++ .../matrix-org/go-neb/database/schema.go | 13 +++++ .../matrix-org/go-neb/matrix/matrix.go | 50 +++++++++++-------- 4 files changed, 77 insertions(+), 30 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 268b603..44355f4 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -11,6 +11,31 @@ import ( "sync" ) +type nextBatchStore struct { + db *database.ServiceDB +} + +func (s nextBatchStore) Save(userID, nextBatch string) { + if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") + } +} +func (s nextBatchStore) Load(userID string) string { + token, err := s.db.LoadNextBatch(userID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + }).Error("Failed to load next_batch token") + return "" + } + return token +} + // A Clients is a collection of clients used for bot services. type Clients struct { db *database.ServiceDB @@ -213,15 +238,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { } client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) - - client.OnSaveNextBatch(func(nextBatch string) { - if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "next_batch": nextBatch, - }).Error("Failed to persist next_batch token") - } - }) + client.NextBatch = nextBatchStore{c.db} // TODO: Check that the access token is valid for the userID by peforming // a request against the server. diff --git a/src/github.com/matrix-org/go-neb/database/db.go b/src/github.com/matrix-org/go-neb/database/db.go index 1889e33..79c83bc 100644 --- a/src/github.com/matrix-org/go-neb/database/db.go +++ b/src/github.com/matrix-org/go-neb/database/db.go @@ -85,6 +85,15 @@ func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) { return } +// LoadNextBatch loads the next_batch token for the given user. +func (d *ServiceDB) LoadNextBatch(userID string) (nextBatch string, err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + nextBatch, err = selectNextBatchTxn(txn, userID) + return err + }) + return +} + // LoadService loads a service from the database. // Returns sql.ErrNoRows if the service isn't in the database. func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { diff --git a/src/github.com/matrix-org/go-neb/database/schema.go b/src/github.com/matrix-org/go-neb/database/schema.go index 22eb442..e60e23a 100644 --- a/src/github.com/matrix-org/go-neb/database/schema.go +++ b/src/github.com/matrix-org/go-neb/database/schema.go @@ -138,6 +138,19 @@ func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error { return err } +const selectNextBatchSQL = ` +SELECT next_batch FROM matrix_clients WHERE user_id = $1 +` + +func selectNextBatchTxn(txn *sql.Tx, userID string) (string, error) { + var nextBatch string + row := txn.QueryRow(selectNextBatchSQL, userID) + if err := row.Scan(&nextBatch); err != nil { + return "", err + } + return nextBatch, nil +} + const selectServiceSQL = ` SELECT service_type, service_user_id, service_json FROM services WHERE service_id = $1 diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 866f172..07175c0 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -31,19 +31,33 @@ var ( filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) ) +// NextBatchStorer controls loading/saving of next_batch tokens for users +type NextBatchStorer interface { + // Save a next_batch token for a given user. Best effort. + Save(userID, nextBatch string) + // Load a next_batch token for a given user. Return an empty string if no token exists. + Load(userID string) string +} + +// noopNextBatchStore does not load or save next_batch tokens. +type noopNextBatchStore struct{} + +func (s noopNextBatchStore) Save(userID, nextBatch string) {} +func (s noopNextBatchStore) Load(userID string) string { return "" } + // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string - nextBatchSaver func(string) + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + NextBatch NextBatchStorer } func (cli *Client) buildURL(urlPath ...string) string { @@ -200,9 +214,9 @@ func (cli *Client) Sync() { } cli.filterID = filterID logger.WithField("filter", filterID).Print("Got filter ID") - nextToken := "" + nextToken := cli.NextBatch.Load(cli.UserID) - logger.Print("Starting sync") + logger.WithField("next_batch", nextToken).Print("Starting sync") channel := make(chan syncHTTPResponse, 5) @@ -249,9 +263,7 @@ func (cli *Client) Sync() { // Save the token now *before* passing it through to the worker. This means it's possible // to not process some events, but it means that we won't get constantly stuck processing // a malformed/buggy event which keeps making us panic. - if cli.nextBatchSaver != nil { - cli.nextBatchSaver(nextToken) - } + cli.NextBatch.Save(cli.UserID, nextToken) if !isFirstSync { channel <- syncResponse @@ -259,11 +271,6 @@ func (cli *Client) Sync() { } } -// OnSaveNextBatch is a function which can save the next_batch token passed to it -func (cli *Client) OnSaveNextBatch(fn func(string)) { - cli.nextBatchSaver = fn -} - func (cli *Client) incrementSyncingID() uint32 { cli.syncingMutex.Lock() defer cli.syncingMutex.Unlock() @@ -385,6 +392,7 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien Prefix: "/_matrix/client/r0", } cli.Worker = newWorker(&cli) + cli.NextBatch = noopNextBatchStore{} cli.Rooms = make(map[string]*Room) cli.httpClient = &http.Client{} From 41f5cdabaf7c5097d8537046344229932faaad6d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 6 Sep 2016 15:12:48 +0100 Subject: [PATCH 3/4] Remove TODOs which are now done --- src/github.com/matrix-org/go-neb/matrix/matrix.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 07175c0..b52a79a 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -206,7 +206,7 @@ func (cli *Client) Sync() { "user_id": cli.UserID, }) - // TODO: Store the filter ID and sync token in the database + // TODO: Store the filter ID in the database filterID, err := cli.createFilter() if err != nil { logger.WithError(err).Fatal("Failed to create filter") @@ -247,8 +247,6 @@ func (cli *Client) Sync() { // Check that the syncing state hasn't changed // Either because we've stopped syncing or another sync has been started. // We discard the response from our sync. - // TODO: Store the next_batch token so that the next sync can resume - // from where this sync left off. if cli.getSyncingID() != syncingID { logger.Print("Stopping sync") return From 178ae79e29c71f98580e08baf0392c9ec304127e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 6 Sep 2016 15:51:54 +0100 Subject: [PATCH 4/4] Review comments --- .../matrix-org/go-neb/clients/clients.go | 2 +- .../matrix-org/go-neb/matrix/matrix.go | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 44355f4..caab1f4 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -238,7 +238,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { } client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) - client.NextBatch = nextBatchStore{c.db} + client.NextBatchStorer = nextBatchStore{c.db} // TODO: Check that the access token is valid for the userID by peforming // a request against the server. diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index b52a79a..7d85fb9 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -47,17 +47,17 @@ func (s noopNextBatchStore) Load(userID string) string { return "" } // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string - NextBatch NextBatchStorer + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + NextBatchStorer NextBatchStorer } func (cli *Client) buildURL(urlPath ...string) string { @@ -214,7 +214,7 @@ func (cli *Client) Sync() { } cli.filterID = filterID logger.WithField("filter", filterID).Print("Got filter ID") - nextToken := cli.NextBatch.Load(cli.UserID) + nextToken := cli.NextBatchStorer.Load(cli.UserID) logger.WithField("next_batch", nextToken).Print("Starting sync") @@ -261,7 +261,7 @@ func (cli *Client) Sync() { // Save the token now *before* passing it through to the worker. This means it's possible // to not process some events, but it means that we won't get constantly stuck processing // a malformed/buggy event which keeps making us panic. - cli.NextBatch.Save(cli.UserID, nextToken) + cli.NextBatchStorer.Save(cli.UserID, nextToken) if !isFirstSync { channel <- syncResponse @@ -390,7 +390,10 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien Prefix: "/_matrix/client/r0", } cli.Worker = newWorker(&cli) - cli.NextBatch = noopNextBatchStore{} + // By default, use a no-op next_batch storer which will never save tokens and always + // "load" the empty string as a token. The client will work with this storer: it just won't + // remember the token across restarts. In practice, a database backend should be used. + cli.NextBatchStorer = noopNextBatchStore{} cli.Rooms = make(map[string]*Room) cli.httpClient = &http.Client{}