# -*- coding: utf-8 -*-

# Copyright 2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://simpcity.cr/"""

from .common import Extractor, Message
from .. import text, exception
from ..cache import cache

# Accept both the current (.cr) and the former (.su) domain,
# with or without scheme and "www." prefix
BASE_PATTERN = r"(?:https?://)?(?:www\.)?simpcity\.(?:cr|su)"


class SimpcityExtractor(Extractor):
    """Base class for simpcity extractors"""
    category = "simpcity"
    cookies_domain = "simpcity.cr"
    # session cookie whose presence indicates an authenticated session
    cookies_names = ("ogaddgmetaprof_user",)
    root = "https://simpcity.cr"

    def items(self):
        """Yield one Directory message per post, then one Queue message
        for every link/embed URL found in the post's HTML content.

        Relies on self.posts() (supplied by subclasses) to provide
        post dicts containing at least a "content" key.
        """
        self.login()

        # collect href targets of <a> tags and src targets of <iframe>s
        extract_urls = text.re(
            r'<(?:a [^>]*?href|iframe [^>]*?src)="([^"]+)').findall

        for post in self.posts():
            urls = extract_urls(post["content"])

            data = {"post": post}
            post["count"] = data["count"] = len(urls)
            yield Message.Directory, data
            for data["num"], url in enumerate(urls, 1):
                yield Message.Queue, url, data

    def request_page(self, url):
        """Fetch 'url'; translate a 403 login wall into AuthRequired."""
        try:
            return self.request(url)
        except exception.HttpError as exc:
            # a 403 response containing a ">Log in<" link means the
            # content is visible only to authenticated users
            if exc.status == 403 and b">Log in<" in exc.response.content:
                raise exception.AuthRequired(
                    ("username & password", "authenticated cookies"),
                    None, self._extract_error(exc.response.text))
            raise

    def login(self):
        """Log in unless valid session cookies are already present."""
        if self.cookies_check(self.cookies_names):
            return

        username, password = self._get_auth_info()
        if username:
            self.cookies_update(self._login_impl(username, password))

    # login cookies are cached for one year, keyed on 'username' (keyarg=1)
    @cache(maxage=365*86400, keyarg=1)
    def _login_impl(self, username, password):
        """Perform the login POST and return the session cookies.

        Raises AuthenticationError when the server does not redirect
        after the POST (i.e. the login form was re-displayed with an
        error message).
        """
        self.log.info("Logging in as %s", username)

        url = f"{self.root}/login/login"
        # fetch the login form first to obtain its CSRF token
        page = self.request(url).text

        data = {
            "_xfToken": text.extr(page, 'name="_xfToken" value="', '"'),
            "login"   : username,
            "password": password,
            "remember": "1",
            "_xfRedirect": "",
        }
        response = self.request(url, method="POST", data=data)

        # a successful login redirects; an empty redirect history
        # means the credentials were rejected
        if not response.history:
            err = self._extract_error(response.text)
            raise exception.AuthenticationError(f'"{err}"')

        return {
            cookie.name: cookie.value
            for cookie in self.cookies
            if cookie.domain.endswith(self.cookies_domain)
        }

    def _pagination(self, base, pnum=None):
        """Yield thread pages in ascending order.

        With pnum=None, start at the first page and keep following the
        "next page" navigation; with an explicit pnum, yield only that
        single page.
        """
        base = f"{self.root}{base}"

        if pnum is None:
            url = f"{base}/"
            pnum = 1
        else:
            url = f"{base}/page-{pnum}"
            pnum = None  # signal: stop after this one page

        while True:
            page = self.request_page(url).text
            yield page

            if pnum is None or "pageNav-jump--next" not in page:
                return
            pnum += 1
            url = f"{base}/page-{pnum}"

    def _pagination_reverse(self, base, pnum=None):
        """Yield thread pages in descending order, last page first."""
        base = f"{self.root}{base}"
        url = f"{base}/page-9999"  # force redirect to last page

        with self.request_page(url) as response:
            # derive the real last page number from the redirected URL;
            # a trailing "/" means the thread has only a single page
            url = response.url
            if url[-1] == "/":
                pnum = 1
            else:
                pnum = text.parse_int(url[url.rfind("-")+1:], 1)
            page = response.text

        while True:
            yield page

            pnum -= 1
            if pnum > 1:
                url = f"{base}/page-{pnum}"
            elif pnum == 1:
                url = f"{base}/"  # first page has no "page-N" suffix
            else:
                return
            page = self.request_page(url).text

    def _extract_error(self, html):
        """Return the text of a 'blockMessage--error' element, if any."""
        return text.unescape(text.extr(
            html, "blockMessage--error", "").rpartition(">")[2].strip())

    def _parse_thread(self, page):
        """Build a thread metadata dict from a page's JSON-LD data."""
        schema = self._extract_jsonld(page)["mainEntity"]
        author = schema["author"]
        stats = schema["interactionStatistic"]

        url_t = schema["url"]
        url_a = author.get("url") or ""

        thread = {
            # thread/author IDs are the characters between the last "."
            # and the trailing "/" of their respective URLs
            "id"   : url_t[url_t.rfind(".")+1:-1],
            "url"  : url_t,
            "title": schema["headline"],
            "date" : text.parse_datetime(schema["datePublished"]),
            "views": stats[0]["userInteractionCount"],
            "posts": stats[1]["userInteractionCount"],
            "tags" : (schema["keywords"].split(", ")
                      if "keywords" in schema else ()),
            "section"   : schema["articleSection"],
            "author"    : author.get("name") or "",
            # without an author URL, fall back to stripping the first 15
            # characters of the name (presumably a "Deleted member "-style
            # prefix -- TODO confirm against live data)
            "author_id" : (url_a[url_a.rfind(".")+1:-1] if url_a else
                           (author.get("name") or "")[15:]),
            "author_url": url_a,
        }
        return thread

    def _parse_post(self, html):
        """Extract a post's metadata fields from its HTML."""
        extr = text.extract_from(html)
        post = {
            "author": extr('data-author="', '"'),
            "id": extr('data-content="post-', '"'),
            "author_url": extr('itemprop="url" content="', '"'),
            "date": text.parse_datetime(extr('datetime="', '"')),
            "content": extr('