You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

313 lines
12 KiB

  1. # maunium-stickerpicker - A fast and simple Matrix sticker picker widget.
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from functools import partial
  17. from io import BytesIO
  18. import numpy as np
  19. import os.path
  20. import subprocess
  21. import json
  22. import tempfile
  23. import mimetypes
  24. try:
  25. import magic
  26. except ImportError:
  27. print("[Warning] Magic is not installed, using file extensions to guess mime types")
  28. magic = None
  29. from PIL import Image, ImageSequence, ImageFilter
  30. from . import matrix
  31. open_utf8 = partial(open, encoding='UTF-8')
  32. def guess_mime(data: bytes) -> str:
  33. mime = None
  34. if magic:
  35. try:
  36. return magic.Magic(mime=True).from_buffer(data)
  37. except Exception:
  38. pass
  39. else:
  40. with tempfile.NamedTemporaryFile() as temp:
  41. temp.write(data)
  42. temp.close()
  43. mime, _ = mimetypes.guess_type(temp.name)
  44. return mime or "image/png"
  45. def _video_to_webp(data: bytes) -> bytes:
  46. mime = guess_mime(data)
  47. ext = mimetypes.guess_extension(mime)
  48. with tempfile.NamedTemporaryFile(suffix=ext) as video:
  49. video.write(data)
  50. video.flush()
  51. with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
  52. print(".", end="", flush=True)
  53. ffmpeg_encoder_args = []
  54. if mime == "video/webm":
  55. encode = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nokey=1:noprint_wrappers=1", video.name], capture_output=True, text=True).stdout.strip()
  56. ffmpeg_encoder = None
  57. if encode == "vp8":
  58. ffmpeg_encoder = "libvpx"
  59. elif encode == "vp9":
  60. ffmpeg_encoder = "libvpx-vp9"
  61. if ffmpeg_encoder:
  62. ffmpeg_encoder_args = ["-c:v", ffmpeg_encoder]
  63. result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", *ffmpeg_encoder_args, "-i", video.name, "-lossless", "1", webp.name],
  64. capture_output=True)
  65. if result.returncode != 0:
  66. raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
  67. webp.seek(0)
  68. return webp.read()
  69. def video_to_webp(data: bytes) -> bytes:
  70. mime = guess_mime(data)
  71. ext = mimetypes.guess_extension(mime)
  72. # run ffmpeg to fix duration
  73. with tempfile.NamedTemporaryFile(suffix=ext) as temp:
  74. temp.write(data)
  75. temp.flush()
  76. with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
  77. print(".", end="", flush=True)
  78. result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
  79. capture_output=True)
  80. if result.returncode != 0:
  81. raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
  82. temp_fixed.seek(0)
  83. data = temp_fixed.read()
  84. return _video_to_webp(data)
  85. def process_frame(frame):
  86. """
  87. Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
  88. """
  89. frame = frame.convert('RGBA')
  90. # Decompose Alpha channel
  91. alpha = frame.getchannel('A')
  92. # Process Alpha channel with threshold, remove semi-transparent pixels
  93. # Threshold can be adjusted as needed (0-255), 128 is the middle value
  94. threshold = 128
  95. alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
  96. # Process Alpha channel with MinFilter, remove edge noise
  97. alpha = alpha.filter(ImageFilter.MinFilter(3))
  98. # Process Alpha channel with MaxFilter, repair edges
  99. alpha = alpha.filter(ImageFilter.MaxFilter(3))
  100. # Apply processed Alpha channel back to image
  101. frame.putalpha(alpha)
  102. return frame
  103. def webp_to_others(data: bytes, mimetype: str) -> bytes:
  104. format = mimetypes.guess_extension(mimetype)[1:]
  105. print(format)
  106. with Image.open(BytesIO(data)) as webp:
  107. with BytesIO() as img:
  108. print(".", end="", flush=True)
  109. webp.info.pop('background', None)
  110. if mimetype == "image/gif":
  111. frames = []
  112. duration = [100, ]
  113. for frame in ImageSequence.Iterator(webp):
  114. frame = process_frame(frame)
  115. frames.append(frame)
  116. duration.append(frame.info.get('duration', duration[-1]))
  117. frames[0].save(img, format=format, save_all=True, lossless=True, quality=100, method=6,
  118. append_images=frames[1:], loop=0, duration=duration[1:], disposal=2)
  119. else:
  120. webp.save(img, format=format, lossless=True, quality=100, method=6)
  121. img.seek(0)
  122. return img.read()
  123. def is_uniform_animated_webp(data: bytes) -> bool:
  124. with Image.open(BytesIO(data)) as img:
  125. if img.n_frames <= 1:
  126. return True
  127. img_iter = ImageSequence.Iterator(img)
  128. first_frame = np.array(img_iter[0].convert("RGBA"))
  129. for frame in img_iter:
  130. current_frame = np.array(frame.convert("RGBA"))
  131. if not np.array_equal(first_frame, current_frame):
  132. return False
  133. return True
  134. def webp_to_gif_or_png(data: bytes) -> bytes:
  135. with Image.open(BytesIO(data)) as image:
  136. # check if the webp is animated
  137. is_animated = getattr(image, "is_animated", False)
  138. if is_animated and not is_uniform_animated_webp(data):
  139. return webp_to_others(data, "image/gif")
  140. else:
  141. # convert to png
  142. return webp_to_others(data, "image/png")
  143. def opermize_gif(data: bytes) -> bytes:
  144. with tempfile.NamedTemporaryFile() as gif:
  145. gif.write(data)
  146. gif.flush()
  147. # use gifsicle to optimize gif
  148. result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
  149. capture_output=True)
  150. if result.returncode != 0:
  151. raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
  152. gif.seek(0)
  153. return gif.read()
  154. def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
  155. with Image.open(BytesIO(data)) as image:
  156. with BytesIO() as new_file:
  157. # Determine if the image is a GIF
  158. is_animated = getattr(image, "is_animated", False)
  159. if is_animated:
  160. frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
  161. # Save the new GIF
  162. frames[0].save(
  163. new_file,
  164. format='GIF',
  165. save_all=True,
  166. append_images=frames[1:],
  167. loop=image.info.get('loop', 0), # Default loop to 0 if not present
  168. duration=image.info.get('duration', 100), # Set a default duration if not present
  169. disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
  170. )
  171. # Get the size of the first frame to determine resizing
  172. w, h = frames[0].size
  173. else:
  174. suffix = mimetypes.guess_extension(mimetype)
  175. if suffix:
  176. suffix = suffix[1:]
  177. image = image.convert("RGBA")
  178. image.save(new_file, format=suffix)
  179. w, h = image.size
  180. if w > 256 or h > 256:
  181. # Set the width and height to lower values so clients wouldn't show them as huge images
  182. if w > h:
  183. h = int(h / (w / 256))
  184. w = 256
  185. else:
  186. w = int(w / (h / 256))
  187. h = 256
  188. return new_file.getvalue(), w, h
  189. def _convert_sticker(data: bytes) -> (bytes, str, int, int):
  190. mimetype = guess_mime(data)
  191. if mimetype.startswith("video/"):
  192. data = video_to_webp(data)
  193. print(".", end="", flush=True)
  194. elif mimetype.startswith("application/gzip"):
  195. print(".", end="", flush=True)
  196. # unzip file
  197. import gzip
  198. with gzip.open(BytesIO(data), "rb") as f:
  199. data = f.read()
  200. mimetype = guess_mime(data)
  201. suffix = mimetypes.guess_extension(mimetype)
  202. with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
  203. temp.write(data)
  204. with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
  205. # run lottie_convert.py input output
  206. print(".", end="", flush=True)
  207. import subprocess
  208. cmd = ["lottie_convert.py", temp.name, gif.name]
  209. result = subprocess.run(cmd, capture_output=True, text=True)
  210. retcode = result.returncode
  211. if retcode != 0:
  212. raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
  213. gif.seek(0)
  214. data = gif.read()
  215. mimetype = guess_mime(data)
  216. if mimetype == "image/webp":
  217. data = webp_to_gif_or_png(data)
  218. mimetype = guess_mime(data)
  219. rlt = _convert_image(data, mimetype)
  220. data = rlt[0]
  221. if mimetype == "image/gif":
  222. print(".", end="", flush=True)
  223. data = opermize_gif(data)
  224. return data, mimetype, rlt[1], rlt[2]
  225. def convert_sticker(data: bytes) -> (bytes, str, int, int):
  226. try:
  227. return _convert_sticker(data)
  228. except Exception as e:
  229. mimetype = guess_mime(data)
  230. print(f"Error converting image, mimetype: {mimetype}")
  231. ext = mimetypes.guess_extension(mimetype)
  232. with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
  233. temp.write(data)
  234. print(f"Saved to {temp.name}")
  235. raise e
  236. def add_to_index(name: str, output_dir: str) -> None:
  237. index_path = os.path.join(output_dir, "index.json")
  238. try:
  239. with open_utf8(index_path) as index_file:
  240. index_data = json.load(index_file)
  241. except (FileNotFoundError, json.JSONDecodeError):
  242. index_data = {"packs": []}
  243. if "homeserver_url" not in index_data and matrix.homeserver_url:
  244. index_data["homeserver_url"] = matrix.homeserver_url
  245. if name not in index_data["packs"]:
  246. index_data["packs"].append(name)
  247. with open_utf8(index_path, "w") as index_file:
  248. json.dump(index_data, index_file, indent=" ")
  249. print(f"Added {name} to {index_path}")
  250. def make_sticker(mxc: str, width: int, height: int, size: int,
  251. mimetype: str, body: str = "") -> matrix.StickerInfo:
  252. return {
  253. "body": body,
  254. "url": mxc,
  255. "info": {
  256. "w": width,
  257. "h": height,
  258. "size": size,
  259. "mimetype": mimetype,
  260. # Element iOS compatibility hack
  261. "thumbnail_url": mxc,
  262. "thumbnail_info": {
  263. "w": width,
  264. "h": height,
  265. "size": size,
  266. "mimetype": mimetype,
  267. },
  268. },
  269. "msgtype": "m.sticker",
  270. }