# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # This script has been simplified and adapted from # https://raw.githubusercontent.com/matrix-org/synapse/master/synapse/_scripts/register_new_matrix_user.py # # The purpose is to facilitate registration using a shared_secret over the open # registration that is supported in the matrix spec # import hashlib import hmac import logging from typing import Dict, Optional import requests as _requests log = logging.getLogger(__name__) def request_registration( user, password, server_location, shared_secret, admin=False, user_type=None, requests=_requests) -> Optional[Dict]: url = "%s/_matrix/client/r0/admin/register" % (server_location,) # Get the nonce r = requests.get(url, verify=False) if r.status_code is not 200: log.error("ERROR! Received %d %s" % (r.status_code, r.reason)) if 400 <= r.status_code < 500: try: log.error(r.json()["error"]) except Exception as e: log.error(e) return None nonce = r.json()["nonce"] mac = hmac.new(key=shared_secret.encode('utf8'), digestmod=hashlib.sha1) mac.update(nonce.encode('utf8')) mac.update(b"\x00") mac.update(user.encode('utf8')) mac.update(b"\x00") mac.update(password.encode('utf8')) mac.update(b"\x00") mac.update(b"admin" if admin else b"notadmin") if user_type: mac.update(b"\x00") mac.update(user_type.encode('utf8')) mac = mac.hexdigest() data = { "nonce": nonce, "username": user, "password": password, "mac": mac, "admin": admin, "user_type": user_type, } log.debug("Sending registration request...") r = requests.post(url, json=data, verify=False) if r.status_code is not 200: log.error("ERROR! Received %d %s" % (r.status_code, r.reason)) if 400 <= r.status_code < 500: try: log.error(r.json()["error"]) except Exception as e: log.error(e) return None return r.json() def register_new_user( user: str, password: str, server_location: str, shared_secret: str) -> Optional[Dict]: return request_registration( user, password, server_location, shared_secret, False, None)