You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

184 lines
7.4 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. # maunium-stickerpicker - A fast and simple Matrix sticker picker widget.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Dict
  17. import argparse
  18. import asyncio
  19. import os.path
  20. import json
  21. import re
  22. import os
  23. from telethon import TelegramClient
  24. from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest
  25. from telethon.tl.types.messages import AllStickers
  26. from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker
  27. from telethon.tl.types.messages import StickerSet as StickerSetFull
  28. from subprocess import run
  29. from .lib import matrix, util
  30. async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo:
  31. print(f"Reuploading {document.id}", end="", flush=True)
  32. data = await client.download_media(document, file=bytes)
  33. print(".", end="", flush=True)
  34. data, width, height = util.convert_image(data)
  35. print(".", end="", flush=True)
  36. mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
  37. print(".", flush=True)
  38. return util.make_sticker(mxc, width, height, len(data))
  39. async def reupload_document_gif(client: TelegramClient, document: Document) -> matrix.StickerInfo:
  40. print(f"Reuploading {document.id}", end="", flush=True)
  41. data = await client.download_media(document, file=bytes)
  42. print(".", end="", flush=True)
  43. # run LottieConverter
  44. convert_data = run(['lottieconverter', '-', '-', 'gif', '512x512', '60'],capture_output = True, input=data).stdout
  45. print(".", end="", flush=True)
  46. mxc = await matrix.upload(convert_data, "image/gif", f"{document.id}.gif")
  47. print(".", flush=True)
  48. # 512x512 is mandatory for all stickers
  49. return util.make_sticker(mxc, 512, 512, len(data), mimetype="image/gif")
  50. def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
  51. for attr in document.attributes:
  52. if isinstance(attr, DocumentAttributeSticker):
  53. info["body"] = attr.alt
  54. info["id"] = f"tg-{document.id}"
  55. info["net.maunium.telegram.sticker"] = {
  56. "pack": {
  57. "id": str(pack.set.id),
  58. "short_name": pack.set.short_name,
  59. },
  60. "id": str(document.id),
  61. "emoticons": [],
  62. }
  63. async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir: str) -> None:
  64. if pack.set.animated:
  65. print("Warning, to convert animated stickers, you need lottieconverter installed")
  66. pack_path = os.path.join(output_dir, f"{pack.set.short_name}.json")
  67. try:
  68. os.mkdir(os.path.dirname(pack_path))
  69. except FileExistsError:
  70. pass
  71. print(f"Reuploading {pack.set.title} with {pack.set.count} stickers "
  72. f"and writing output to {pack_path}")
  73. already_uploaded = {}
  74. try:
  75. with util.open_utf8(pack_path) as pack_file:
  76. existing_pack = json.load(pack_file)
  77. already_uploaded = {int(sticker["net.maunium.telegram.sticker"]["id"]): sticker
  78. for sticker in existing_pack["stickers"]}
  79. print(f"Found {len(already_uploaded)} already reuploaded stickers")
  80. except FileNotFoundError:
  81. pass
  82. reuploaded_documents: Dict[int, matrix.StickerInfo] = {}
  83. for document in pack.documents:
  84. try:
  85. reuploaded_documents[document.id] = already_uploaded[document.id]
  86. print(f"Skipped reuploading {document.id}")
  87. except KeyError:
  88. if pack.set.animated:
  89. reuploaded_documents[document.id] = await reupload_document_gif(client,document)
  90. else:
  91. reuploaded_documents[document.id] = await reupload_document(client, document)
  92. # Always ensure the body and telegram metadata is correct
  93. add_meta(document, reuploaded_documents[document.id], pack)
  94. for sticker in pack.packs:
  95. if not sticker.emoticon:
  96. continue
  97. for document_id in sticker.documents:
  98. doc = reuploaded_documents[document_id]
  99. # If there was no sticker metadata, use the first emoji we find
  100. if doc["body"] == "":
  101. doc["body"] = sticker.emoticon
  102. doc["net.maunium.telegram.sticker"]["emoticons"].append(sticker.emoticon)
  103. with util.open_utf8(pack_path, "w") as pack_file:
  104. json.dump({
  105. "title": pack.set.title,
  106. "id": f"tg-{pack.set.id}",
  107. "net.maunium.telegram.pack": {
  108. "short_name": pack.set.short_name,
  109. "hash": str(pack.set.hash),
  110. },
  111. "stickers": list(reuploaded_documents.values()),
  112. }, pack_file, ensure_ascii=False)
  113. print(f"Saved {pack.set.title} as {pack.set.short_name}.json")
  114. util.add_to_index(os.path.basename(pack_path), output_dir)
  115. pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?"
  116. r"([A-Za-z0-9-_]+)"
  117. r"(?:\.json)?$")
  118. parser = argparse.ArgumentParser()
  119. parser.add_argument("--list", help="List your saved sticker packs", action="store_true")
  120. parser.add_argument("--session", help="Telethon session file name", default="sticker-import")
  121. parser.add_argument("--config",
  122. help="Path to JSON file with Matrix homeserver and access_token",
  123. type=str, default="config.json")
  124. parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/",
  125. type=str)
  126. parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*")
  127. async def main(args: argparse.Namespace) -> None:
  128. await matrix.load_config(args.config)
  129. client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7")
  130. await client.start()
  131. if args.list:
  132. stickers: AllStickers = await client(GetAllStickersRequest(hash=0))
  133. index = 1
  134. width = len(str(len(stickers.sets)))
  135. print("Your saved sticker packs:")
  136. for saved_pack in stickers.sets:
  137. print(f"{index:>{width}}. {saved_pack.title} "
  138. f"(t.me/addstickers/{saved_pack.short_name})")
  139. index += 1
  140. elif args.pack[0]:
  141. input_packs = []
  142. for pack_url in args.pack[0]:
  143. match = pack_url_regex.match(pack_url)
  144. if not match:
  145. print(f"'{pack_url}' doesn't look like a sticker pack URL")
  146. return
  147. input_packs.append(InputStickerSetShortName(short_name=match.group(1)))
  148. for input_pack in input_packs:
  149. pack: StickerSetFull = await client(GetStickerSetRequest(input_pack, hash=0))
  150. await reupload_pack(client, pack, args.output_dir)
  151. else:
  152. parser.print_help()
  153. await client.disconnect()
  154. def cmd() -> None:
  155. asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))
  156. if __name__ == "__main__":
  157. cmd()