A simple web application that allows invitation-based registration to a Matrix instance
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
3.7 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
import logging
import os
import sqlite3
import uuid
from datetime import datetime
from typing import List, Optional, Tuple

from flask import g

from models import RegistrationCode, RegisteredUser
# Path of the SQLite database file: "data.db" inside the directory named by
# the APP_DATA_DIRECTORY environment variable, or inside the current
# directory when that variable is not set.
DATABASE = os.getenv("APP_DATA_DIRECTORY", ".") + "/data.db"
# Parameterized INSERT for a new row in registration_codes; placeholders are
# bound in the order (code, creationTime, expirationTime, usages, maxUsages).
REGISTRATION_CODE_INSERT_SQL = """INSERT INTO
registration_codes(code, creationTime, expirationTime, usages, maxUsages)
VALUES(?, ?, ?, ?, ?)"""
# Parameterized UPDATE of the mutable columns of one registration code;
# placeholders are bound in the order (expirationTime, usages, maxUsages, code).
REGISTRATION_CODE_UPDATE_SQL = """UPDATE registration_codes
SET expirationTime = ?, usages = ?, maxUsages = ?
WHERE code = ?"""
# Parameterized INSERT for a new row in registered_users; placeholders are
# bound in the order (registrationCode, username, registeredUserId,
# registeredTime).
REGISTERED_USER_INSERT_SQL = """INSERT INTO
registered_users(registrationCode, username, registeredUserId, registeredTime)
VALUES(?, ?, ?, ?)
"""
# Module-level logger named after this module.
log = logging.getLogger(__name__)
  21. def get_db():
  22. db = getattr(g, '_database', None)
  23. if db is None:
  24. log.info("Using database at: %s" % DATABASE)
  25. db = g._database = sqlite3.connect(DATABASE)
  26. return db
  27. def _query_db(query, args=(), one=False):
  28. cur = get_db().execute(query, args)
  29. rv = cur.fetchall()
  30. cur.close()
  31. return (rv[0] if rv else None) if one else rv
  32. def get_registration_codes() -> [RegistrationCode]:
  33. registration_codes = []
  34. for registration_code in _query_db("SELECT * FROM registration_codes"):
  35. registration_codes.append(RegistrationCode.from_db(registration_code))
  36. return registration_codes
  37. def get_registration_code(code: str) -> Optional[RegistrationCode]:
  38. registration_code: Optional[Tuple] = _query_db(
  39. "SELECT * FROM registration_codes WHERE code = ?", [code], one=True)
  40. if registration_code is not None:
  41. return RegistrationCode.from_db(registration_code)
  42. return None
  43. def add_registration_code(
  44. expiration_time: datetime = None,
  45. max_usages: int = 1) -> RegistrationCode:
  46. code = RegistrationCode(
  47. uuid.uuid4().__str__(), expiration_time=expiration_time, max_usages=max_usages)
  48. db = get_db()
  49. db.execute(
  50. REGISTRATION_CODE_INSERT_SQL,
  51. [code.code,
  52. code.creation_time,
  53. code.expiration_time,
  54. code.usages,
  55. code.max_usages]
  56. )
  57. db.commit()
  58. return get_registration_code(code.code)
  59. def update_registration_code(code: RegistrationCode) -> RegistrationCode:
  60. db = get_db()
  61. db.execute(REGISTRATION_CODE_UPDATE_SQL,
  62. [code.expiration_time, code.usages, code.max_usages, code.code]
  63. )
  64. db.commit()
  65. return get_registration_code(code.code)
  66. def expire_registration_code(code: str):
  67. db = get_db()
  68. db.execute(
  69. "UPDATE registration_codes SET expirationTime = ? WHERE code = ?",
  70. [datetime.now(), code]
  71. )
  72. db.commit()
  73. def delete_registration_code(code:str):
  74. db = get_db()
  75. db.execute(
  76. "DELETE FROM registration_codes WHERE code = ?",
  77. [code]
  78. )
  79. db.commit()
  80. def get_registered_users() -> [RegisteredUser]:
  81. registered_users = []
  82. for registered_user in _query_db("SELECT * FROM registered_users"):
  83. registered_users.append(RegisteredUser.from_db(registered_user))
  84. return registered_users
  85. def get_registered_user(username: str) -> Optional[RegisteredUser]:
  86. registered_user: Optional[Tuple] = _query_db(
  87. "SELECT * FROM registered_users WHERE username = ?", [username], one=True)
  88. if registered_user is not None:
  89. return RegisteredUser.from_db(registered_user)
  90. return None
  91. def add_registered_user(code: str, username: str,
  92. matrix_user_id: str) -> RegisteredUser:
  93. registration_code = get_registration_code(code)
  94. registration_code.usages += 1
  95. update_registration_code(registration_code)
  96. db = get_db()
  97. db.execute(REGISTERED_USER_INSERT_SQL,
  98. [code, username, matrix_user_id, datetime.now()])
  99. db.commit()
  100. return get_registered_user(username)