diff --git a/checkin_bot.py b/checkin_bot.py new file mode 100644 index 0000000..2da6d2c --- /dev/null +++ b/checkin_bot.py @@ -0,0 +1,87 @@ +import sqlite3 +from datetime import datetime, timedelta +from typing import List, Tuple + +AUTO_CHECKOUT_HOURS = 6 + + +class CheckinDB: + def __init__(self, db_name: str) -> None: + self.con = sqlite3.connect(db_name) + self.con.row_factory = sqlite3.Row + self.try_create_tables() + + def try_create_tables(self) -> bool: + try: + with self.con: + self.con.execute( + "CREATE TABLE checkin (ID INTEGER PRIMARY KEY AUTOINCREMENT, NAME VARCHAR UNIQUE, IS_PRESENT BOOL, UPDATED INT)" + ) + print("Creating new table") + except sqlite3.OperationalError: + print("Table already exists in database, nothing to create.") + return True + + def update(self, name: str, is_present: bool) -> bool: + present_int = 1 if is_present else 0 + with self.con: + self.con.execute( + f"INSERT INTO checkin (NAME, IS_PRESENT, UPDATED) VALUES ('{name}', '{present_int}', strftime('%s','now')) ON CONFLICT(name) DO UPDATE SET is_present=excluded.is_present" + ) + return True + return False + + def get_full_entries(self) -> List[Tuple]: + with self.con: + return [ + (x["name"], x["is_present"], x["updated"]) + for x in self.con.execute( + "SELECT name, is_present, updated FROM checkin ORDER BY name" + ) + ] + + def get_checkin_names(self) -> List[str]: + with self.con: + return [ + x["name"] + for x in self.con.execute( + "SELECT name FROM checkin WHERE is_present ORDER BY name" + ) + ] + + def get_checkin_count(self) -> int: + with self.con: + return self.con.execute( + "SELECT COUNT(name) FROM checkin WHERE is_present" + ).fetchone()[0] + + def clear_rows(self) -> None: + with self.con: + self.con.execute("DELETE FROM checkin") + + def get_updated_for_name(self, name: str) -> datetime: + with self.con: + return datetime.fromtimestamp( + float( + self.con.execute( + f"SELECT updated FROM checkin WHERE name = '{name}'" + ).fetchone()[0] + ) + ) + + def auto_checkout(self) -> None: + filter_time = ( + datetime.now() - timedelta(hours=AUTO_CHECKOUT_HOURS) + ).timestamp() + with self.con: + self.con.execute( + f"UPDATE checkin SET IS_PRESENT = 0 WHERE updated <= {filter_time}" + ) + + +def main() -> None: + counter_bot = CheckinDB("checkin.db") + + +if __name__ == "__main__": + main() diff --git a/counter_bot.py b/counter_bot.py deleted file mode 100644 index fd90f65..0000000 --- a/counter_bot.py +++ /dev/null @@ -1,65 +0,0 @@ -import sqlite3 -from typing import List, Tuple - - -class CounterBot: - def __init__(self, db_name: str) -> None: - self.con = sqlite3.connect(db_name) - self.con.row_factory = sqlite3.Row - self.try_create_tables() - - def try_create_tables(self) -> bool: - try: - with self.con: - self.con.execute( - "CREATE TABLE presence (ID INTEGER PRIMARY KEY AUTOINCREMENT, NAME VARCHAR UNIQUE, IS_PRESENT BOOL)" - ) - print("Creating new table") - except sqlite3.OperationalError: - print("Table already exists in database, nothing to create.") - return True - - def update_presence(self, name: str, is_present: bool) -> bool: - present_int = 1 if is_present else 0 - with self.con: - self.con.execute( - f"INSERT INTO presence (NAME, IS_PRESENT) VALUES ('{name}', '{present_int}') ON CONFLICT(name) DO UPDATE SET is_present=excluded.is_present" - ) - return True - return False - - def get_full_entries(self) -> List[Tuple]: - with self.con: - return [ - (x["name"], x["is_present"]) - for x in self.con.execute( - "SELECT name, is_present FROM presence ORDER BY name" - ) - ] - - def get_present_names(self) -> List[str]: - with self.con: - return [ - x["name"] - for x in self.con.execute( - "SELECT name FROM presence WHERE is_present ORDER BY name" - ) - ] - - def get_present_count(self) -> int: - with self.con: - return self.con.execute( - "SELECT COUNT(name) FROM presence WHERE is_present" - ).fetchone()[0] - - def clear_rows(self) -> None: - with self.con: - self.con.execute("DELETE FROM presence") - - -def main() -> None: - counter_bot = CounterBot("counter.db") - - -if __name__ == "__main__": - main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9b28806 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +pytest +discord diff --git a/test_checkin_bot.py b/test_checkin_bot.py new file mode 100644 index 0000000..be09207 --- /dev/null +++ b/test_checkin_bot.py @@ -0,0 +1,36 @@ +from datetime import timedelta, datetime +from .checkin_bot import CheckinDB + +TEST_DB_NAME = "test.db" + +checkin_db = CheckinDB(TEST_DB_NAME) +checkin_db.clear_rows() + + +def test_get_present_count() -> None: + checkin_db.update("Alice", True) + checkin_db.update("Bob", True) + checkin_db.update("Bob", False) + checkin_db.update("Bob", True) + assert checkin_db.get_checkin_count() == 2 + + +def test_present_names() -> None: + assert checkin_db.get_checkin_names() == ["Alice", "Bob"] + + +def test_updated_timestamp() -> None: + assert checkin_db.get_updated_for_name( + "Alice" + ) >= datetime.now() - timedelta(minutes=1) + + +def test_auto_checkout() -> None: + assert "Alice" in checkin_db.get_checkin_names() + old_timestamp = (datetime.now() - timedelta(hours=6)).timestamp() + with checkin_db.con: + checkin_db.con.execute( + f"UPDATE checkin SET updated = {old_timestamp} WHERE name = 'Alice'" + ) + checkin_db.auto_checkout() + assert "Alice" not in checkin_db.get_checkin_names() diff --git a/test_counter_bot.py b/test_counter_bot.py deleted file mode 100644 index 8fd73bd..0000000 --- a/test_counter_bot.py +++ /dev/null @@ -1,17 +0,0 @@ -import sqlite3 -from .counter_bot import CounterBot - -TEST_DB_NAME = "test.db" - -counter_bot = CounterBot(TEST_DB_NAME) -counter_bot.clear_rows() - -def test_get_present_count() -> None: - counter_bot.update_presence("Alice", True) - counter_bot.update_presence("Bob", True) - counter_bot.update_presence("Bob", False) - counter_bot.update_presence("Bob", True) - assert counter_bot.get_present_count() == 2 - -def test_present_names() -> None: - assert counter_bot.get_present_names() == ["Alice", "Bob"]