137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
import asyncio
|
|
import csv
|
|
import json
|
|
import locale
|
|
locale.setlocale(locale.LC_TIME, "German") # dates on that page are German
|
|
import pickle
|
|
import platform
|
|
from configparser import ConfigParser
|
|
from datetime import datetime
|
|
from typing import List, Optional, Tuple
|
|
|
|
if platform.system() == "Windows":
|
|
import encodings.idna
|
|
import tqdm
|
|
from bs4 import BeautifulSoup
|
|
from httpx import AsyncClient
|
|
from pydantic import BaseModel
|
|
|
|
config = ConfigParser()
|
|
config.read("config.ini")
|
|
|
|
MAIN_URL = config['Allgemein'].get('haupt_url')
|
|
DATA_URL = config['Allgemein'].get('data_url')
|
|
FROM = config['Allgemein'].getint('von_id')
|
|
TO = config['Allgemein'].getint('bis_id')
|
|
DATEFORMAT = "%Y-%m-%d"
|
|
STATUS_MAPPING = {"DayF": 0, "DayB": 1, "DayFB": 0.5, "DayBF": 0.5}
|
|
|
|
|
|
class Availability(BaseModel):
|
|
date: datetime
|
|
status: float
|
|
|
|
|
|
class Entry(BaseModel):
|
|
index: int
|
|
haus: Optional[str]
|
|
wohneinheit: Optional[str]
|
|
availabilities: Optional[List[Availability]]
|
|
def __lt__(self, other):
|
|
return self.index < other.index
|
|
|
|
def __le__(self, other):
|
|
return self.index <= other.index
|
|
|
|
def __gt__(self, other):
|
|
return self.index > other.index
|
|
|
|
def __ge__(self, other):
|
|
return self.index >= other.index
|
|
|
|
|
|
class Result(BaseModel):
|
|
entries: List[Entry]
|
|
|
|
|
|
def generate_csv(result: Result) -> None:
|
|
with open("result.csv", "w", newline='') as csvfile:
|
|
fieldnames = list(Entry.schema()["properties"].keys())
|
|
fieldnames.remove("availabilities")
|
|
fieldnames.extend(
|
|
[a.date.strftime(DATEFORMAT) for a in result.entries[0].availabilities]
|
|
)
|
|
csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
|
csvwriter.writeheader()
|
|
for entry in result.entries:
|
|
row_content = {
|
|
"index": entry.index,
|
|
"haus": entry.haus,
|
|
"wohneinheit": entry.wohneinheit,
|
|
}
|
|
for avail in entry.availabilities:
|
|
row_content[avail.date.strftime(DATEFORMAT)] = avail.status
|
|
csvwriter.writerow(row_content)
|
|
|
|
|
|
def convert_to_datestring(day: str, month: str, year: str) -> datetime:
|
|
date = datetime.strptime(f"{day.zfill(2)} {month} {year}", "%d %B %Y")
|
|
return date
|
|
|
|
|
|
async def request_data(index: int, client: AsyncClient) -> Optional[Entry]:
|
|
response_data = await client.get(DATA_URL + str(index), timeout=20.0)
|
|
if "Fehler aufgetreten" not in response_data.text:
|
|
response_title = await client.get(MAIN_URL + str(index), timeout=20.0)
|
|
title_soup = BeautifulSoup(response_title.text, "lxml")
|
|
apartment = (
|
|
title_soup.body.header.h2.get_text()
|
|
.replace("\xa0", " ")
|
|
.replace("Wohneinheit: ", "")
|
|
)
|
|
|
|
data_soup = BeautifulSoup(response_data.text, "lxml")
|
|
valid_element = data_soup.find_all("td", attrs={"data-daynum": True})
|
|
availabilities = []
|
|
for elm in valid_element:
|
|
date_raw = elm.parent.parent.parent.thead.tr.td.get_text(strip=True)
|
|
status = STATUS_MAPPING[elm["class"][0]]
|
|
date = convert_to_datestring(
|
|
elm.get_text(strip=True),
|
|
date_raw.split(" ")[0],
|
|
date_raw.split(" ")[1],
|
|
)
|
|
availabilities.append(Availability(date=date, status=status))
|
|
return Entry(
|
|
index=index,
|
|
haus=title_soup.body.header.h1.get_text().encode("utf-8"),
|
|
wohneinheit=apartment.encode("utf-8"),
|
|
availabilities=availabilities,
|
|
)
|
|
else:
|
|
return Entry(index=0)
|
|
|
|
|
|
async def extract_results() -> None:
|
|
client = AsyncClient()
|
|
tasks = [request_data(i, client) for i in range(FROM, TO)]
|
|
entries = [
|
|
await f for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))
|
|
]
|
|
filtered_entries = list(filter(lambda entry: entry.index != 0, entries))
|
|
sorted_entries = list(sorted(filtered_entries))
|
|
result = Result(entries=sorted_entries)
|
|
|
|
await client.aclose()
|
|
with open("results.json", "w") as file:
|
|
file.write(result.json())
|
|
generate_csv(result)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# with open("results.json", "r") as file:
|
|
# result = Result(**json.load(file))
|
|
if platform.system() == "Windows":
|
|
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
|
asyncio.run(extract_results())
|