You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
6.0 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. # Copyright (c) 2020 Tulir Asokan
  2. #
  3. # This Source Code Form is subject to the terms of the Mozilla Public
  4. # License, v. 2.0. If a copy of the MPL was not distributed with this
  5. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. from typing import Dict
  7. import argparse
  8. import asyncio
  9. import os.path
  10. import json
  11. import re
  12. from telethon import TelegramClient
  13. from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest
  14. from telethon.tl.types.messages import AllStickers
  15. from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker
  16. from telethon.tl.types.messages import StickerSet as StickerSetFull
  17. from .lib import matrix, util
  18. async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo:
  19. print(f"Reuploading {document.id}", end="", flush=True)
  20. data = await client.download_media(document, file=bytes)
  21. print(".", end="", flush=True)
  22. data, width, height = util.convert_image(data)
  23. print(".", end="", flush=True)
  24. mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
  25. print(".", flush=True)
  26. return util.make_sticker(mxc, width, height, len(data))
  27. def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
  28. for attr in document.attributes:
  29. if isinstance(attr, DocumentAttributeSticker):
  30. info["body"] = attr.alt
  31. info["id"] = f"tg-{document.id}"
  32. info["net.maunium.telegram.sticker"] = {
  33. "pack": {
  34. "id": str(pack.set.id),
  35. "short_name": pack.set.short_name,
  36. },
  37. "id": str(document.id),
  38. "emoticons": [],
  39. }
  40. async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir: str) -> None:
  41. if pack.set.animated:
  42. print("Animated stickerpacks are currently not supported")
  43. return
  44. pack_path = os.path.join(output_dir, f"{pack.set.short_name}.json")
  45. try:
  46. os.mkdir(os.path.dirname(pack_path))
  47. except FileExistsError:
  48. pass
  49. print(f"Reuploading {pack.set.title} with {pack.set.count} stickers "
  50. f"and writing output to {pack_path}")
  51. already_uploaded = {}
  52. try:
  53. with open(pack_path) as pack_file:
  54. existing_pack = json.load(pack_file)
  55. already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker
  56. for sticker in existing_pack["stickers"]}
  57. print(f"Found {len(already_uploaded)} already reuploaded stickers")
  58. except FileNotFoundError:
  59. pass
  60. reuploaded_documents: Dict[int, matrix.StickerInfo] = {}
  61. for document in pack.documents:
  62. try:
  63. reuploaded_documents[document.id] = already_uploaded[document.id]
  64. print(f"Skipped reuploading {document.id}")
  65. except KeyError:
  66. reuploaded_documents[document.id] = await reupload_document(client, document)
  67. # Always ensure the body and telegram metadata is correct
  68. add_meta(document, reuploaded_documents[document.id], pack)
  69. for sticker in pack.packs:
  70. if not sticker.emoticon:
  71. continue
  72. for document_id in sticker.documents:
  73. doc = reuploaded_documents[document_id]
  74. # If there was no sticker metadata, use the first emoji we find
  75. if doc["body"] == "":
  76. doc["body"] = sticker.emoticon
  77. doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon)
  78. with open(pack_path, "w") as pack_file:
  79. json.dump({
  80. "title": pack.set.title,
  81. "id": f"tg-{pack.set.id}",
  82. "net.maunium.telegram.pack": {
  83. "short_name": pack.set.short_name,
  84. "hash": str(pack.set.hash),
  85. },
  86. "stickers": list(reuploaded_documents.values()),
  87. }, pack_file, ensure_ascii=False)
  88. print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
  89. util.add_to_index(os.path.basename(pack_path), output_dir)
  90. pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?"
  91. r"([A-Za-z0-9-_]+)"
  92. r"(?:\.json)?$")
  93. parser = argparse.ArgumentParser()
  94. parser.add_argument("--list", help="List your saved sticker packs", action="store_true")
  95. parser.add_argument("--session", help="Telethon session file name", default="sticker-import")
  96. parser.add_argument("--config",
  97. help="Path to JSON file with Matrix homeserver and access_token",
  98. type=str, default="config.json")
  99. parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/",
  100. type=str)
  101. parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*")
  102. async def main(args: argparse.Namespace) -> None:
  103. await matrix.load_config(args.config)
  104. client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7")
  105. await client.start()
  106. if args.list:
  107. stickers: AllStickers = await client(GetAllStickersRequest(hash=0))
  108. index = 1
  109. width = len(str(stickers.sets))
  110. print("Your saved sticker packs:")
  111. for saved_pack in stickers.sets:
  112. print(f"{index:>{width}}. {saved_pack.title} "
  113. f"(t.me/addstickers/{saved_pack.short_name})")
  114. elif args.pack[0]:
  115. input_packs = []
  116. for pack_url in args.pack[0]:
  117. match = pack_url_regex.match(pack_url)
  118. if not match:
  119. print(f"'{pack_url}' doesn't look like a sticker pack URL")
  120. return
  121. input_packs.append(InputStickerSetShortName(short_name=match.group(1)))
  122. for input_pack in input_packs:
  123. pack: StickerSetFull = await client(GetStickerSetRequest(input_pack))
  124. await reupload_pack(client, pack, args.output_dir)
  125. else:
  126. parser.print_help()
  127. await client.disconnect()
  128. def cmd() -> None:
  129. asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
  130. if __name__ == "__main__":
  131. cmd()