diff --git a/src/github.com/matrix-org/go-neb/clients/clients.go b/src/github.com/matrix-org/go-neb/clients/clients.go index 79fce5c..caab1f4 100644 --- a/src/github.com/matrix-org/go-neb/clients/clients.go +++ b/src/github.com/matrix-org/go-neb/clients/clients.go @@ -11,6 +11,31 @@ import ( "sync" ) +type nextBatchStore struct { + db *database.ServiceDB +} + +func (s nextBatchStore) Save(userID, nextBatch string) { + if err := s.db.UpdateNextBatch(userID, nextBatch); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + "next_batch": nextBatch, + }).Error("Failed to persist next_batch token") + } +} +func (s nextBatchStore) Load(userID string) string { + token, err := s.db.LoadNextBatch(userID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "user_id": userID, + }).Error("Failed to load next_batch token") + return "" + } + return token +} + // A Clients is a collection of clients used for bot services. type Clients struct { db *database.ServiceDB @@ -141,6 +166,71 @@ func (c *Clients) updateClientInDB(newConfig types.ClientConfig) (new clientEntr return } +func (c *Clients) onMessageEvent(client *matrix.Client, event *matrix.Event) { + services, err := c.db.LoadServicesForUser(client.UserID) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "service_user_id": client.UserID, + }).Warn("Error loading services") + } + var plugins []plugin.Plugin + for _, service := range services { + plugins = append(plugins, service.Plugin(client, event.RoomID)) + } + plugin.OnMessage(plugins, client, event) +} + +func (c *Clients) onBotOptionsEvent(client *matrix.Client, event *matrix.Event) { + // see if these options are for us. The state key is the user ID with a leading _ + // to get around restrictions in the HS about having user IDs as state keys. + targetUserID := strings.TrimPrefix(event.StateKey, "_") + if targetUserID != client.UserID { + return + } + // these options fully clobber what was there previously. + opts := types.BotOptions{ + UserID: client.UserID, + RoomID: event.RoomID, + SetByUserID: event.Sender, + Options: event.Content, + } + if _, err := c.db.StoreBotOptions(opts); err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "room_id": event.RoomID, + "bot_user_id": client.UserID, + "set_by_user_id": event.Sender, + }).Error("Failed to persist bot options") + } +} + +func (c *Clients) onRoomMemberEvent(client *matrix.Client, event *matrix.Event) { + if event.StateKey != client.UserID { + return // not our member event + } + m := event.Content["membership"] + membership, ok := m.(string) + if !ok { + return + } + if membership == "invite" { + logger := log.WithFields(log.Fields{ + "room_id": event.RoomID, + "service_user_id": client.UserID, + "inviter": event.Sender, + }) + logger.Print("Accepting invite from user") + + if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { + logger.WithError(err).Print("Failed to join room") + } else { + logger.Print("Joined room") + } + } +} + func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { homeserverURL, err := url.Parse(config.HomeserverURL) if err != nil { @@ -148,74 +238,22 @@ func (c *Clients) newClient(config types.ClientConfig) (*matrix.Client, error) { } client := matrix.NewClient(homeserverURL, config.AccessToken, config.UserID) + client.NextBatchStorer = nextBatchStore{c.db} // TODO: Check that the access token is valid for the userID by peforming // a request against the server. client.Worker.OnEventType("m.room.message", func(event *matrix.Event) { - services, err := c.db.LoadServicesForUser(client.UserID) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "service_user_id": client.UserID, - }).Warn("Error loading services") - } - var plugins []plugin.Plugin - for _, service := range services { - plugins = append(plugins, service.Plugin(client, event.RoomID)) - } - plugin.OnMessage(plugins, client, event) + c.onMessageEvent(client, event) }) client.Worker.OnEventType("m.room.bot.options", func(event *matrix.Event) { - // see if these options are for us. The state key is the user ID with a leading _ - // to get around restrictions in the HS about having user IDs as state keys. - targetUserID := strings.TrimPrefix(event.StateKey, "_") - if targetUserID != client.UserID { - return - } - // these options fully clobber what was there previously. - opts := types.BotOptions{ - UserID: client.UserID, - RoomID: event.RoomID, - SetByUserID: event.Sender, - Options: event.Content, - } - if _, err := c.db.StoreBotOptions(opts); err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "room_id": event.RoomID, - "bot_user_id": client.UserID, - "set_by_user_id": event.Sender, - }).Error("Failed to persist bot options") - } + c.onBotOptionsEvent(client, event) }) if config.AutoJoinRooms { client.Worker.OnEventType("m.room.member", func(event *matrix.Event) { - if event.StateKey != config.UserID { - return // not our member event - } - m := event.Content["membership"] - membership, ok := m.(string) - if !ok { - return - } - if membership == "invite" { - logger := log.WithFields(log.Fields{ - "room_id": event.RoomID, - "service_user_id": config.UserID, - "inviter": event.Sender, - }) - logger.Print("Accepting invite from user") - - if _, err := client.JoinRoom(event.RoomID, "", event.Sender); err != nil { - logger.WithError(err).Print("Failed to join room") - } else { - logger.Print("Joined room") - } - } + c.onRoomMemberEvent(client, event) }) } diff --git a/src/github.com/matrix-org/go-neb/database/db.go b/src/github.com/matrix-org/go-neb/database/db.go index e650471..79c83bc 100644 --- a/src/github.com/matrix-org/go-neb/database/db.go +++ b/src/github.com/matrix-org/go-neb/database/db.go @@ -77,6 +77,23 @@ func (d *ServiceDB) LoadMatrixClientConfig(userID string) (config types.ClientCo return } +// UpdateNextBatch updates the next_batch token for the given user. +func (d *ServiceDB) UpdateNextBatch(userID, nextBatch string) (err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + return updateNextBatchTxn(txn, userID, nextBatch) + }) + return +} + +// LoadNextBatch loads the next_batch token for the given user. +func (d *ServiceDB) LoadNextBatch(userID string) (nextBatch string, err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + nextBatch, err = selectNextBatchTxn(txn, userID) + return err + }) + return +} + // LoadService loads a service from the database. // Returns sql.ErrNoRows if the service isn't in the database. func (d *ServiceDB) LoadService(serviceID string) (service types.Service, err error) { diff --git a/src/github.com/matrix-org/go-neb/database/schema.go b/src/github.com/matrix-org/go-neb/database/schema.go index f17db68..e60e23a 100644 --- a/src/github.com/matrix-org/go-neb/database/schema.go +++ b/src/github.com/matrix-org/go-neb/database/schema.go @@ -129,6 +129,28 @@ func updateMatrixClientConfigTxn(txn *sql.Tx, now time.Time, config types.Client return err } +const updateNextBatchSQL = ` +UPDATE matrix_clients SET next_batch = $1 WHERE user_id = $2 +` + +func updateNextBatchTxn(txn *sql.Tx, userID, nextBatch string) error { + _, err := txn.Exec(updateNextBatchSQL, nextBatch, userID) + return err +} + +const selectNextBatchSQL = ` +SELECT next_batch FROM matrix_clients WHERE user_id = $1 +` + +func selectNextBatchTxn(txn *sql.Tx, userID string) (string, error) { + var nextBatch string + row := txn.QueryRow(selectNextBatchSQL, userID) + if err := row.Scan(&nextBatch); err != nil { + return "", err + } + return nextBatch, nil +} + const selectServiceSQL = ` SELECT service_type, service_user_id, service_json FROM services WHERE service_id = $1 diff --git a/src/github.com/matrix-org/go-neb/matrix/matrix.go b/src/github.com/matrix-org/go-neb/matrix/matrix.go index 14610ad..7d85fb9 100644 --- a/src/github.com/matrix-org/go-neb/matrix/matrix.go +++ b/src/github.com/matrix-org/go-neb/matrix/matrix.go @@ -28,21 +28,36 @@ import ( ) var ( - filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":0}}}`) + filterJSON = json.RawMessage(`{"room":{"timeline":{"limit":50}}}`) ) +// NextBatchStorer controls loading/saving of next_batch tokens for users +type NextBatchStorer interface { + // Save a next_batch token for a given user. Best effort. + Save(userID, nextBatch string) + // Load a next_batch token for a given user. Return an empty string if no token exists. + Load(userID string) string +} + +// noopNextBatchStore does not load or save next_batch tokens. +type noopNextBatchStore struct{} + +func (s noopNextBatchStore) Save(userID, nextBatch string) {} +func (s noopNextBatchStore) Load(userID string) string { return "" } + // Client represents a Matrix client. type Client struct { - HomeserverURL *url.URL - Prefix string - UserID string - AccessToken string - Rooms map[string]*Room - Worker *Worker - syncingMutex sync.Mutex - syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. - httpClient *http.Client - filterID string + HomeserverURL *url.URL + Prefix string + UserID string + AccessToken string + Rooms map[string]*Room + Worker *Worker + syncingMutex sync.Mutex + syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time. + httpClient *http.Client + filterID string + NextBatchStorer NextBatchStorer } func (cli *Client) buildURL(urlPath ...string) string { @@ -191,7 +206,7 @@ func (cli *Client) Sync() { "user_id": cli.UserID, }) - // TODO: Store the filter ID and sync token in the database + // TODO: Store the filter ID in the database filterID, err := cli.createFilter() if err != nil { logger.WithError(err).Fatal("Failed to create filter") @@ -199,9 +214,9 @@ func (cli *Client) Sync() { } cli.filterID = filterID logger.WithField("filter", filterID).Print("Got filter ID") - nextToken := "" + nextToken := cli.NextBatchStorer.Load(cli.UserID) - logger.Print("Starting sync") + logger.WithField("next_batch", nextToken).Print("Starting sync") channel := make(chan syncHTTPResponse, 5) @@ -232,17 +247,25 @@ func (cli *Client) Sync() { // Check that the syncing state hasn't changed // Either because we've stopped syncing or another sync has been started. // We discard the response from our sync. - // TODO: Store the next_batch token so that the next sync can resume - // from where this sync left off. if cli.getSyncingID() != syncingID { logger.Print("Stopping sync") return } + isFirstSync := nextToken == "" + // Update client state nextToken = syncResponse.NextBatch logger.WithField("next_batch", nextToken).Print("Received sync response") - channel <- syncResponse + + // Save the token now *before* passing it through to the worker. This means it's possible + // to not process some events, but it means that we won't get constantly stuck processing + // a malformed/buggy event which keeps making us panic. + cli.NextBatchStorer.Save(cli.UserID, nextToken) + + if !isFirstSync { + channel <- syncResponse + } } } @@ -344,6 +367,7 @@ func (cli *Client) doSync(timeout int, since string) ([]byte, error) { log.WithFields(log.Fields{ "since": since, "timeout": timeout, + "user_id": cli.UserID, }).Print("Syncing") res, err := http.Get(urlPath) if err != nil { @@ -366,6 +390,10 @@ func NewClient(homeserverURL *url.URL, accessToken string, userID string) *Clien Prefix: "/_matrix/client/r0", } cli.Worker = newWorker(&cli) + // By default, use a no-op next_batch storer which will never save tokens and always + // "load" the empty string as a token. The client will work with this storer: it just won't + // remember the token across restarts. In practice, a database backend should be used. + cli.NextBatchStorer = noopNextBatchStore{} cli.Rooms = make(map[string]*Room) cli.httpClient = &http.Client{}