scraperoog/scrape.py


import asyncio
import csv
import json
import locale
import platform
from configparser import ConfigParser
from datetime import datetime
from typing import List, Optional

if platform.system() == "Windows":
    # Make sure the "idna" codec is registered up front; frozen Windows
    # builds have been known to fail when it is loaded lazily.
    import encodings.idna

import tqdm
from bs4 import BeautifulSoup
from httpx import AsyncClient
from pydantic import BaseModel

config = ConfigParser()
config.read("config.ini")
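
# Expected shape of config.ini (the values here are purely illustrative):
#
#   [Allgemein]
#   haupt_url = https://example.com/booking/unit.php?id=
#   data_url = https://example.com/booking/calendar.php?id=
#   von_id = 1
#   bis_id = 100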
MAIN_URL = config['Allgemein'].get('haupt_url')
DATA_URL = config['Allgemein'].get('data_url')
FROM = config['Allgemein'].getint('von_id')
TO = config['Allgemein'].getint('bis_id')
DATEFORMAT = "%Y-%m-%d"
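# Presumed meaning of the calendar-cell CSS classes: "F" = free (0),
# "B" = booked (1); "FB"/"BF" are changeover days, counted as half (0.5).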
STATUS_MAPPING = {"DayF": 0, "DayB": 1, "DayFB": 0.5, "DayBF": 0.5}


class Availability(BaseModel):
    date: datetime
    status: float


class Entry(BaseModel):
    index: int
    haus: Optional[str]
    wohneinheit: Optional[str]
    availabilities: Optional[List[Availability]]

    # Entries compare by their scraped index so that sorted() can restore
    # the original order after concurrent fetching.
    def __lt__(self, other):
        return self.index < other.index

    def __le__(self, other):
        return self.index <= other.index

    def __gt__(self, other):
        return self.index > other.index

    def __ge__(self, other):
        return self.index >= other.index


class Result(BaseModel):
    entries: List[Entry]


def generate_csv(result: Result) -> None:
    with open("result.csv", "w", newline="") as csvfile:
        # One column per scalar model field, then one column per calendar
        # day, taken from the first entry's availabilities.
        fieldnames = list(Entry.schema()["properties"].keys())
        fieldnames.remove("availabilities")
        fieldnames.extend(
            [a.date.strftime(DATEFORMAT) for a in result.entries[0].availabilities]
        )
        csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
        csvwriter.writeheader()
        for entry in result.entries:
            row_content = {
                "index": entry.index,
                "haus": entry.haus,
                "wohneinheit": entry.wohneinheit,
            }
            for avail in entry.availabilities:
                row_content[avail.date.strftime(DATEFORMAT)] = avail.status
            csvwriter.writerow(row_content)
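
# The resulting result.csv looks roughly like this (the date columns depend
# on the scraped range; the row values here are made up):
#   index,haus,wohneinheit,2023-01-01,2023-01-02,...
#   12,Haus A,Wohnung 1,0.0,0.5,...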


def convert_to_datestring(day: str, month: str, year: str) -> datetime:
    # The month name arrives in the site's language, so parse it with the
    # user's default locale rather than the POSIX "C" locale.
    locale.setlocale(locale.LC_TIME, "")
    return datetime.strptime(f"{day.zfill(2)} {month} {year}", "%d %B %Y")
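
# Example, assuming a German locale:
#   convert_to_datestring("5", "Januar", "2023") -> datetime(2023, 1, 5, 0, 0)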


async def request_data(index: int, client: AsyncClient) -> Optional[Entry]:
    response_data = await client.get(DATA_URL + str(index), timeout=20.0)
    if "Fehler aufgetreten" in response_data.text:
        # The site serves an error page for unused indices; return a
        # sentinel entry that is filtered out later.
        return Entry(index=0)
    response_title = await client.get(MAIN_URL + str(index), timeout=20.0)
    title_soup = BeautifulSoup(response_title.text, "lxml")
    apartment = (
        title_soup.body.header.h2.get_text()
        .replace("\xa0", " ")
        .replace("Wohneinheit: ", "")
    )
    data_soup = BeautifulSoup(response_data.text, "lxml")
    valid_elements = data_soup.find_all("td", attrs={"data-daynum": True})
    availabilities = []
    for elm in valid_elements:
        # The calendar's <thead> holds "<month> <year>"; the cell itself
        # holds the day number, and its CSS class encodes the status.
        date_raw = elm.parent.parent.parent.thead.tr.td.get_text(strip=True)
        status = STATUS_MAPPING[elm["class"][0]]
        month, year = date_raw.split(" ")[:2]
        date = convert_to_datestring(elm.get_text(strip=True), month, year)
        availabilities.append(Availability(date=date, status=status))
    return Entry(
        index=index,
        haus=title_soup.body.header.h1.get_text(),
        wohneinheit=apartment,
        availabilities=availabilities,
    )


async def extract_results() -> None:
    client = AsyncClient()
    # Note: range() stops before TO, so the bis_id index itself is not scraped.
    tasks = [request_data(i, client) for i in range(FROM, TO)]
    # Await the requests as they finish, with a tqdm progress bar.
    entries = [
        await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))
    ]
    # Drop the sentinel entries, then restore index order, which
    # as_completed scrambled.
    filtered_entries = [entry for entry in entries if entry.index != 0]
    result = Result(entries=sorted(filtered_entries))
    await client.aclose()
    with open("results.json", "w") as file:
        file.write(result.json())
    generate_csv(result)


if __name__ == "__main__":
    # To rebuild the CSV from a previous run instead of scraping again:
    # with open("results.json", "r") as file:
    #     result = Result(**json.load(file))
    if platform.system() == "Windows":
        # The default Proactor event loop can misbehave here; fall back
        # to the selector event loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(extract_results())
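
# Typical run, assuming config.ini sits next to this script:
#   python scrape.py
# Writes results.json and result.csv to the working directory.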