"""XMPP bot that previews links posted in chats and group chats.

For HTML pages the page title is sent back; any other content type is
downloaded and re-uploaded through HTTP File Upload (XEP-0363).
"""

import asyncio
import codecs
import configparser
import io
import mimetypes
import os
import random
import re
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, urlunparse

import bs4
import requests
from pantomime import normalize_mimetype
from slixmpp import ClientXMPP

import ecgi

parser = "html.parser"
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0"
accept_lang = "en-US"
data_limit = 786400000

headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}

# Hosts that are never fetched.
block_list = (
    "localhost",
    "127.0.0.1",
    "0.0.0.0",
    "youtu.be",
    "www.youtube.com",
    "youtube.com",
    "m.youtube.com",
    "music.youtube.com",
)

# Prefixes a whitespace-separated token must start with to count as a URL.
req_list = (
    "http://",
    "https://",
)

# Content types that are parsed for a <title> instead of being re-uploaded.
html_files = (
    "text/html",
    "application/xhtml+xml",
)

# Reading an HTML response stops as soon as one of these closing tags has
# been seen, since nothing after them is needed to extract the title.
title_end_markers = (
    "</title>",
    "</head>",
)


class Lifo(list):
    """
    Limited size LIFO array to store messages and urls.

    New items are inserted at index 0; once the list grows beyond ``size``
    the last (oldest) item is dropped.
    """

    def __init__(self, size):
        super().__init__()
        self.size = size

    def add(self, item):
        """Insert ``item`` at the front, evicting the oldest entry if full."""
        self.insert(0, item)
        if len(self) > self.size:
            self.pop()


# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
class ChenBot(ClientXMPP):
    """XMPP client that answers messages containing links with a preview."""

    commands = {}
    muc_commands = {}
    # Per-conversation history, keyed by the bare JID of the sender/room.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    def __init__(self, jid, password, nick, autojoin=None):
        """Register the required plugins and event handlers.

        Args:
            jid: JID the bot logs in with.
            password: Password for ``jid``.
            nick: Nickname used when joining rooms.
            autojoin: Iterable of room JIDs joined on session start.
        """
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        self.nick = nick or []
        self.autojoin = autojoin or []
        self.register_plugin("xep_0030")
        self.register_plugin("xep_0060")
        self.register_plugin("xep_0054")
        self.register_plugin("xep_0045")
        self.register_plugin("xep_0066")
        self.register_plugin("xep_0084")
        self.register_plugin("xep_0153")
        self.register_plugin("xep_0363")

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    def get_urls(self, msg):
        """Return the whitespace-separated tokens of the body that are URLs.

        Only tokens that *start* with one of ``req_list`` are returned; a
        token that merely contains ``http://`` somewhere (e.g. wrapped in
        parentheses) cannot be fetched and is ignored.
        """
        tokens = msg["body"].strip().split()
        return [token for token in tokens if token.startswith(req_list)]

    async def parse_uri(self, uri, sender, mtype):
        """Process ``uri`` unless its host is missing or in ``block_list``.

        Args:
            uri: Result of ``urllib.parse.urlparse``.
            sender: Bare JID the preview is sent to.
            mtype: Message type used for the reply.
        """
        # ``hostname`` is lower-cased and excludes userinfo and port, so
        # ``user@LOCALHOST:80`` cannot slip past the block list.
        host = uri.hostname or ""
        if not host or host in block_list:
            return
        await self.process_link(uri, sender, mtype)

    def _read_title(self, response):
        """Stream an HTML response and return its stripped title ('' if none).

        Reading stops once a marker from ``title_end_markers`` has been seen
        or more than ``data_limit`` characters were decoded.
        """
        # An incremental decoder keeps multi-byte characters intact when
        # they are split across two chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        overlap = max(len(marker) for marker in title_end_markers) - 1
        parts = []
        size = 0
        tail = ""
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=False):
            piece = decoder.decode(chunk)
            parts.append(piece)
            size += len(piece)
            # Search the new piece plus the end of the previous one so a
            # marker straddling a chunk boundary is still found.
            # NOTE(review): the original condition tested for an empty
            # string (always true, so only the first chunk was ever read);
            # the closing title/head tags are presumably what was intended.
            window = (tail + piece).lower()
            if size > data_limit or any(m in window for m in title_end_markers):
                break
            tail = window[-overlap:]

        soup = bs4.BeautifulSoup("".join(parts), parser)
        title = soup.find("title")
        if title is None:
            return ""
        return title.text.strip()

    def _download(self, response):
        """Read the response body into a BytesIO; None if it hits data_limit."""
        length = 0
        outfile = io.BytesIO()
        for chunk in response.iter_content(chunk_size=512, decode_unicode=False):
            # Count the bytes actually received; chunks may be shorter
            # than the requested chunk size.
            length += len(chunk)
            if length >= data_limit:
                return None
            outfile.write(chunk)
        return outfile

    def _pick_filename(self, response, uri, ftype):
        """Choose a file name from the headers, the URL path or the MIME type."""
        filename = None
        content_disposition = response.headers.get("content-disposition")
        if content_disposition:
            _, params = ecgi.parse_header(content_disposition)
            filename = params.get("filename")
        if not filename:
            filename = os.path.basename(uri.path)
        if filename:
            return filename
        ext = (ftype and mimetypes.guess_extension(ftype)) or ".txt"
        return f"file{ext}"

    async def process_link(self, uri, sender, mtype):
        """Fetch ``uri`` and send a preview of it to ``sender``.

        HTML pages produce a message with the page title (preceded by the
        final URL when the request was redirected); a title already present
        in the sender's recent previews is not repeated. Any other content
        type is downloaded and handed to ``embed_file``; errors on that
        path are printed rather than raised.

        Raises:
            requests.RequestException: If the request itself fails.
        """
        url = urlunparse(uri)
        # NOTE: requests is synchronous, so the event loop is blocked while
        # the request and the body download are in progress.
        with requests.get(url, stream=True, headers=headers, timeout=5) as r:
            if not r.ok:
                return

            ftype = normalize_mimetype(r.headers.get("content-type"))
            if ftype in html_files:
                output = self._read_title(r)
                if not output:
                    return
                # Emphasise single-line titles; multi-line ones are sent raw.
                output = f"*{output}*" if ("\n" not in output) else output
                previews = self.messages[sender]["previews"]
                if output in previews:
                    return
                previews.add(output)
                if r.history:
                    self.send_message(mto=sender, mbody=r.url, mtype=mtype)
                self.send_message(mto=sender, mbody=output, mtype=mtype)
                return

            try:
                outfile = self._download(r)
                if outfile is None:
                    return
                fname = self._pick_filename(r, uri, ftype)
                await self.embed_file(url, sender, mtype, ftype, fname, outfile)
            except Exception as e:
                print(e)

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Upload ``outfile`` via XEP-0363 and send the resulting URL.

        The upload URL is sent both as the message body and as the
        out-of-band (``oob``) URL of the message.
        """
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Preview every URL in ``urls`` not recently seen from ``sender``.

        Nothing is previewed when the message body contains one of the
        opt-out tags ``nsfl``, ``nsfw`` or ``#nospoil``.
        """
        body = msg["body"].lower()
        if any(tag in body for tag in ("nsfl", "nsfw", "#nospoil")):
            return

        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            await self.parse_uri(urlparse(u), sender, mtype)

    async def session_start(self, event):
        """Start the bot."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                print(e)

    async def update_info(self):
        """Update the bot info.

        Publishes ``avatar.png`` (when it can be read) and a vCard. The
        publish calls are scheduled with ``asyncio.gather`` without being
        awaited, so this coroutine does not wait for them to complete.
        """
        try:
            with open("avatar.png", "rb") as avatar_file:
                avatar = avatar_file.read()
        except OSError as e:
            # A missing avatar must not prevent the rest of the session
            # start (vCard, room joins) from running.
            print(e)
            avatar = None

        if avatar is not None:
            avatar_type = "image/png"
            avatar_id = self.plugin["xep_0084"].generate_id(avatar)
            avatar_bytes = len(avatar)

            asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
            asyncio.gather(
                self.plugin["xep_0153"].set_avatar(
                    avatar=avatar,
                    mtype=avatar_type,
                )
            )

            info = {
                "id": avatar_id,
                "type": avatar_type,
                "bytes": avatar_bytes,
            }
            asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))

        vcard = self.plugin["xep_0054"].make_vcard()
        vcard["URL"] = "https://git.chaotic.ninja/yakumo.izuru/chen"
        vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
        vcard["NICKNAME"] = "Chen"
        vcard["FN"] = "Chen"
        asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))

    async def _preview_links(self, msg, sender, mtype):
        """Preview the links in ``msg``; errors are printed, not raised."""
        try:
            # A message that already carries an out-of-band URL is skipped.
            if msg["oob"]["url"]:
                return
            if urls := self.get_urls(msg):
                await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            print(e)

    async def message(self, msg):
        """Process a message."""
        if msg["type"] in ("chat", "normal"):
            await self._preview_links(msg, msg["from"].bare, "chat")

    async def muc_message(self, msg):
        """Process a groupchat message."""
        if msg["type"] in ("groupchat", "normal"):
            # Ignore the bot's own messages echoed back by the room.
            if msg["mucnick"] == self.nick:
                return
            await self._preview_links(msg, msg["from"].bare, "groupchat")


if __name__ == "__main__":
    config = configparser.ConfigParser()
    config.read("config.ini")
    jid = config["chen"]["jid"]
    password = config["chen"]["password"]
    nick = config["chen"]["nick"]
    autojoin = config["chen"]["autojoin"].split()
    bot = ChenBot(jid, password, nick, autojoin=autojoin)
    bot.connect()
    bot.process(forever=True)