pydantic & csv writing

This commit is contained in:
Johannes Rothe 2022-01-15 00:10:00 +01:00
parent 31403c2e5d
commit e485912b46

View File

@ -1,31 +1,64 @@
import asyncio
import csv
import json
import locale
import pickle
from datetime import datetime
from typing import List, Optional, Tuple

from bs4 import BeautifulSoup
from httpx import AsyncClient
from pydantic import BaseModel

MAIN_URL = "https://www.spiekeroog-vermieter.de/suche/monatskalenderSite.htm?wohnids="
DATA_URL = "https://www.spiekeroog-vermieter.de/suche/monatskalender.htm?wohnids="
FROM = 110
TO = 120
STATUS_MAPPING = {"DayF": 0, "DayB": 1, "DayFB": 2, "DayBF": 2}
def convert_to_datestring(day: str, month: str, year: str) -> str:
class Availability(BaseModel):
    """Occupancy of a single calendar day for one unit.

    ``status`` carries the value looked up in STATUS_MAPPING from the
    calendar cell's CSS class (presumably 0 = free, 1 = booked,
    2 = change-over day — verify against the site's legend).
    """

    date: datetime
    status: int
class Entry(BaseModel):
    """One rentable unit together with its scraped availability calendar."""

    # Numeric id appended to the ``wohnids=`` query parameter.
    index: int
    # House name, taken from the page's <h1>.
    haus: str
    # Unit label from the <h2>, with the "Wohneinheit: " prefix stripped.
    wohneinheit: str
    availabilities: List[Availability]
class Result(BaseModel):
    """Top-level container that is serialized to results.json / result.csv."""

    entries: List[Entry]
def generate_csv(result: Result) -> None:
    """Write *result* to ``result.csv``: one row per entry, one column per date.

    Columns are the scalar ``Entry`` fields followed by one ISO-formatted
    (``YYYY-MM-DD``) column per availability date.

    NOTE(review): the date columns are derived from the FIRST entry only, so
    every entry is assumed to cover the same date range — ``DictWriter``
    raises ``ValueError`` for any unexpected date key. Confirm the scraper
    guarantees this.
    """
    # newline="" is required by the csv module so the writer controls line
    # endings itself (otherwise blank rows appear on Windows).
    with open("result.csv", "w", newline="") as csvfile:
        fieldnames = list(Entry.schema()["properties"].keys())
        fieldnames.remove("availabilities")
        fieldnames.extend(
            a.date.strftime("%Y-%m-%d") for a in result.entries[0].availabilities
        )
        csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
        csvwriter.writeheader()
        for entry in result.entries:
            row_content = {
                "index": entry.index,
                "haus": entry.haus,
                "wohneinheit": entry.wohneinheit,
            }
            for avail in entry.availabilities:
                row_content[avail.date.strftime("%Y-%m-%d")] = avail.status
            csvwriter.writerow(row_content)
def convert_to_datestring(day: str, month: str, year: str) -> datetime:
    """Parse a German date triple (e.g. ``"1", "Januar", "2022"``) to datetime.

    Relies on the ``de_DE`` locale so ``%B`` matches German month names;
    ``locale.Error`` is raised where that locale is not installed.

    NOTE(review): ``setlocale`` is process-global and not thread-safe — fine
    for this single-purpose scraper, but worth knowing before reuse.
    """
    locale.setlocale(locale.LC_TIME, "de_DE")
    return datetime.strptime(f"{day.zfill(2)} {month} {year}", "%d %B %Y")
async def request_data(index: int, client: AsyncClient) -> Optional[Entry]:
    response_data = await client.get(DATA_URL + str(index), timeout=20.0)
    if "Fehler aufgetreten" not in response_data.text:
        response_title = await client.get(MAIN_URL + str(index), timeout=20.0)
        title_soup = BeautifulSoup(response_title.text, "lxml")
        apartment = (
            title_soup.body.header.h2.get_text()
            .replace("\xa0", " ")
            .replace("Wohneinheit: ", "")
        )
        data_soup = BeautifulSoup(response_data.text, "lxml")
        valid_element = data_soup.find_all("td", attrs={"data-daynum": True})
        availabilities = []
        for elm in valid_element:
            date_raw = elm.parent.parent.parent.thead.tr.td.get_text(strip=True)
            status = STATUS_MAPPING[elm["class"][0]]
            date = convert_to_datestring(
                elm.get_text(strip=True), date_raw.split(" ")[0], date_raw.split(" ")[1]
            )
            availabilities.append(Availability(date=date, status=status))
        return Entry(
            index=index,
            haus=title_soup.body.header.h1.get_text(),
            wohneinheit=apartment,
            availabilities=availabilities,
        )
    else:
        return None
async def extract_results() -> None:
    client = AsyncClient()
    result = Result(
        entries=await asyncio.gather(
            *[request_data(i, client) for i in range(FROM, TO)]
        )
    )
    # Filter the invalid units
    # valid = list(filter(lambda item: item[0] != 0, results))
    # print(f"results: {valid}")
    await client.aclose()
    print(result.json())
    with open("results.json", "w") as file:
        file.write(result.json())
if __name__ == "__main__":
    with open("results.json", "r") as file:
        result = Result(**json.load(file))
    print(result.json())
    generate_csv(result)
    # asyncio.run(extract_results())