# Source code for proboards_scraper.http_requests

import hashlib
import http
import imghdr
import logging
import pathlib
import time
from typing import List

import aiofiles
import aiohttp
import bs4
import selenium.webdriver

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

def test_ico(h: bytes, f):
    """
    Test for .ico files to be added to the ``imghdr`` module tests.
    See `ICO file format`_ and `imghdr.tests`_.

    The header is recognized when it starts with two zero bytes followed by
    a two-byte field equal to little-endian 1 or 2 (icon or cursor, per the
    ICO file format).

    Args:
        h: Leading bytes of the file being probed.
        f: Unused by this test; accepted as the second positional argument
            alongside ``h``.

    Returns:
        The string ``"ico"`` if the header matches, otherwise ``None``.

    .. _`ICO file format`: https://en.wikipedia.org/wiki/ICO_(file_format)
    .. _`imghdr.tests`: https://docs.python.org/3/library/imghdr.html#imghdr.tests
    """
    if h.startswith(b"\x00\x00") and (h[2:4] in (b"\x01\x00", b"\x02\x00")):
        return "ico"
    return None


# The ``test_`` prefix follows the naming used for ``imghdr`` probes, but it
# also matches pytest's default collection pattern; opt out explicitly so the
# probe is never collected as a unit test when imported into a test module.
test_ico.__test__ = False

# NOTE(review): no ``imghdr.tests.append(test_ico)`` call is visible in this
# file -- confirm the probe is registered somewhere, otherwise
# ``imghdr.what`` will never report "ico".

def get_chrome_driver() -> selenium.webdriver.Chrome:
    """
    Build a Selenium Chrome driver that runs without a visible window.

    Returns:
        A Chrome driver created from options whose ``headless`` attribute
        is set to ``True``.
    """
    options = selenium.webdriver.ChromeOptions()
    options.headless = True
    return selenium.webdriver.Chrome(options=options)
def get_login_cookies(
    home_url: str, username: str, password: str,
    driver: selenium.webdriver.Chrome = None, page_load_wait: int = 1
) -> List[dict]:
    """
    Logs in to a Proboards account using Selenium and returns the cookies
    from the authenticated login session.

    Args:
        home_url: URL for the Proboards forum homepage.
        username: Login username.
        password: Login password.
        driver: Selenium Chrome driver (optional). If omitted, a headless
            driver is created for the duration of the call and quit before
            returning; a driver supplied by the caller is left open.
        page_load_wait: Time (in seconds) to wait to allow the page to load.

    Returns:
        A list of dicts, where each dict corresponds to a cookie, from the
        Selenium Chrome driver.

    Raises:
        RuntimeError: If no login link is found on the homepage, or if the
            login page lacks the email, password or submit input.
    """
    # NOTE(review): the prefix literal was empty in the reviewed source, and
    # an empty prefix matches every link (so the first anchor on the page was
    # always chosen). The value below is the presumed Proboards login
    # endpoint -- confirm against the live site.
    login_url_prefix = "https://login.proboards.com/login"

    # Only a driver created here is ours to shut down.
    owns_driver = driver is None
    if owns_driver:
        driver = get_chrome_driver()

    try:
        driver.get(home_url)
        time.sleep(page_load_wait)

        login_url = None
        for link in driver.find_elements_by_tag_name("a"):
            href = link.get_attribute("href")
            # Anchors without an href attribute yield None.
            if href and href.startswith(login_url_prefix):
                login_url = href
                break

        if login_url is None:
            raise RuntimeError(f"No login link found on {home_url}")

        # Navigate to login page and fill in username/password fields.
        driver.get(login_url)
        time.sleep(page_load_wait)

        # Map each required input's ``name`` attribute to its element; if a
        # name occurs more than once, the last matching element is used.
        fields = {"email": None, "password": None, "continue": None}
        for input_ in driver.find_elements_by_tag_name("input"):
            try:
                input_name = input_.get_attribute("name")
            except Exception as e:
                # Best-effort scan: an unreadable element is skipped, but
                # the failure is logged rather than silently dropped.
                logger.debug(f"Skipping unreadable input element: {e!r}")
                continue
            if input_name in fields:
                fields[input_name] = input_

        missing = [name for name, elem in fields.items() if elem is None]
        if missing:
            raise RuntimeError(
                f"Login page at {login_url} is missing input(s): "
                f"{', '.join(missing)}"
            )

        fields["email"].send_keys(username)
        fields["password"].send_keys(password)
        # Submit the form; without this the session is never authenticated.
        fields["continue"].click()
        time.sleep(page_load_wait)

        return driver.get_cookies()
    finally:
        if owns_driver:
            driver.quit()
