A simple web application that allows invitation based registration to a matrix instance
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
3.7 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. import logging
  2. import os
  3. import sqlite3
  4. import uuid
  5. from datetime import datetime
  6. from typing import Optional, Tuple
  7. from flask import g
  8. from models import RegistrationCode, RegisteredUser
  9. DATABASE = os.getenv("APP_DATA_DIRECTORY", ".") + "/data.db"
  10. REGISTRATION_CODE_INSERT_SQL = """INSERT INTO
  11. registration_codes(code, creationTime, expirationTime, usages, maxUsages)
  12. VALUES(?, ?, ?, ?, ?)"""
  13. REGISTRATION_CODE_UPDATE_SQL = """UPDATE registration_codes
  14. SET expirationTime = ?, usages = ?, maxUsages = ?
  15. WHERE code = ?"""
  16. REGISTERED_USER_INSERT_SQL = """INSERT INTO
  17. registered_users(registrationCode, username, registeredUserId, registeredTime)
  18. VALUES(?, ?, ?, ?)
  19. """
  20. log = logging.getLogger(__name__)
  21. def get_db():
  22. db = getattr(g, '_database', None)
  23. if db is None:
  24. db = g._database = sqlite3.connect(DATABASE)
  25. return db
  26. def _query_db(query, args=(), one=False):
  27. cur = get_db().execute(query, args)
  28. rv = cur.fetchall()
  29. cur.close()
  30. return (rv[0] if rv else None) if one else rv
  31. def get_registration_codes() -> [RegistrationCode]:
  32. registration_codes = []
  33. for registration_code in _query_db("SELECT * FROM registration_codes"):
  34. registration_codes.append(RegistrationCode.from_db(registration_code))
  35. return registration_codes
  36. def get_registration_code(code: str) -> Optional[RegistrationCode]:
  37. registration_code: Optional[Tuple] = _query_db(
  38. "SELECT * FROM registration_codes WHERE code = ?", [code], one=True)
  39. if registration_code is not None:
  40. return RegistrationCode.from_db(registration_code)
  41. return None
  42. def add_registration_code(
  43. expiration_time: datetime = None,
  44. max_usages: int = 1) -> RegistrationCode:
  45. code = RegistrationCode(
  46. uuid.uuid4().__str__(), expiration_time=expiration_time, max_usages=max_usages)
  47. db = get_db()
  48. db.execute(
  49. REGISTRATION_CODE_INSERT_SQL,
  50. [code.code,
  51. code.creation_time,
  52. code.expiration_time,
  53. code.usages,
  54. code.max_usages]
  55. )
  56. db.commit()
  57. return get_registration_code(code.code)
  58. def update_registration_code(code: RegistrationCode) -> RegistrationCode:
  59. db = get_db()
  60. db.execute(REGISTRATION_CODE_UPDATE_SQL,
  61. [code.expiration_time, code.usages, code.max_usages, code.code]
  62. )
  63. db.commit()
  64. return get_registration_code(code.code)
  65. def expire_registration_code(code: str):
  66. db = get_db()
  67. db.execute(
  68. "UPDATE registration_codes SET expirationTime = ? WHERE code = ?",
  69. [datetime.now(), code]
  70. )
  71. db.commit()
  72. def delete_registration_code(code:str):
  73. db = get_db()
  74. db.execute(
  75. "DELETE FROM registration_codes WHERE code = ?",
  76. [code]
  77. )
  78. db.commit()
  79. def get_registered_users() -> [RegisteredUser]:
  80. registered_users = []
  81. for registered_user in _query_db("SELECT * FROM registered_users"):
  82. registered_users.append(RegisteredUser.from_db(registered_user))
  83. return registered_users
  84. def get_registered_user(username: str) -> Optional[RegisteredUser]:
  85. registered_user: Optional[Tuple] = _query_db(
  86. "SELECT * FROM registered_users WHERE username = ?", [username], one=True)
  87. if registered_user is not None:
  88. return RegisteredUser.from_db(registered_user)
  89. return None
  90. def add_registered_user(code: str, username: str,
  91. matrix_user_id: str) -> RegisteredUser:
  92. registration_code = get_registration_code(code)
  93. registration_code.usages += 1
  94. update_registration_code(registration_code)
  95. db = get_db()
  96. db.execute(REGISTERED_USER_INSERT_SQL,
  97. [code, username, matrix_user_id, datetime.now()])
  98. db.commit()
  99. return get_registered_user(username)