From e485912b46d12de73754a8aec2b3b489cb5d8de6 Mon Sep 17 00:00:00 2001
From: Johannes Rothe
Date: Sat, 15 Jan 2022 00:10:00 +0100
Subject: [PATCH] pydantic & csv writing

---
 scrape.py | 90 +++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 68 insertions(+), 22 deletions(-)

diff --git a/scrape.py b/scrape.py
index 1eae8e5..0b8d996 100644
--- a/scrape.py
+++ b/scrape.py
@@ -1,31 +1,64 @@
 import asyncio
+import csv
+import json
 import locale
-import pickle
 from datetime import datetime
-from typing import Tuple
+from typing import List, Optional
 
 from bs4 import BeautifulSoup
 from httpx import AsyncClient
+from pydantic import BaseModel
 
 MAIN_URL = "https://www.spiekeroog-vermieter.de/suche/monatskalenderSite.htm?wohnids="
 DATA_URL = "https://www.spiekeroog-vermieter.de/suche/monatskalender.htm?wohnids="
-FROM = 100
+FROM = 110
 TO = 120
 
 STATUS_MAPPING = {"DayF": 0, "DayB": 1, "DayFB": 2, "DayBF": 2}
 
-def convert_to_datestring(day: str, month: str, year: str) -> str:
+
+class Availability(BaseModel):
+    date: datetime
+    status: int
+
+
+class Entry(BaseModel):
+    index: int
+    haus: str
+    wohneinheit: str
+    availabilities: List[Availability]
+
+
+class Result(BaseModel):
+    entries: List[Entry]
+
+
+def generate_csv(result: Result) -> None:
+    with open("result.csv", "w") as csvfile:
+        fieldnames = list(Entry.schema()["properties"].keys())
+        fieldnames.remove("availabilities")
+        fieldnames.extend(
+            [a.date.strftime("%Y-%m-%d") for a in result.entries[0].availabilities]
+        )
+        csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
+        csvwriter.writeheader()
+        for entry in result.entries:
+            row_content = {
+                "index": entry.index,
+                "haus": entry.haus,
+                "wohneinheit": entry.wohneinheit,
+            }
+            for avail in entry.availabilities:
+                row_content[avail.date.strftime("%Y-%m-%d")] = avail.status
+            csvwriter.writerow(row_content)
+
+
+def convert_to_date(day: str, month: str, year: str) -> datetime:
     locale.setlocale(locale.LC_TIME, "de_DE")
     date = datetime.strptime(f"{day.zfill(2)} {month} {year}", "%d %B %Y")
-    return date.strftime('%Y-%m-%d')
+    return date
 
 
-async def request_data(index: int, client: AsyncClient) -> Tuple:
+async def request_data(index: int, client: AsyncClient) -> Optional[Entry]:
     response_data = await client.get(DATA_URL + str(index), timeout=20.0)
     if "Fehler aufgetreten" not in response_data.text:
         response_title = await client.get(MAIN_URL + str(index), timeout=20.0)
         title_soup = BeautifulSoup(response_title.text, "lxml")
-        apartment = title_soup.body.header.h1.get_text()
         unit = (
             title_soup.body.header.h2.get_text()
             .replace("\xa0", " ")
             .replace("Wohneinheit: ", "")
@@ -33,31 +66,44 @@
         data_soup = BeautifulSoup(response_data.text, "lxml")
         valid_element = data_soup.find_all("td", attrs={"data-daynum": True})
-        days = []
+        availabilities = []
         for elm in valid_element:
-            day = elm.get_text(strip=True)
             date_raw = elm.parent.parent.parent.thead.tr.td.get_text(strip=True)
-            month = date_raw.split(" ")[0]
-            year = date_raw.split(" ")[1]
             status = STATUS_MAPPING[elm["class"][0]]
-            date = convert_to_datestring(day, month, year)
-            days.append({date: status})
-        return index, apartment, unit, days
+            date = convert_to_date(
+                elm.get_text(strip=True), date_raw.split(" ")[0], date_raw.split(" ")[1]
+            )
+            availabilities.append(Availability(date=date, status=status))
+        return Entry(
+            index=index,
+            haus=title_soup.body.header.h1.get_text(),
+            wohneinheit=unit,
+            availabilities=availabilities,
+        )
     else:
-        return 0, ""
+        return None
 
 
 async def extract_results() -> None:
     client = AsyncClient()
-    results = await asyncio.gather(*[request_data(i, client) for i in range(FROM, TO)])
-    # Filter the invalid units
-    valid = list(filter(lambda item: item[0] != 0, results))
-    #print(f"results: {valid}")
+    entries = await asyncio.gather(
+        *[request_data(i, client) for i in range(FROM, TO)]
+    )
+    # Filter the invalid units: request_data returns None for ids that do
+    # not exist, and pydantic rejects None inside List[Entry].
+    result = Result(entries=[entry for entry in entries if entry is not None])
+
     await client.aclose()
-    with open("valid_ids", "wb") as file:
-        pickle.dump(valid, file)
+    print(result.json())
+    with open("results.json", "w") as file:
+        file.write(result.json())
 
 
 if __name__ == "__main__":
-    # print(pickle.load(open("valid_ids", "rb")))
-    asyncio.run(extract_results())
+    with open("results.json", "r") as file:
+        result = Result(**json.load(file))
+    print(result.json())
+    generate_csv(result)
+    # asyncio.run(extract_results())
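
A minimal usage sketch of the new generate_csv, assuming the patch is
applied and scrape.py is importable; the house, unit, and dates below are
invented purely to illustrate the CSV layout. Status codes follow
STATUS_MAPPING (DayF -> 0, DayB -> 1, DayFB/DayBF -> 2).

    from datetime import datetime

    from scrape import Availability, Entry, Result, generate_csv

    result = Result(
        entries=[
            Entry(
                index=110,                # first id in the scraped range
                haus="Beispielhaus",      # hypothetical house name
                wohneinheit="Wohnung 1",  # hypothetical unit name
                availabilities=[
                    Availability(date=datetime(2022, 1, 15), status=0),
                    Availability(date=datetime(2022, 1, 16), status=1),
                ],
            )
        ]
    )
    generate_csv(result)

    # result.csv then contains one date column per availability:
    #   index,haus,wohneinheit,2022-01-15,2022-01-16
    #   110,Beispielhaus,Wohnung 1,0,1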