From 6244120bb1daeb11bdb4166df3535c107a3e382e Mon Sep 17 00:00:00 2001 From: Johannes Rothe Date: Wed, 29 Dec 2021 02:04:44 +0100 Subject: [PATCH] Add discord connection --- checkin_bot.py | 119 +++++++--------------- checkin_db.py | 79 ++++++++++++++ test_checkin_bot.py => test_checkin_db.py | 2 +- 3 files changed, 118 insertions(+), 82 deletions(-) create mode 100644 checkin_db.py rename test_checkin_bot.py => test_checkin_db.py (96%) diff --git a/checkin_bot.py b/checkin_bot.py index 2da6d2c..bcaf971 100644 --- a/checkin_bot.py +++ b/checkin_bot.py @@ -1,87 +1,44 @@ -import sqlite3 -from datetime import datetime, timedelta -from typing import List, Tuple - -AUTO_CHECKOUT_HOURS = 6 +import os +from discord.ext import commands +from checkin_db import CheckinDB -class CheckinDB: - def __init__(self, db_name: str) -> None: - self.con = sqlite3.connect(db_name) - self.con.row_factory = sqlite3.Row - self.try_create_tables() - - def try_create_tables(self) -> bool: - try: - with self.con: - self.con.execute( - "CREATE TABLE checkin (ID INTEGER PRIMARY KEY AUTOINCREMENT, NAME VARCHAR UNIQUE, IS_PRESENT BOOL, UPDATED INT)" - ) - print("Creating new table") - except sqlite3.OperationalError: - print("Table already exists in database, nothing to create.") - return True - - def update(self, name: str, is_present: bool) -> bool: - present_int = 1 if is_present else 0 - with self.con: - self.con.execute( - f"INSERT INTO checkin (NAME, IS_PRESENT, UPDATED) VALUES ('{name}', '{present_int}', strftime('%s','now')) ON CONFLICT(name) DO UPDATE SET is_present=excluded.is_present" - ) - return True - return False - - def get_full_entries(self) -> List[Tuple]: - with self.con: - return [ - (x["name"], x["is_present"], x["updated"]) - for x in self.con.execute( - "SELECT name, is_present, updated FROM checkin ORDER BY name" - ) - ] - - def get_checkin_names(self) -> List[str]: - with self.con: - return [ - x["name"] - for x in self.con.execute( - "SELECT name FROM checkin WHERE is_present ORDER BY name" - ) - ] - - def get_checkin_count(self) -> int: - with self.con: - return self.con.execute( - "SELECT COUNT(name) FROM checkin WHERE is_present" - ).fetchone()[0] - - def clear_rows(self) -> None: - with self.con: - self.con.execute("DELETE FROM checkin") - - def get_updated_for_name(self, name: str) -> datetime: - with self.con: - return datetime.fromtimestamp( - float( - self.con.execute( - f"SELECT updated FROM checkin WHERE name = '{name}'" - ).fetchone()[0] - ) - ) - - def auto_checkout(self) -> None: - filter_time = ( - datetime.now() - timedelta(hours=AUTO_CHECKOUT_HOURS) - ).timestamp() - with self.con: - self.con.execute( - f"UPDATE checkin SET IS_PRESENT = 0 WHERE updated <= {filter_time}" - ) +checkin_db = CheckinDB("checkin.db") +bot = commands.Bot(command_prefix="!") -def main() -> None: - counter_bot = CheckinDB("checkin.db") +@bot.command() +async def checkin(context: commands.Context) -> None: + checkin_db.auto_checkout() + checkin_db.update(context.message.author.display_name, True) + count = checkin_db.get_checkin_count() + if count == 1: + await context.send(f"Aktuell {count} Mensch im Erfindergeist") + else: + await context.send(f"Aktuell {count} Menschen im Erfindergeist") -if __name__ == "__main__": - main() +@bot.command() +async def checkout(context: commands.Context) -> None: + checkin_db.update(context.message.author.display_name, False) + count = checkin_db.get_checkin_count() + if count == 1: + await context.send(f"Aktuell {count} Mensch im Erfindergeist") + else: + await context.send(f"Aktuell {count} Menschen im Erfindergeist") + + +@bot.command() +async def checkin_list(context: commands.Context) -> None: + count = checkin_db.get_checkin_count() + if count == 1: + await context.send( + f"Aktuell {checkin_db.get_checkin_count()} Mensch im Erfindergeist: {', '.join(checkin_db.get_checkin_names())}" + ) + else: + await context.send( + f"Aktuell {checkin_db.get_checkin_count()} Menschen im Erfindergeist: {', '.join(checkin_db.get_checkin_names())}" + ) + + +bot.run(os.getenv("DISCORD_KEY")) diff --git a/checkin_db.py b/checkin_db.py new file mode 100644 index 0000000..667803e --- /dev/null +++ b/checkin_db.py @@ -0,0 +1,79 @@ +from datetime import datetime, timedelta +from typing import List, Tuple +import sqlite3 + +AUTO_CHECKOUT_HOURS = 6 + + +class CheckinDB: + def __init__(self, db_name: str) -> None: + self.con = sqlite3.connect(db_name) + self.con.row_factory = sqlite3.Row + self.try_create_tables() + + def try_create_tables(self) -> bool: + try: + with self.con: + self.con.execute( + "CREATE TABLE checkin (ID INTEGER PRIMARY KEY AUTOINCREMENT, NAME VARCHAR UNIQUE, IS_PRESENT BOOL, UPDATED INT)" + ) + print("Creating new table") + except sqlite3.OperationalError: + print("Table already exists in database, nothing to create.") + return True + + def update(self, name: str, is_present: bool) -> bool: + present_int = 1 if is_present else 0 + with self.con: + self.con.execute( + f"INSERT INTO checkin (NAME, IS_PRESENT, UPDATED) VALUES ('{name}', '{present_int}', strftime('%s','now')) ON CONFLICT(name) DO UPDATE SET is_present=excluded.is_present" + ) + return True + return False + + def get_full_entries(self) -> List[Tuple]: + with self.con: + return [ + (x["name"], x["is_present"], x["updated"]) + for x in self.con.execute( + "SELECT name, is_present, updated FROM checkin ORDER BY name" + ) + ] + + def get_checkin_names(self) -> List[str]: + with self.con: + return [ + x["name"] + for x in self.con.execute( + "SELECT name FROM checkin WHERE is_present ORDER BY name" + ) + ] + + def get_checkin_count(self) -> int: + with self.con: + return self.con.execute( + "SELECT COUNT(name) FROM checkin WHERE is_present" + ).fetchone()[0] + + def clear_rows(self) -> None: + with self.con: + self.con.execute("DELETE FROM checkin") + + def get_updated_for_name(self, name: str) -> datetime: + with self.con: + return datetime.fromtimestamp( + float( + self.con.execute( + f"SELECT updated FROM checkin WHERE name = '{name}'" + ).fetchone()[0] + ) + ) + + def auto_checkout(self) -> None: + filter_time = ( + datetime.now() - timedelta(hours=AUTO_CHECKOUT_HOURS) + ).timestamp() + with self.con: + self.con.execute( + f"UPDATE checkin SET IS_PRESENT = 0 WHERE updated <= {filter_time}" + ) diff --git a/test_checkin_bot.py b/test_checkin_db.py similarity index 96% rename from test_checkin_bot.py rename to test_checkin_db.py index be09207..7b03ab6 100644 --- a/test_checkin_bot.py +++ b/test_checkin_db.py @@ -1,5 +1,5 @@ from datetime import timedelta, datetime -from .checkin_bot import CheckinDB +from .checkin_db import CheckinDB TEST_DB_NAME = "test.db"