|
|
@ -3,156 +3,34 @@ |
|
|
|
# This Source Code Form is subject to the terms of the Mozilla Public |
|
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this |
|
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/. |
|
|
|
from typing import Dict, Optional, TYPE_CHECKING |
|
|
|
from io import BytesIO |
|
|
|
from typing import Dict |
|
|
|
import argparse |
|
|
|
import os.path |
|
|
|
import asyncio |
|
|
|
import os.path |
|
|
|
import json |
|
|
|
import re |
|
|
|
|
|
|
|
from aiohttp import ClientSession |
|
|
|
from yarl import URL |
|
|
|
from PIL import Image |
|
|
|
|
|
|
|
from telethon import TelegramClient |
|
|
|
from telethon.tl.functions.messages import GetAllStickersRequest, GetStickerSetRequest |
|
|
|
from telethon.tl.types.messages import AllStickers |
|
|
|
from telethon.tl.types import InputStickerSetShortName, Document, DocumentAttributeSticker |
|
|
|
from telethon.tl.types.messages import StickerSet as StickerSetFull |
|
|
|
|
|
|
|
parser = argparse.ArgumentParser() |
|
|
|
parser.add_argument("--list", help="List your saved sticker packs", action="store_true") |
|
|
|
parser.add_argument("--session", help="Telethon session file name", default="sticker-import") |
|
|
|
parser.add_argument("--config", help="Path to JSON file with Matrix homeserver and access_token", |
|
|
|
type=str, default="config.json") |
|
|
|
parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/", |
|
|
|
type=str) |
|
|
|
parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*") |
|
|
|
args = parser.parse_args() |
|
|
|
loop = asyncio.get_event_loop() |
|
|
|
|
|
|
|
|
|
|
|
async def whoami(url: URL, access_token: str) -> str: |
|
|
|
headers = {"Authorization": f"Bearer {access_token}"} |
|
|
|
async with ClientSession() as sess, sess.get(url, headers=headers) as resp: |
|
|
|
resp.raise_for_status() |
|
|
|
user_id = (await resp.json())["user_id"] |
|
|
|
print(f"Access token validated (user ID: {user_id})") |
|
|
|
return user_id |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
with open(args.config) as config_file: |
|
|
|
config = json.load(config_file) |
|
|
|
homeserver_url = config["homeserver"] |
|
|
|
access_token = config["access_token"] |
|
|
|
except FileNotFoundError: |
|
|
|
print("Matrix config file not found. Please enter your homeserver and access token.") |
|
|
|
homeserver_url = input("Homeserver URL: ") |
|
|
|
access_token = input("Access token: ") |
|
|
|
whoami_url = URL(homeserver_url) / "_matrix" / "client" / "r0" / "account" / "whoami" |
|
|
|
user_id = loop.run_until_complete(whoami(whoami_url, access_token)) |
|
|
|
with open(args.config, "w") as config_file: |
|
|
|
json.dump({ |
|
|
|
"homeserver": homeserver_url, |
|
|
|
"user_id": user_id, |
|
|
|
"access_token": access_token |
|
|
|
}, config_file) |
|
|
|
print(f"Wrote config to {args.config}") |
|
|
|
|
|
|
|
upload_url = URL(homeserver_url) / "_matrix" / "media" / "r0" / "upload" |
|
|
|
|
|
|
|
|
|
|
|
async def upload(data: bytes, mimetype: str, filename: str) -> str: |
|
|
|
url = upload_url.with_query({"filename": filename}) |
|
|
|
headers = {"Content-Type": mimetype, "Authorization": f"Bearer {access_token}"} |
|
|
|
async with ClientSession() as sess, sess.post(url, data=data, headers=headers) as resp: |
|
|
|
return (await resp.json())["content_uri"] |
|
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
|
from typing import TypedDict |
|
|
|
|
|
|
|
|
|
|
|
class MatrixMediaInfo(TypedDict): |
|
|
|
w: int |
|
|
|
h: int |
|
|
|
size: int |
|
|
|
mimetype: str |
|
|
|
thumbnail_url: Optional[str] |
|
|
|
thumbnail_info: Optional['MatrixMediaInfo'] |
|
|
|
from .lib import matrix, util |
|
|
|
|
|
|
|
|
|
|
|
class MatrixStickerInfo(TypedDict, total=False): |
|
|
|
body: str |
|
|
|
url: str |
|
|
|
info: MatrixMediaInfo |
|
|
|
id: str |
|
|
|
|
|
|
|
|
|
|
|
def convert_image(data: bytes) -> (bytes, int, int): |
|
|
|
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") |
|
|
|
new_file = BytesIO() |
|
|
|
image.save(new_file, "png") |
|
|
|
w, h = image.size |
|
|
|
return new_file.getvalue(), w, h |
|
|
|
|
|
|
|
|
|
|
|
async def reupload_document(client: TelegramClient, document: Document) -> 'MatrixStickerInfo': |
|
|
|
async def reupload_document(client: TelegramClient, document: Document) -> matrix.StickerInfo: |
|
|
|
print(f"Reuploading {document.id}", end="", flush=True) |
|
|
|
data = await client.download_media(document, file=bytes) |
|
|
|
print(".", end="", flush=True) |
|
|
|
data, width, height = convert_image(data) |
|
|
|
data, width, height = util.convert_image(data) |
|
|
|
print(".", end="", flush=True) |
|
|
|
mxc = await upload(data, "image/png", f"{document.id}.png") |
|
|
|
mxc = await matrix.upload(data, "image/png", f"{document.id}.png") |
|
|
|
print(".", flush=True) |
|
|
|
if width > 256 or height > 256: |
|
|
|
# Set the width and height to lower values so clients wouldn't show them as huge images |
|
|
|
if width > height: |
|
|
|
height = int(height / (width / 256)) |
|
|
|
width = 256 |
|
|
|
else: |
|
|
|
width = int(width / (height / 256)) |
|
|
|
height = 256 |
|
|
|
return { |
|
|
|
"body": "", |
|
|
|
"url": mxc, |
|
|
|
"info": { |
|
|
|
"w": width, |
|
|
|
"h": height, |
|
|
|
"size": len(data), |
|
|
|
"mimetype": "image/png", |
|
|
|
|
|
|
|
# Element iOS compatibility hack |
|
|
|
"thumbnail_url": mxc, |
|
|
|
"thumbnail_info": { |
|
|
|
"w": width, |
|
|
|
"h": height, |
|
|
|
"size": len(data), |
|
|
|
"mimetype": "image/png", |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def add_to_index(name: str) -> None: |
|
|
|
index_path = os.path.join(args.output_dir, "index.json") |
|
|
|
try: |
|
|
|
with open(index_path) as index_file: |
|
|
|
index_data = json.load(index_file) |
|
|
|
except (FileNotFoundError, json.JSONDecodeError): |
|
|
|
index_data = {"packs": []} |
|
|
|
if "homeserver_url" not in index_data: |
|
|
|
index_data["homeserver_url"] = homeserver_url |
|
|
|
if name not in index_data["packs"]: |
|
|
|
index_data["packs"].append(name) |
|
|
|
with open(index_path, "w") as index_file: |
|
|
|
json.dump(index_data, index_file, indent=" ") |
|
|
|
print(f"Added {name} to {index_path}") |
|
|
|
return util.make_sticker(mxc, width, height, len(data)) |
|
|
|
|
|
|
|
|
|
|
|
def add_meta(document: Document, info: 'MatrixStickerInfo', pack: StickerSetFull) -> None: |
|
|
|
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: |
|
|
|
for attr in document.attributes: |
|
|
|
if isinstance(attr, DocumentAttributeSticker): |
|
|
|
info["body"] = attr.alt |
|
|
@ -167,12 +45,12 @@ def add_meta(document: Document, info: 'MatrixStickerInfo', pack: StickerSetFull |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None: |
|
|
|
async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir: str) -> None: |
|
|
|
if pack.set.animated: |
|
|
|
print("Animated stickerpacks are currently not supported") |
|
|
|
return |
|
|
|
|
|
|
|
pack_path = os.path.join(args.output_dir, f"{pack.set.short_name}.json") |
|
|
|
pack_path = os.path.join(output_dir, f"{pack.set.short_name}.json") |
|
|
|
try: |
|
|
|
os.mkdir(os.path.dirname(pack_path)) |
|
|
|
except FileExistsError: |
|
|
@ -191,7 +69,7 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None: |
|
|
|
except FileNotFoundError: |
|
|
|
pass |
|
|
|
|
|
|
|
reuploaded_documents: Dict[int, 'MatrixStickerInfo'] = {} |
|
|
|
reuploaded_documents: Dict[int, matrix.StickerInfo] = {} |
|
|
|
for document in pack.documents: |
|
|
|
try: |
|
|
|
reuploaded_documents[document.id] = already_uploaded[document.id] |
|
|
@ -223,15 +101,27 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull) -> None: |
|
|
|
}, pack_file, ensure_ascii=False) |
|
|
|
print(f"Saved {pack.set.title} as {pack.set.short_name}.json") |
|
|
|
|
|
|
|
add_to_index(os.path.basename(pack_path)) |
|
|
|
util.add_to_index(os.path.basename(pack_path), output_dir) |
|
|
|
|
|
|
|
|
|
|
|
pack_url_regex = re.compile(r"^(?:(?:https?://)?(?:t|telegram)\.(?:me|dog)/addstickers/)?" |
|
|
|
r"([A-Za-z0-9-_]+)" |
|
|
|
r"(?:\.json)?$") |
|
|
|
|
|
|
|
parser = argparse.ArgumentParser() |
|
|
|
|
|
|
|
async def main(): |
|
|
|
parser.add_argument("--list", help="List your saved sticker packs", action="store_true") |
|
|
|
parser.add_argument("--session", help="Telethon session file name", default="sticker-import") |
|
|
|
parser.add_argument("--config", |
|
|
|
help="Path to JSON file with Matrix homeserver and access_token", |
|
|
|
type=str, default="config.json") |
|
|
|
parser.add_argument("--output-dir", help="Directory to write packs to", default="web/packs/", |
|
|
|
type=str) |
|
|
|
parser.add_argument("pack", help="Sticker pack URLs to import", action="append", nargs="*") |
|
|
|
|
|
|
|
|
|
|
|
async def main(args: argparse.Namespace) -> None: |
|
|
|
await matrix.load_config(args.config) |
|
|
|
client = TelegramClient(args.session, 298751, "cb676d6bae20553c9996996a8f52b4d7") |
|
|
|
await client.start() |
|
|
|
|
|
|
@ -253,11 +143,16 @@ async def main(): |
|
|
|
input_packs.append(InputStickerSetShortName(short_name=match.group(1))) |
|
|
|
for input_pack in input_packs: |
|
|
|
pack: StickerSetFull = await client(GetStickerSetRequest(input_pack)) |
|
|
|
await reupload_pack(client, pack) |
|
|
|
await reupload_pack(client, pack, args.output_dir) |
|
|
|
else: |
|
|
|
parser.print_help() |
|
|
|
|
|
|
|
await client.disconnect() |
|
|
|
|
|
|
|
|
|
|
|
loop.run_until_complete(main()) |
|
|
|
def cmd() -> None: |
|
|
|
asyncio.get_event_loop().run_until_complete(main(parser.parse_args())) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
cmd() |
xxxxxxxxxx