Browse Source

Add a `NextBatchStorer` interface to load/save next_batch tokens

Gets around cyclic dependencies.
pull/60/head
Kegan Dougal 8 years ago
parent
commit
b59d43b810
  1. 35
      src/github.com/matrix-org/go-neb/clients/clients.go
  2. 9
      src/github.com/matrix-org/go-neb/database/db.go
  3. 13
      src/github.com/matrix-org/go-neb/database/schema.go
  4. 50
      src/github.com/matrix-org/go-neb/matrix/matrix.go

35
src/github.com/matrix-org/go-neb/clients/clients.go

@ -11,6 +11,31 @@ import (
"sync" "sync"
) )
type nextBatchStore struct {
db *database.ServiceDB
}
func (s nextBatchStore) Save(userID, nextBatch string) {
if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"user_id": userID,
"next_batch": nextBatch,
}).Error("Failed to persist next_batch token")
}
}
func (s nextBatchStore) Load(userID string) string {
token, err := s.db.LoadNextBatch(userID)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"user_id": userID,
}).Error("Failed to load next_batch token")
return ""
}
return token
}
// A Clients is a collection of clients used for bot services. // A Clients is a collection of clients used for bot services.
type Clients struct { type Clients struct {
db *database.ServiceDB db *database.ServiceDB
@ -213,15 +238,7 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) {
} }
client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID)
client.OnSaveNextBatch(func(nextBatch string) {
if err := c.db.UpdateNextBatch(client.UserID, nextBatch); err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"next_batch": nextBatch,
}).Error("Failed to persist next_batch token")
}
})
client.NextBatch = nextBatchStore{c.db}
// TODO: Check that the access token is valid for the userID by peforming // TODO: Check that the access token is valid for the userID by peforming
// a request against the server. // a request against the server.

9
src/github.com/matrix-org/go-neb/database/db.go

@ -85,6 +85,15 @@ func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) {
return return
} }
// LoadNextBatch loads the next_batch token for the given user.
func (d *ServiceDB) LoadNextBatch(userID string) (nextBatch string, err error) {
err = runTransaction(d.db, func(txn *sql.Tx) error {
nextBatch, err = selectNextBatchTxn(txn, userID)
return err
})
return
}
// LoadService loads a service from the database. // LoadService loads a service from the database.
// Returns sql.ErrNoRows if the service isn't in the database. // Returns sql.ErrNoRows if the service isn't in the database.
func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) {

13
src/github.com/matrix-org/go-neb/database/schema.go

@ -138,6 +138,19 @@ func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error {
return err return err
} }
const selectNextBatchSQL = `
SELECT next_batch FROM matrix_clients WHERE user_id = $1
`
func selectNextBatchTxn(txn *sql.Tx, userID string) (string, error) {
var nextBatch string
row := txn.QueryRow(selectNextBatchSQL, userID)
if err := row.Scan(&nextBatch); err != nil {
return "", err
}
return nextBatch, nil
}
const selectServiceSQL = ` const selectServiceSQL = `
SELECT service_type, service_user_id, service_json FROM services SELECT service_type, service_user_id, service_json FROM services
WHERE service_id = $1 WHERE service_id = $1

50
src/github.com/matrix-org/go-neb/matrix/matrix.go

@ -31,19 +31,33 @@ var (
filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
) )
// NextBatchStorer controls loading/saving of next_batch tokens for users
type NextBatchStorer interface {
// Save a next_batch token for a given user. Best effort.
Save(userID, nextBatch string)
// Load a next_batch token for a given user. Return an empty string if no token exists.
Load(userID string) string
}
// noopNextBatchStore does not load or save next_batch tokens.
type noopNextBatchStore struct{}
func (s noopNextBatchStore) Save(userID, nextBatch string) {}
func (s noopNextBatchStore) Load(userID string) string { return "" }
// Client represents a Matrix client. // Client represents a Matrix client.
type Client struct { type Client struct {
HomeserverURL *url.URL
Prefix string
UserID string
AccessToken string
Rooms map[string]*Room
Worker *Worker
syncingMutex sync.Mutex
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
httpClient *http.Client
filterID string
nextBatchSaver func(string)
HomeserverURL *url.URL
Prefix string
UserID string
AccessToken string
Rooms map[string]*Room
Worker *Worker
syncingMutex sync.Mutex
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
httpClient *http.Client
filterID string
NextBatch NextBatchStorer
} }
func (cli *Client) buildURL(urlPath ...string) string { func (cli *Client) buildURL(urlPath ...string) string {
@ -200,9 +214,9 @@ func (cli *Client) Sync() {
} }
cli.filterID = filterID cli.filterID = filterID
logger.WithField("filter", filterID).Print("Got filter ID") logger.WithField("filter", filterID).Print("Got filter ID")
nextToken := ""
nextToken := cli.NextBatch.Load(cli.UserID)
logger.Print("Starting sync")
logger.WithField("next_batch", nextToken).Print("Starting sync")
channel := make(chan syncHTTPResponse, 5) channel := make(chan syncHTTPResponse, 5)
@ -249,9 +263,7 @@ func (cli *Client) Sync() {
// Save the token now *before* passing it through to the worker. This means it's possible // Save the token now *before* passing it through to the worker. This means it's possible
// to not process some events, but it means that we won't get constantly stuck processing // to not process some events, but it means that we won't get constantly stuck processing
// a malformed/buggy event which keeps making us panic. // a malformed/buggy event which keeps making us panic.
if cli.nextBatchSaver != nil {
cli.nextBatchSaver(nextToken)
}
cli.NextBatch.Save(cli.UserID, nextToken)
if !isFirstSync { if !isFirstSync {
channel <- syncResponse channel <- syncResponse
@ -259,11 +271,6 @@ func (cli *Client) Sync() {
} }
} }
// OnSaveNextBatch is a function which can save the next_batch token passed to it
func (cli *Client) OnSaveNextBatch(fn func(string)) {
cli.nextBatchSaver = fn
}
func (cli *Client) incrementSyncingID() uint32 { func (cli *Client) incrementSyncingID() uint32 {
cli.syncingMutex.Lock() cli.syncingMutex.Lock()
defer cli.syncingMutex.Unlock() defer cli.syncingMutex.Unlock()
@ -385,6 +392,7 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien
Prefix: "/_matrix/client/r0", Prefix: "/_matrix/client/r0",
} }
cli.Worker = newWorker(&cli) cli.Worker = newWorker(&cli)
cli.NextBatch = noopNextBatchStore{}
cli.Rooms = make(map[string]*Room) cli.Rooms = make(map[string]*Room)
cli.httpClient = &http.Client{} cli.httpClient = &http.Client{}

Loading…
Cancel
Save