-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession.py
More file actions
121 lines (107 loc) · 4.16 KB
/
session.py
File metadata and controls
121 lines (107 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# nomadForum - a forum on the NomadNetwork
# Copyright (C) 2023-2026 AutumnSpark1226
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import string
import uuid
import main
class Session:
    """A forum session tied to one link, backed by the ``sessions`` table.

    A session is identified by ``session_uuid`` and looked up by ``link_id``.
    A row exists in the ``sessions`` table only while a user is bound to the
    session (see ``bind_user``).

    NOTE(review): SQL statements in this class are built with f-strings.
    The interpolated values are the hex-validated ``link_id``, the locally
    generated ``session_uuid`` and ``user_id``.  Whether the ``main`` database
    helpers accept bound parameters is not visible from this file; switch to
    parameterized queries if they do.
    """

    session_uuid: str
    link_id: str
    remote_identity: str

    def __init__(self, link_id: str, remote_identity: str) -> None:
        """Load the session stored for ``link_id`` or start a fresh one.

        ``link_id`` must be a 32 character hex string.  ``remote_identity``
        must either be falsy (no identity) or a 32 character hex string.  On
        invalid input the database is closed without writing changes and the
        process exits.
        """
        link_id_ok = self._is_hex_id(link_id)
        identity_ok = not remote_identity or self._is_hex_id(remote_identity)
        if not (link_id_ok and identity_ok):
            print("something went wrong...")
            main.close_database(write_changes=False)
            exit(0)
        self.link_id = link_id
        self.remote_identity = remote_identity
        # load already existing session or create a new one
        if self.is_logged_in():
            self.session_uuid = main.query_database(
                f"SELECT session_uuid FROM sessions WHERE link_id = '{self.link_id}'"
            )[0][0]
        else:
            self.session_uuid = self._new_session_uuid()
            self.check_remote_identity(remote_identity)

    @staticmethod
    def _is_hex_id(value: str) -> bool:
        """Return True if ``value`` is exactly 32 hexadecimal characters."""
        return len(value) == 32 and set(value).issubset(string.hexdigits)

    @staticmethod
    def _new_session_uuid() -> str:
        """Return a freshly generated random UUID as a string."""
        # uuid4 must be *called*; str(uuid.uuid4) would only stringify the
        # function object and hand every session the same bogus id.
        return str(uuid.uuid4())

    def check_remote_identity(self, remote_identity: str) -> None:
        """Bind the user whose stored connection matches ``remote_identity``.

        Only connections with ``allow_login = 1`` and ``verified = 1`` are
        considered.  Nothing happens when this session has no remote identity
        or when no stored connection matches.
        """
        if not self.has_remote_identity():
            return
        query_result = main.query_database(
            "SELECT username, remote_id FROM connections WHERE allow_login = 1 AND verified = 1"
        )
        for username, encrypted_remote_id in query_result:
            if main.decrypt(encrypted_remote_id) == remote_identity:
                self.bind_user(main.user_id_from_username(username))
                break

    def is_logged_in(self) -> bool:
        """Return True if exactly one stored session exists for this link."""
        rows = main.query_database(
            f"SELECT user_id FROM sessions WHERE link_id = '{self.link_id}'"
        )
        return len(rows) == 1

    # bind a user to this session
    def bind_user(self, user_id: int):
        """Store this session in the database as belonging to ``user_id``.

        Raises:
            Exception: if the link already has a stored session, or if the
                user does not exist or is not enabled.
        """
        if self.is_logged_in():
            raise Exception("link_id already bound to a session")
        enabled_rows = main.query_database(
            f"SELECT enabled FROM users WHERE user_id = {user_id} AND enabled = 1"
        )
        if len(enabled_rows) != 1:
            raise Exception("user disabled / does not exist, cannot proceed")
        main.execute_sql(
            f"INSERT INTO sessions (session_uuid, link_id, login_time, user_id) VALUES ('{self.session_uuid}', '{self.link_id}', unixepoch(), {user_id})"
        )

    def delete(self):
        """Remove this session from the database and switch to a new id."""
        # delete session from db
        main.execute_sql(
            f"DELETE FROM sessions WHERE session_uuid = '{self.session_uuid}'"
        )
        # get a new id
        self.session_uuid = self._new_session_uuid()

    def get_user_id(self) -> int:
        """Return the user id stored for this session.

        Raises:
            Exception: if no session row exists ("not logged in") or if more
                than one row shares this session id.
        """
        query_result = main.query_database(
            f"SELECT user_id FROM sessions WHERE session_uuid = '{self.session_uuid}'"
        )
        if len(query_result) == 1:
            return query_result[0][0]
        if len(query_result) == 0:
            raise Exception("not logged in")
        raise Exception("multiple sessions with the same id stored???")

    def get_username(self) -> str:
        """Return the username of the user bound to this session."""
        return main.username_from_user_id(self.get_user_id())

    def get_link_id(self) -> str:
        """Return the link id this session was created with."""
        return self.link_id

    def get_remote_identity(self) -> str:
        """Return the remote identity this session was created with."""
        return self.remote_identity

    def has_remote_identity(self) -> bool:
        """Return True if a non-empty remote identity is set.

        Any falsy value (``""`` or ``None``) counts as "no identity", which
        matches the validation in ``__init__`` that lets falsy values through.
        """
        return bool(self.get_remote_identity())