def get_login_session(cookies: List[dict]) -> aiohttp.ClientSession:
    """
    Get an authenticated ``aiohttp`` session using the cookies provided.
    This is achieved by converting cookies from a Selenium driver session to
    ``http`` module Morsels (see `http.cookies.Morsel`_), which can be added
    to the ``aiohttp`` session's cookie jar.

    Args:
        cookies: A list of dicts as returned by :func:`get_login_cookies`,
            i.e., from a Selenium driver session. Each dict must contain
            the keys ``name``, ``value``, ``domain``, ``httpOnly``, ``path``
            and ``secure``.

    Returns:
        An ``aiohttp`` session with the given cookies in its cookie jar.

    Raises:
        KeyError: If a cookie dict lacks one of the required keys.

    .. _`http.cookies.Morsel`:
        https://docs.python.org/3/library/http.cookies.html#http.cookies.Morsel
    """
    # Import the submodule explicitly: a bare ``import http`` does not load
    # ``http.cookies`` on its own.
    from http.cookies import Morsel

    logger.debug("Creating aiohttp login session from cookies")

    # Build every morsel before opening the session so that a malformed
    # cookie cannot leave an unclosed session behind.
    morsels = {}
    for cookie in cookies:
        morsel = Morsel()
        morsel.set(cookie["name"], cookie["value"], cookie["value"])
        morsel["domain"] = cookie["domain"]
        morsel["httponly"] = cookie["httpOnly"]
        morsel["path"] = cookie["path"]
        morsel["secure"] = cookie["secure"]
        # NOTE: the Selenium ``expiry`` field is deliberately not copied to
        # ``expires``; if it's absent, the cookie remains valid for the
        # duration of the session.
        morsels[cookie["name"]] = morsel

    session = aiohttp.ClientSession()
    session.cookie_jar.update_cookies(morsels)
    logger.debug("Added cookies to aiohttp session")
    return session
async def get_source(
    url: str, session: aiohttp.ClientSession
) -> bs4.BeautifulSoup:
    """
    Get page source of a URL.

    A warning is logged when the response status is not 200; the body is
    still parsed and returned in that case.

    Args:
        url: URL to visit.
        session: ``aiohttp`` session.

    Returns:
        Page source, parsed with the ``html.parser`` backend.
    """
    logger.debug(f"Getting page source for {url}")

    # The context manager releases the response once the body is read.
    async with session.get(url) as resp:
        if resp.status != 200:
            logger.warning(
                f"Received HTTP status {resp.status} when getting {url}"
            )
        text = await resp.text()

    return bs4.BeautifulSoup(text, "html.parser")
async def download_image(
    url: str, session: aiohttp.ClientSession, dst_dir: pathlib.Path
) -> dict:
    """
    Attempt to download the image at ``url`` to the directory specified by
    ``dst_dir``. The downloaded file is named after its MD5 hash to ensure
    uniqueness. If a file already exists on disk (i.e., has been previously
    downloaded), it is not rewritten.

    Download and write failures are logged and reported through the
    returned dict rather than raised. Task cancellation is not suppressed.

    Args:
        url: Image URL. A protocol-relative URL (``//host/...``) is
            prefixed with ``https:``.
        session: ``aiohttp`` session.
        dst_dir: Directory to which the image should be downloaded.

    Returns:
        A dict containing information on the download attempt and, if
        download was successful, image metadata::

            {
                "status": {
                    "get": HTTP response code,
                    "exists": whether the image already exists on disk (bool),
                    "valid": whether the file is a valid image file,
                },
                "image": {
                    "url": image download URL,
                    "filename": downloaded image filename,
                    "md5_hash": file MD5 hash,
                    "size": filesize on disk,
                },
            }

        Fields that were never reached remain ``None``.
    """
    if url.startswith("//"):
        url = f"https:{url}"

    logger.debug(f"Downloading image: {url}")

    ret = {
        "status": {
            "get": None,
            "exists": None,
            "valid": None,
        },
        "image": {
            "url": url,
            "filename": None,
            "md5_hash": None,
            "size": None,
        },
    }

    try:
        # The context manager releases the response even when the body is
        # never read (non-200 status).
        async with session.get(url, timeout=45) as response:
            ret["status"]["get"] = response.status
            img = await response.read() if response.status == 200 else None

        if img is not None:
            # The file extension doesn't necessarily match the filetype, so
            # we manually check the file header and set the correct
            # extension. If the file doesn't correspond to a supported image
            # filetype, we assume the downloaded file is invalid and skip it.
            # NOTE: ``imghdr`` is deprecated since Python 3.11 and removed in
            # Python 3.13.
            ret["status"]["valid"] = False
            filetype = imghdr.what(None, h=img)

            if filetype == "jpeg":
                filetype = "jpg"

            if filetype is not None:
                ret["status"]["valid"] = True

                # Set the filestem to the md5 hash of the image.
                img_md5 = hashlib.md5(img).hexdigest()
                new_fname = f"{img_md5}.{filetype}"

                ret["image"]["filename"] = new_fname
                ret["image"]["size"] = len(img)
                ret["image"]["md5_hash"] = img_md5

                img_fpath = dst_dir / new_fname

                if not img_fpath.exists():
                    ret["status"]["exists"] = False
                    async with aiofiles.open(img_fpath, "wb") as f:
                        await f.write(img)
                else:
                    ret["status"]["exists"] = True
    except aiohttp.client_exceptions.ClientConnectorError as e:
        logger.warning(
            f"Failed to download image at {url}: {str(e)} "
            "(it is likely the image or server no longer exists)"
        )
    except Exception as e:
        # Keep the never-raise contract for ordinary failures (timeouts,
        # I/O errors), but log them instead of discarding them from a
        # ``return`` inside ``finally``.
        logger.warning(f"Error while downloading image at {url}: {e!r}")

    return ret