|
@ -252,9 +252,7 @@ func (cli *Client) Sync() { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
isFirstSync := nextToken == "" |
|
|
|
|
|
|
|
|
|
|
|
// Update client state
|
|
|
|
|
|
|
|
|
processResponse := cli.shouldProcessResponse(nextToken, &syncResponse) |
|
|
nextToken = syncResponse.NextBatch |
|
|
nextToken = syncResponse.NextBatch |
|
|
logger.WithField("next_batch", nextToken).Print("Received sync response") |
|
|
logger.WithField("next_batch", nextToken).Print("Received sync response") |
|
|
|
|
|
|
|
@ -263,12 +261,55 @@ func (cli *Client) Sync() { |
|
|
// a malformed/buggy event which keeps making us panic.
|
|
|
// a malformed/buggy event which keeps making us panic.
|
|
|
cli.NextBatchStorer.Save(cli.UserID, nextToken) |
|
|
cli.NextBatchStorer.Save(cli.UserID, nextToken) |
|
|
|
|
|
|
|
|
if !isFirstSync { |
|
|
|
|
|
|
|
|
if processResponse { |
|
|
|
|
|
// Update client state
|
|
|
channel <- syncResponse |
|
|
channel <- syncResponse |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
|
|
|
|
|
|
// stuff that shouldn't be processed.
|
|
|
|
|
|
func (cli *Client) shouldProcessResponse(tokenOnSync string, syncResponse *syncHTTPResponse) bool { |
|
|
|
|
|
if tokenOnSync == "" { |
|
|
|
|
|
return false |
|
|
|
|
|
} |
|
|
|
|
|
// This is a horrible hack because /sync will return the most recent messages for a room
|
|
|
|
|
|
// as soon as you /join it. We do NOT want to process those events in that particular room
|
|
|
|
|
|
// because they may have already been processed (if you toggle the bot in/out of the room).
|
|
|
|
|
|
//
|
|
|
|
|
|
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
|
|
|
|
|
|
// exists and is "join" and then discard processing that room entirely if so.
|
|
|
|
|
|
// TODO: We probably want to process the !commands from after the last join event in the timeline.
|
|
|
|
|
|
for roomID, roomData := range syncResponse.Rooms.Join { |
|
|
|
|
|
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { |
|
|
|
|
|
e := roomData.Timeline.Events[i] |
|
|
|
|
|
if e.Type == "m.room.member" && e.StateKey == cli.UserID { |
|
|
|
|
|
m := e.Content["membership"] |
|
|
|
|
|
mship, ok := m.(string) |
|
|
|
|
|
if !ok { |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
if mship == "join" { |
|
|
|
|
|
log.WithFields(log.Fields{ |
|
|
|
|
|
"room_id": roomID, |
|
|
|
|
|
"user_id": cli.UserID, |
|
|
|
|
|
"start_token": tokenOnSync, |
|
|
|
|
|
}).Info("Discarding /sync events in room: just joined it.") |
|
|
|
|
|
_, ok := syncResponse.Rooms.Join[roomID] |
|
|
|
|
|
if !ok { |
|
|
|
|
|
panic("room " + roomID + " does not exist in Join?!") |
|
|
|
|
|
} |
|
|
|
|
|
delete(syncResponse.Rooms.Join, roomID) // don't re-process !commands
|
|
|
|
|
|
delete(syncResponse.Rooms.Invite, roomID) // don't re-process invites
|
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return true |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (cli *Client) incrementSyncingID() uint32 { |
|
|
func (cli *Client) incrementSyncingID() uint32 { |
|
|
cli.syncingMutex.Lock() |
|
|
cli.syncingMutex.Lock() |
|
|
defer cli.syncingMutex.Unlock() |
|
|
defer cli.syncingMutex.Unlock() |
|
|