From 5f9a065d8393f2cad4af50100bb508da92e9d458 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 6 Sep 2016 14:38:34 +0100 Subject: [PATCH] Persist next_batch tokens Indirect this through a `NextBatchStore` type to get around circular dependencies which are formed if you try to inline it in the `Sync()` while loop. Fork out event handlers to separate functions in `Clients` to make gocyclo happy. Add `UPDATE` query for `next_batch`. --- .../matrix-org/go-neb/clients/clients.go | 133 ++++++++++-------- .../matrix-org/go-neb/database/db.go | 8 ++ .../matrix-org/go-neb/database/schema.go | 9 ++ .../matrix-org/go-neb/matrix/matrix.go | 43 ++++-- 4 files changed, 125 insertions(+), 68 deletions(-) diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 79fce5c..268b603 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -141,6 +141,71 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr return } +func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) { + services, err := c.db.LoadServicesForUser(client.UserID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "service_user_id": client.UserID, + }).Warn("Error loading services") + } + var plugins []plugin.Plugin + for _, service := range services { + plugins = append(plugins, service.Plugin(client, event.RoomID)) + } + plugin.OnMessage(plugins, client, event) +} + +func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) { + // see if these options are for us. The state key is the user ID with a leading _ + // to get around restrictions in the HS about having user IDs as state keys. + targetUserID := strings.TrimPrefix(event.StateKey, "_") + if targetUserID != client.UserID { + return + } + // these options fully clobber what was there previously. + opts := types.BotOptions{ + UserID: client.UserID, + RoomID: event.RoomID, + SetByUserID: event.Sender, + Options: event.Content, + } + if _, err := c.db.StoreBotOptions(opts); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "bot_user_id": client.UserID, + "set_by_user_id": event.Sender, + }).Error("Failed to persist bot options") + } +} + +func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) { + if event.StateKey != client.UserID { + return // not our member event + } + m := event.Content["membership"] + membership, ok := m.(string) + if !ok { + return + } + if membership == "invite" { + logger := log.WithFields(log.Fields{ + "room_id": event.RoomID, + "service_user_id": client.UserID, + "inviter": event.Sender, + }) + logger.Print("Accepting invite from user") + + if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { + logger.WithError(err).Print("Failed to join room") + } else { + logger.Print("Joined room") + } + } +} + func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { homeserverURL, err := url.Parse(config.HomeserverURL) if err != nil { @@ -149,73 +214,29 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) + client.OnSaveNextBatch(func(nextBatch string) { + if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") + } + }) + // TODO: Check that the access token is valid for the userID by peforming // a request against the server. client.Worker.OnEventType("m.room.message", func(event *matrix.Event) { - services, err := c.db.LoadServicesForUser(client.UserID) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "service_user_id": client.UserID, - }).Warn("Error loading services") - } - var plugins []plugin.Plugin - for _, service := range services { - plugins = append(plugins, service.Plugin(client, event.RoomID)) - } - plugin.OnMessage(plugins, client, event) + c.onMessageEvent(client, event) }) client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) { - // see if these options are for us. The state key is the user ID with a leading _ - // to get around restrictions in the HS about having user IDs as state keys. - targetUserID := strings.TrimPrefix(event.StateKey, "_") - if targetUserID != client.UserID { - return - } - // these options fully clobber what was there previously. - opts := types.BotOptions{ - UserID: client.UserID, - RoomID: event.RoomID, - SetByUserID: event.Sender, - Options: event.Content, - } - if _, err := c.db.StoreBotOptions(opts); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "bot_user_id": client.UserID, - "set_by_user_id": event.Sender, - }).Error("Failed to persist bot options") - } + c.onBotOptionsEvent(client, event) }) if config.AutoJoinRooms { client.Worker.OnEventType("m.room.member", func(event *matrix.Event) { - if event.StateKey != config.UserID { - return // not our member event - } - m := event.Content["membership"] - membership, ok := m.(string) - if !ok { - return - } - if membership == "invite" { - logger := log.WithFields(log.Fields{ - "room_id": event.RoomID, - "service_user_id": config.UserID, - "inviter": event.Sender, - }) - logger.Print("Accepting invite from user") - - if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { - logger.WithError(err).Print("Failed to join room") - } else { - logger.Print("Joined room") - } - } + c.onRoomMemberEvent(client, event) }) } diff --git a/src/github.com/matrix-org/go-neb/database/db.go b/src/github.com/matrix-org/go-neb/database/db.go index e650471..1889e33 100644 --- a/src/github.com/matrix-org/go-neb/database/db.go +++ b/src/github.com/matrix-org/go-neb/database/db.go @@ -77,6 +77,14 @@ func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientCo return } +// UpdateNextBatch updates the next_batch token for the given user. +func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + return updateNextBatchTxn(txn, userID, nextBatch) + }) + return +} + // LoadService loads a service from the database. // Returns sql.ErrNoRows if the service isn't in the database. func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { diff --git a/src/github.com/matrix-org/go-neb/database/schema.go b/src/github.com/matrix-org/go-neb/database/schema.go index f17db68..22eb442 100644 --- a/src/github.com/matrix-org/go-neb/database/schema.go +++ b/src/github.com/matrix-org/go-neb/database/schema.go @@ -129,6 +129,15 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client return err } +const updateNextBatchSQL = ` +UPDATE matrix_clients SET next_batch = $1 WHERE user_id = $2 +` + +func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error { + _, err := txn.Exec(updateNextBatchSQL, nextBatch, userID) + return err +} + const selectServiceSQL = ` SELECT service_type, service_user_id, service_json FROM services WHERE service_id = $1 diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 14610ad..866f172 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -28,21 +28,22 @@ import ( ) var ( - filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":0}}}`) + filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) ) // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + nextBatchSaver func(string) } func (cli *Client) buildURL(urlPath ...string) string { @@ -239,13 +240,30 @@ func (cli *Client) Sync() { return } + isFirstSync := nextToken == "" + // Update client state nextToken = syncResponse.NextBatch logger.WithField("next_batch", nextToken).Print("Received sync response") - channel <- syncResponse + + // Save the token now *before* passing it through to the worker. This means it's possible + // to not process some events, but it means that we won't get constantly stuck processing + // a malformed/buggy event which keeps making us panic. + if cli.nextBatchSaver != nil { + cli.nextBatchSaver(nextToken) + } + + if !isFirstSync { + channel <- syncResponse + } } } +// OnSaveNextBatch is a function which can save the next_batch token passed to it +func (cli *Client) OnSaveNextBatch(fn func(string)) { + cli.nextBatchSaver = fn +} + func (cli *Client) incrementSyncingID() uint32 { cli.syncingMutex.Lock() defer cli.syncingMutex.Unlock() @@ -344,6 +362,7 @@ func (cli *Client) doSync(timeout int, since string) ([]byte, error) { log.WithFields(log.Fields{ "since": since, "timeout": timeout, + "user_id": cli.UserID, }).Print("Syncing") res, err := http.Get(urlPath) if err != nil {