You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
9.2 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. # Copyright (c) 2020 Tulir Asokan
  2. #
  3. # This Source Code Form is subject to the terms of the Mozilla Public
  4. # License, v. 2.0. If a copy of the MPL was not distributed with this
  5. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. from typing import Dict, Optional, TYPE_CHECKING
  7. from io import BytesIO
  8. import argparse
  9. import os.path
  10. import asyncio
  11. import json
  12. import re
  13. from aiohttp import ClientSession
  14. from yarl import URL
  15. from PIL import Image
  16. from telethon import TelegramClient
  17. from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest
  18. from telethon.tl.types.messages import AllStickers
  19. from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker
  20. from telethon.tl.types.messages import StickerSet as StickerSetFull
  21. parser = argparse.ArgumentParser()
  22. parser.add_argument("--list", help="List your saved sticker packs", action="store_true")
  23. parser.add_argument("--session", help="Telethon session file name", default="sticker-import")
  24. parser.add_argument("--config", help="Path to JSON file with Matrix homeserver and access_token",
  25. type=str, default="config.json")
  26. parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/",
  27. type=str)
  28. parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*")
  29. args = parser.parse_args()
  30. loop = asyncio.get_event_loop()
  31. async def whoami(url: URL, access_token: str) -> str:
  32. headers = {"Authorization": f"Bearer {access_token}"}
  33. async with ClientSession() as sess, sess.get(url, headers=headers) as resp:
  34. resp.raise_for_status()
  35. user_id = (await resp.json())["user_id"]
  36. print(f"Access token validated (user ID: {user_id})")
  37. return user_id
  38. try:
  39. with open(args.config) as config_file:
  40. config = json.load(config_file)
  41. homeserver_url = config["homeserver"]
  42. access_token = config["access_token"]
  43. except FileNotFoundError:
  44. print("Matrix config file not found. Please enter your homeserver and access token.")
  45. homeserver_url = input("Homeserver URL: ")
  46. access_token = input("Access token: ")
  47. whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami"
  48. user_id = loop.run_until_complete(whoami(whoami_url, access_token))
  49. with open(args.config, "w") as config_file:
  50. json.dump({
  51. "homeserver": homeserver_url,
  52. "user_id": user_id,
  53. "access_token": access_token
  54. }, config_file)
  55. print(f"Wrote config to {args.config}")
  56. upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload"
  57. async def upload(data: bytes, mimetype: str, filename: str) -> str:
  58. url = upload_url.with_query({"filename": filename})
  59. headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"}
  60. async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp:
  61. return (await resp.json())["content_uri"]
  62. if TYPE_CHECKING:
  63. from typing import TypedDict
  64. class MatrixMediaInfo(TypedDict):
  65. w: int
  66. h: int
  67. size: int
  68. mimetype: str
  69. thumbnail_url: Optional[str]
  70. thumbnail_info: Optional['MatrixMediaInfo']
  71. class MatrixStickerInfo(TypedDict, total=False):
  72. body: str
  73. url: str
  74. info: MatrixMediaInfo
  75. id: str
  76. def convert_image(data: bytes) -> (bytes, int, int):
  77. image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
  78. new_file = BytesIO()
  79. image.save(new_file, "png")
  80. w, h = image.size
  81. return new_file.getvalue(), w, h
  82. async def reupload_document(client: TelegramClient, document: Document) -> 'MatrixStickerInfo':
  83. print(f"Reuploading {document.id}", end="", flush=True)
  84. data = await client.download_media(document, file=bytes)
  85. print(".", end="", flush=True)
  86. data, width, height = convert_image(data)
  87. print(".", end="", flush=True)
  88. mxc = await upload(data, "image/png", f"{document.id}.png")
  89. print(".", flush=True)
  90. if width > 256 or height > 256:
  91. # Set the width and height to lower values so clients wouldn't show them as huge images
  92. if width > height:
  93. height = int(height / (width / 256))
  94. width = 256
  95. else:
  96. width = int(width / (height / 256))
  97. height = 256
  98. return {
  99. "body": "",
  100. "url": mxc,
  101. "info": {
  102. "w": width,
  103. "h": height,
  104. "size": len(data),
  105. "mimetype": "image/png",
  106. # Element iOS compatibility hack
  107. "thumbnail_url": mxc,
  108. "thumbnail_info": {
  109. "w": width,
  110. "h": height,
  111. "size": len(data),
  112. "mimetype": "image/png",
  113. },
  114. },
  115. }
  116. def add_to_index(name: str) -> None:
  117. index_path = os.path.join(args.output_dir, "index.json")
  118. try:
  119. with open(index_path) as index_file:
  120. index_data = json.load(index_file)
  121. except (FileNotFoundError, json.JSONDecodeError):
  122. index_data = {"packs": [], "homeserver_url": homeserver_url}
  123. if name not in index_data["packs"]:
  124. index_data["packs"].append(name)
  125. with open(index_path, "w") as index_file:
  126. json.dump(index_data, index_file, indent=" ")
  127. print(f"Added {name} to {index_path}")
  128. def add_meta(document: Document, info: 'MatrixStickerInfo', pack: StickerSetFull) -> None:
  129. for attr in document.attributes:
  130. if isinstance(attr, DocumentAttributeSticker):
  131. info["body"] = attr.alt
  132. info["id"] = str(document.id)
  133. info["net.maunium.telegram.sticker"] = {
  134. "pack": {
  135. "id": str(pack.set.id),
  136. "short_name": pack.set.short_name,
  137. },
  138. "id": str(document.id),
  139. "emoticons": [],
  140. }
  141. async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None:
  142. if pack.set.animated:
  143. print("Animated stickerpacks are currently not supported")
  144. return
  145. pack_path = os.path.join(args.output_dir, f"{pack.set.short_name}.json")
  146. try:
  147. os.mkdir(os.path.dirname(pack_path))
  148. except FileExistsError:
  149. pass
  150. print(f"Reuploading {pack.set.title} with {pack.set.count} stickers "
  151. f"and writing output to {pack_path}")
  152. already_uploaded = {}
  153. try:
  154. with open(pack_path) as pack_file:
  155. existing_pack = json.load(pack_file)
  156. already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker
  157. for sticker in existing_pack["stickers"]}
  158. print(f"Found {len(already_uploaded)} already reuploaded stickers")
  159. except FileNotFoundError:
  160. pass
  161. reuploaded_documents: Dict[int, 'MatrixStickerInfo'] = {}
  162. for document in pack.documents:
  163. try:
  164. reuploaded_documents[document.id] = already_uploaded[document.id]
  165. print(f"Skipped reuploading {document.id}")
  166. except KeyError:
  167. reuploaded_documents[document.id] = await reupload_document(client, document)
  168. # Always ensure the body and telegram metadata is correct
  169. add_meta(document, reuploaded_documents[document.id], pack)
  170. for sticker in pack.packs:
  171. if not sticker.emoticon:
  172. continue
  173. for document_id in sticker.documents:
  174. doc = reuploaded_documents[document_id]
  175. # If there was no sticker metadata, use the first emoji we find
  176. if doc["body"] == "":
  177. doc["body"] = sticker.emoticon
  178. doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon)
  179. with open(pack_path, "w") as pack_file:
  180. json.dump({
  181. "title": pack.set.title,
  182. "short_name": pack.set.short_name,
  183. "id": str(pack.set.id),
  184. "hash": str(pack.set.hash),
  185. "stickers": list(reuploaded_documents.values()),
  186. }, pack_file, ensure_ascii=False)
  187. print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
  188. add_to_index(os.path.basename(pack_path))
  189. pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?"
  190. r"([A-Za-z0-9-_]+)"
  191. r"(?:\.json)?$")
  192. async def main():
  193. client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7")
  194. await client.start()
  195. if args.list:
  196. stickers: AllStickers = await client(GetAllStickersRequest(hash=0))
  197. index = 1
  198. width = len(str(stickers.sets))
  199. print("Your saved sticker packs:")
  200. for saved_pack in stickers.sets:
  201. print(f"{index:>{width}}. {saved_pack.title} "
  202. f"(t.me/addstickers/{saved_pack.short_name})")
  203. elif args.pack[0]:
  204. input_packs = []
  205. for pack_url in args.pack[0]:
  206. match = pack_url_regex.match(pack_url)
  207. if not match:
  208. print(f"'{pack_url}' doesn't look like a sticker pack URL")
  209. return
  210. input_packs.append(InputStickerSetShortName(short_name=match.group(1)))
  211. for input_pack in input_packs:
  212. pack: StickerSetFull = await client(GetStickerSetRequest(input_pack))
  213. await reupload_pack(client, pack)
  214. else:
  215. parser.print_help()
  216. await client.disconnect()
  217. loop.run_until_complete(main())