diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 268b603..44355f4 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -11,6 +11,31 @@ import ( "sync" ) +type nextBatchStore struct { + db *database.ServiceDB +} + +func (s nextBatchStore) Save(userID, nextBatch string) { + if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") + } +} +func (s nextBatchStore) Load(userID string) string { + token, err := s.db.LoadNextBatch(userID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + }).Error("Failed to load next_batch token") + return "" + } + return token +} + // A Clients is a collection of clients used for bot services. type Clients struct { db *database.ServiceDB @@ -213,15 +238,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { } client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) - - client.OnSaveNextBatch(func(nextBatch string) { - if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "next_batch": nextBatch, - }).Error("Failed to persist next_batch token") - } - }) + client.NextBatch = nextBatchStore{c.db} // TODO: Check that the access token is valid for the userID by peforming // a request against the server. diff --git a/src/github.com/matrix-org/go-neb/database/db.go b/src/github.com/matrix-org/go-neb/database/db.go index 1889e33..79c83bc 100644 --- a/src/github.com/matrix-org/go-neb/database/db.go +++ b/src/github.com/matrix-org/go-neb/database/db.go @@ -85,6 +85,15 @@ func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) { return } +// LoadNextBatch loads the next_batch token for the given user. +func (d *ServiceDB) LoadNextBatch(userID string) (nextBatch string, err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + nextBatch, err = selectNextBatchTxn(txn, userID) + return err + }) + return +} + // LoadService loads a service from the database. // Returns sql.ErrNoRows if the service isn't in the database. func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { diff --git a/src/github.com/matrix-org/go-neb/database/schema.go b/src/github.com/matrix-org/go-neb/database/schema.go index 22eb442..e60e23a 100644 --- a/src/github.com/matrix-org/go-neb/database/schema.go +++ b/src/github.com/matrix-org/go-neb/database/schema.go @@ -138,6 +138,19 @@ func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error { return err } +const selectNextBatchSQL = ` +SELECT next_batch FROM matrix_clients WHERE user_id = $1 +` + +func selectNextBatchTxn(txn *sql.Tx, userID string) (string, error) { + var nextBatch string + row := txn.QueryRow(selectNextBatchSQL, userID) + if err := row.Scan(&nextBatch); err != nil { + return "", err + } + return nextBatch, nil +} + const selectServiceSQL = ` SELECT service_type, service_user_id, service_json FROM services WHERE service_id = $1 diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 866f172..07175c0 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -31,19 +31,33 @@ var ( filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) ) +// NextBatchStorer controls loading/saving of next_batch tokens for users +type NextBatchStorer interface { + // Save a next_batch token for a given user. Best effort. + Save(userID, nextBatch string) + // Load a next_batch token for a given user. Return an empty string if no token exists. + Load(userID string) string +} + +// noopNextBatchStore does not load or save next_batch tokens. +type noopNextBatchStore struct{} + +func (s noopNextBatchStore) Save(userID, nextBatch string) {} +func (s noopNextBatchStore) Load(userID string) string { return "" } + // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string - nextBatchSaver func(string) + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + NextBatch NextBatchStorer } func (cli *Client) buildURL(urlPath ...string) string { @@ -200,9 +214,9 @@ func (cli *Client) Sync() { } cli.filterID = filterID logger.WithField("filter", filterID).Print("Got filter ID") - nextToken := "" + nextToken := cli.NextBatch.Load(cli.UserID) - logger.Print("Starting sync") + logger.WithField("next_batch", nextToken).Print("Starting sync") channel := make(chan syncHTTPResponse, 5) @@ -249,9 +263,7 @@ func (cli *Client) Sync() { // Save the token now *before* passing it through to the worker. This means it's possible // to not process some events, but it means that we won't get constantly stuck processing // a malformed/buggy event which keeps making us panic. - if cli.nextBatchSaver != nil { - cli.nextBatchSaver(nextToken) - } + cli.NextBatch.Save(cli.UserID, nextToken) if !isFirstSync { channel <- syncResponse @@ -259,11 +271,6 @@ func (cli *Client) Sync() { } } -// OnSaveNextBatch is a function which can save the next_batch token passed to it -func (cli *Client) OnSaveNextBatch(fn func(string)) { - cli.nextBatchSaver = fn -} - func (cli *Client) incrementSyncingID() uint32 { cli.syncingMutex.Lock() defer cli.syncingMutex.Unlock() @@ -385,6 +392,7 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien Prefix: "/_matrix/client/r0", } cli.Worker = newWorker(&cli) + cli.NextBatch = noopNextBatchStore{} cli.Rooms = make(map[string]*Room) cli.httpClient = &http.Client{}