Implemented JWT authentication and enhanced database security.

This commit is contained in:
MrEisbear 2025-06-14 13:17:41 -05:00
parent e24dbed2e6
commit 5cd28c5060
2 changed files with 76 additions and 18 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
.venv
.env
.idea

89
app.py
View file

@ -1,14 +1,21 @@
from flask import Flask, request, jsonify from flask import Flask, request, jsonify, make_response
from functools import wraps
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime from datetime import datetime, timedelta
from datetime import timezone
import mysql.connector import mysql.connector
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
import jwt
load_dotenv() load_dotenv()
app = Flask(__name__) app = Flask(__name__)
# JWT Init
jwt_key=os.getenv('JWT_KEY')
jwt_expire=int(os.getenv('JWT_EXPIRATION')) #In days
# Implemented authentication and data security practices aligned with ISO/IEC 27001 standards
# /\ AI generated for resume. Lol
# DATABASE CONNECTION # DATABASE CONNECTION
db = mysql.connector.connect( db = mysql.connector.connect(
@ -17,26 +24,39 @@ db = mysql.connector.connect(
password=os.getenv('DB_PASSWORD'), password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_NAME') database=os.getenv('DB_NAME')
) )
cursor = db.cursor(dictionary=True) with db.cursor(dictionary=True) as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS users (" cursor.execute("CREATE TABLE IF NOT EXISTS users ("
"bid VARCHAR(32) PRIMARY KEY," "bid VARCHAR(32) PRIMARY KEY,"
"username VARCHAR(64) NOT NULL," "username VARCHAR(64) NOT NULL,"
"email VARCHAR(128) NOT NULL," "email VARCHAR(128) NOT NULL,"
"password_hash TEXT," "password_hash TEXT,"
"balance INT DEFAULT 7500," "balance DECIMAL(18, 2) DEFAULT 7500,"
"job INT," "job INT,"
"salary_class INT," "salary_class INT,"
"collected DATETIME" "collected DATETIME"
");") ");")
cursor.execute("CREATE TABLE IF NOT EXISTS salary ("
"class INT PRIMARY KEY,"
"money DECIMAL(18, 2));")
db.commit()
def get_user(uid): def token_gen(bid):
cursor.execute("SELECT * FROM users WHERE bid = %s", (uid,)) exptime = datetime.now(timezone.utc) + timedelta(days=jwt_expire)
return cursor.fetchone() token = jwt.encode(
{"bid": bid, "exp": exptime},
jwt_key,
algorithm="HS256")
return token
def get_user(bid):
with db.cursor(dictionary=True) as cur:
cur.execute("SELECT * FROM users WHERE bid = %s", (bid,))
return cur.fetchone()
@app.route('/balance', methods=['GET']) @app.route('/balance', methods=['GET'])
def get_balance(): def get_balance():
uid = request.args.get('uid') bid = request.args.get('bid')
user = get_user(uid) user = get_user(bid)
if not user: if not user:
return jsonify({"error": "User not found."}), 404 return jsonify({"error": "User not found."}), 404
return jsonify({"balance": user["balance"]}) return jsonify({"balance": user["balance"]})
@ -51,29 +71,64 @@ def register():
if not username or not email or not password or not bid: if not username or not email or not password or not bid:
return jsonify({"error": "Username, email, and password are required."}), 400 return jsonify({"error": "Username, email, and password are required."}), 400
password_hash = generate_password_hash(password, method='sha256') password_hash = generate_password_hash(password)
try: try:
cursor.execute("INSERT INTO users (bid, username, email, password_hash) VALUES (%s, %s, %s, %s)", with db.cursor(dictionary=True) as cur:
cur.execute("INSERT INTO users (bid, username, email, password_hash) VALUES (%s, %s, %s, %s)",
(bid, username, email, password_hash)) (bid, username, email, password_hash))
db.commit() db.commit()
token = None token = token_gen(bid)
return jsonify({"message": "User registered successfully.","token": token}), 201 return jsonify({"message": "User registered successfully.","token": token}), 201
except mysql.connector.IntegrityError: except mysql.connector.IntegrityError:
return jsonify({"error": "Username or email already exists."}), 409 return jsonify({"error": "Username or email already exists."}), 409
@app.route('/login', methods=['POST']) @app.route('/login', methods=['POST'])
def login(): def login():
return 501 data = request.get_json()
bid = data.get('bid')
password = data.get('password')
if not bid or not password:
return jsonify({"error": "BID and password are required"}), 400
user = get_user(bid)
if not user:
return jsonify({"error": "User not found"}), 404
if not check_password_hash(user['password_hash'], password):
return jsonify({"error": "Invalid password"}), 401
token = token_gen(bid)
response = make_response(jsonify({"message": "Login successful."}), 200)
response.set_cookie('token', token, httponly=True, samesite='Strict', max_age=30 * 24 * 60 * 60)
# secure=True - Implement once HTTPS
return response
@app.route('/admin/change-password', methods=['POST'])
def change_password():
data = request.get_json()
bid = data.get('bid')
new_password = data.get('password')
key = data.get('key')
if not bid or not new_password or not key:
return jsonify({"error": "BID, new password, and key are required"}), 400
if key != os.getenv('ADMIN_KEY'):
return jsonify({"error": "Admin Key required"}), 403
user = get_user(bid)
if not user:
return jsonify({"error": "User not found"}), 404
new_password_hash = generate_password_hash(new_password)
with db.cursor(dictionary=True) as cur:
(cur.execute("UPDATE users SET password_hash = %s WHERE bid = %s", (new_password_hash, bid)))
db.commit()
return jsonify({"message": "Password changed successfully"}), 200
@app.route('/collect', methods=['POST']) @app.route('/collect', methods=['POST'])
def collect_salary(): def collect_salary():
return 501 return jsonify({"message":"no implementation"}), 501
@app.route('/') @app.route('/')
def hello_world(): # put application's code here def hello_world(): # put application's code here
return 'Hello World!' return 'Hello World!'
if __name__ == '__main__': if __name__ == '__main__':
app.run() app.run()