scraperoog/scrape.py

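"""Scrape apartment availability calendars from spiekeroog-vermieter.de.

extract_results() walks the wohnids range and dumps everything to
results.json; generate_csv() flattens that file into result.csv with one
row per apartment and one status column per date.
"""
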
import asyncio
import csv
import json
import locale
from datetime import datetime
from typing import List, Optional

from bs4 import BeautifulSoup
from httpx import AsyncClient
from pydantic import BaseModel

MAIN_URL = (
    "https://www.spiekeroog-vermieter.de/suche/monatskalenderSite.htm?wohnids="
)
DATA_URL = (
    "https://www.spiekeroog-vermieter.de/suche/monatskalender.htm?wohnids="
)

# Range of apartment ids (wohnids) to scan.
FROM = 0
TO = 2000

# Status code for a calendar day, keyed by the cell's CSS class
# (F = frei/free, B = belegt/booked; FB/BF mark change-over days).
STATUS_MAPPING = {"DayF": 0, "DayB": 1, "DayFB": 2, "DayBF": 2}


class Availability(BaseModel):
    date: datetime
    status: int


class Entry(BaseModel):
    index: int
    haus: Optional[str]
    wohneinheit: Optional[str]
    availabilities: Optional[List[Availability]]


class Result(BaseModel):
    entries: List[Entry]


def generate_csv(result: Result) -> None:
    """Flatten a Result into result.csv: one row per apartment, one column per date."""
    with open("result.csv", "w", newline="") as csvfile:
        # Static columns come from the Entry model; the date columns are taken
        # from the first entry, so all entries are assumed to share one date range.
        fieldnames = list(Entry.schema()["properties"].keys())
        fieldnames.remove("availabilities")
        fieldnames.extend(
            [
                a.date.strftime("%Y-%m-%d")
                for a in result.entries[0].availabilities
            ]
        )
        csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
        csvwriter.writeheader()
        for entry in result.entries:
            row_content = {
                "index": entry.index,
                "haus": entry.haus,
                "wohneinheit": entry.wohneinheit,
            }
            for avail in entry.availabilities:
                row_content[avail.date.strftime("%Y-%m-%d")] = avail.status
            csvwriter.writerow(row_content)


def convert_to_datetime(day: str, month: str, year: str) -> datetime:
    # The calendar spells out German month names ("Januar"), so %B needs a
    # German locale to parse them.
    locale.setlocale(locale.LC_TIME, "de_DE.utf8")
    return datetime.strptime(f"{day.zfill(2)} {month} {year}", "%d %B %Y")


async def request_data(index: int, client: AsyncClient) -> Optional[Entry]:
    response_data = await client.get(DATA_URL + str(index), timeout=20.0)
    # Ids that are not in use answer with an error page ("Fehler aufgetreten").
    if "Fehler aufgetreten" not in response_data.text:
        response_title = await client.get(MAIN_URL + str(index), timeout=20.0)
        title_soup = BeautifulSoup(response_title.text, "lxml")
        apartment = (
            title_soup.body.header.h2.get_text()
            .replace("\xa0", " ")
            .replace("Wohneinheit: ", "")
        )
        data_soup = BeautifulSoup(response_data.text, "lxml")
        # Only cells carrying a data-daynum attribute are real calendar days.
        valid_elements = data_soup.find_all("td", attrs={"data-daynum": True})
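        # Rough shape of the calendar markup, inferred from the traversal
        # below (not verified against the live site):
        #   <table>
        #     <thead><tr><td>Januar 2022</td></tr></thead>
        #     <tbody>... <td class="DayF" data-daynum="...">1</td> ...</tbody>
        #   </table>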
        availabilities = []
        for elm in valid_elements:
            # Climb from the day cell to its month table and read the
            # "<month> <year>" header out of the thead.
            date_raw = elm.parent.parent.parent.thead.tr.td.get_text(strip=True)
            status = STATUS_MAPPING[elm["class"][0]]
            date = convert_to_datetime(
                elm.get_text(strip=True),
                date_raw.split(" ")[0],
                date_raw.split(" ")[1],
            )
            availabilities.append(Availability(date=date, status=status))
        return Entry(
            index=index,
            haus=title_soup.body.header.h1.get_text(),
            wohneinheit=apartment,
            availabilities=availabilities,
        )
    # No apartment behind this id; signal that with None, matching the
    # Optional[Entry] return type.
    return None


async def extract_results() -> None:
    client = AsyncClient()
    # Fires off all TO - FROM requests at once; gather preserves order.
    entries = await asyncio.gather(
        *[request_data(i, client) for i in range(FROM, TO)]
    )
    # Drop the ids that had no apartment behind them.
    entries = [entry for entry in entries if entry is not None]
    result = Result(entries=entries)
    await client.aclose()
    with open("results.json", "w") as file:
        file.write(result.json())


if __name__ == "__main__":
    # Two-step workflow: first run the scrape (commented out below) to produce
    # results.json, then rerun to flatten it into result.csv.
    with open("results.json", "r") as file:
        result = Result(**json.load(file))
    generate_csv(result)
    # asyncio.run(extract_results())
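
# Requires httpx, beautifulsoup4, lxml, and pydantic, plus a de_DE.utf8 locale
# on the host so strptime can parse the German month names.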