LFI (Local File Inclusion)


LFI has been identified and confirmed at the export function of the iCalendar feature

┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ cat ~/Downloads/passwd | grep -i /bin/bash
root:x:0:0:root:/root:/bin/bash
dev:x:1000:1000:dev:/home/dev:/bin/bash
qa:x:1001:1001::/home/qa:/bin/bash

Those are the 3 legit system users

Continuing the enumeration through the LFI

Crontab


*/1 * * * * www-data /bin/bash /data/scripts/app_backup.sh
*/15 * * * * mysql /bin/bash /data/scripts/table_cleanup.sh
* * * * * mysql /bin/bash /data/scripts/dbmonitor.sh

Checking the crontab file reveals that there are 3 cronjobs. They are all located under the /data/scripts directory

app_backup.sh


#!/bin/bash
 
cd /var/www
/usr/bin/rm backupapp.zip
/usr/bin/zip -r backupapp.zip /opt/app

The app_backup.sh file is a Bash script that archives the web application located at /opt/app to the /var/www directory

I can obtain that backupapp.zip file via the same LFI method

┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ mv ~/Downloads/backupapp.zip .
 
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ unzip -q backupapp.zip

Extracting content

Source Code Analysis


┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ ll .             
total 6.6M
4.0K drwxrwxr-x   3 kali kali 4.0K Oct  6 06:20 .
4.0K drwxrwxr-x   3 kali kali 4.0K Oct  6 06:20 opt
6.5M -rw-rw-r--   1 kali kali 6.5M Oct  6 06:19 backupapp.zip
4.0K drwxr-xr-x 146 kali kali 4.0K Oct  6 03:52 ..
 
┌──(kali㉿kali)-[~/archive/htb/labs/yummy]
└─$ ll opt/app/      
total 40K
4.0K drwxrwxr-x 7 kali kali 4.0K Oct  6 06:29 .
4.0K drwxrwxr-x 3 kali kali 4.0K Oct  6 06:20 ..
4.0K drwxrwxr-x 2 kali kali 4.0K Sep 30 10:16 __pycache__
4.0K drwxr-xr-x 3 kali kali 4.0K Sep 30 10:16 config
4.0K drwxr-xr-x 3 kali kali 4.0K Sep 30 10:16 middleware
4.0K drwxr-xr-x 6 kali kali 4.0K Sep 30 10:16 static
4.0K drwxr-xr-x 2 kali kali 4.0K Sep 30 10:16 templates
 12K -rw-r--r-- 1 kali kali  12K Sep 25 15:54 app.py

This is the web root directory

app.py

┌──(kali㉿kali)-[~//labs/yummy/opt/app]
└─$ cat app.py
from flask import Flask, request, send_file, render_template, redirect, url_for, flash, jsonify, make_response
import tempfile
import os
import shutil
from datetime import datetime, timedelta, timezone
from urllib.parse import quote
from ics import Calendar, Event
from middleware.verification import verify_token
from config import signature
import pymysql.cursors
from pymysql.constants import CLIENT
import jwt
import secrets
import hashlib
 
app = Flask(__name__, static_url_path='/static')
temp_dir = ''
app.secret_key = secrets.token_hex(32)
 
db_config = {
    'host': '127.0.0.1',
    'user': 'chef',
    'password': '3wDo7gSRZIwIHRxZ!',
    'database': 'yummy_db',
    'cursorclass': pymysql.cursors.DictCursor,
    'client_flag': CLIENT.MULTI_STATEMENTS
 
}
 
access_token = ''
 
@app.route('/login', methods=['GET','POST'])
def login():
    global access_token
    if request.method == 'GET':
        return render_template('login.html', message=None)
    elif request.method == 'POST':
        email = request.json.get('email')
        password = request.json.get('password')
        password2 = hashlib.sha256(password.encode()).hexdigest()
        if not email or not password:
            return jsonify(message="email or password is missing"), 400
 
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "SELECT * FROM users WHERE email=%s AND password=%s"
                cursor.execute(sql, (email, password2))
                user = cursor.fetchone()
                if user:
                    payload = {
                        'email': email,
                        'role': user['role_id'],
                        'iat': datetime.now(timezone.utc),
                        'exp': datetime.now(timezone.utc) + timedelta(seconds=3600),
                        'jwk':{'kty': 'RSA',"n":str(signature.n),"e":signature.e}
                    }
                    access_token = jwt.encode(payload, signature.key.export_key(), algorithm='RS256')
 
                    response = make_response(jsonify(access_token=access_token), 200)
                    response.set_cookie('X-AUTH-Token', access_token)
                    return response
                else:
                    return jsonify(message="Invalid email or password"), 401
        finally:
            connection.close()
 
@app.route('/logout', methods=['GET'])
def logout():
    response = make_response(redirect('/login'))
    response.set_cookie('X-AUTH-Token', '')
    return response
 
@app.route('/register', methods=['GET', 'POST'])
def register():
        if request.method == 'GET':
            return render_template('register.html', message=None)
        elif request.method == 'POST':
            role_id = 'customer_' + secrets.token_hex(4)
            email = request.json.get('email')
            password = hashlib.sha256(request.json.get('password').encode()).hexdigest()
            if not email or not password:
                return jsonify(error="email or password is missing"), 400
            connection = pymysql.connect(**db_config)
            try:
                with connection.cursor() as cursor:
                    sql = "SELECT * FROM users WHERE email=%s"
                    cursor.execute(sql, (email,))
                    existing_user = cursor.fetchone()
                    if existing_user:
                        return jsonify(error="Email already exists"), 400
                    else:
                        sql = "INSERT INTO users (email, password, role_id) VALUES (%s, %s, %s)"
                        cursor.execute(sql, (email, password, role_id))
                        connection.commit()
                        return jsonify(message="User registered successfully"), 201
            finally:
                connection.close()
 
 
@app.route('/', methods=['GET', 'POST'])
def index():
    return render_template('index.html')
 
@app.route('/book', methods=['GET', 'POST'])
def export():
    if request.method == 'POST':
        try:
            name = request.form['name']
            date = request.form['date']
            time = request.form['time']
            email = request.form['email']
            num_people = request.form['people']
            message = request.form['message']
 
            connection = pymysql.connect(**db_config)
            try:
                with connection.cursor() as cursor:
                    sql = "INSERT INTO appointments (appointment_name, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message, role_id) VALUES (%s, %s, %s, %s, %s, %s, %s)"
                    cursor.execute(sql, (name, email, date, time, num_people, message, 'customer'))
                    connection.commit()
                    flash('Your booking request was sent. You can manage your appointment further from your account. Thank you!', 'success')
            except Exception as e:
                print(e)
            return redirect('/#book-a-table')
        except ValueError:
            flash('Error processing your request. Please try again.', 'error')
    return render_template('index.html')
 
 
def generate_ics_file(name, date, time, email, num_people, message):
    global temp_dir
    temp_dir = tempfile.mkdtemp()
    current_date_time = datetime.now()
    formatted_date_time = current_date_time.strftime("%Y%m%d_%H%M%S")
 
    cal = Calendar()
    event = Event()
 
    event.name = name
    event.begin = datetime.strptime(date, "%Y-%m-%d")
    event.description = f"Email: {email}\nNumber of People: {num_people}\nMessage: {message}"
 
    cal.events.add(event)
 
    temp_file_path = os.path.join(temp_dir, quote('Yummy_reservation_' + formatted_date_time + '.ics'))
    with open(temp_file_path, 'w') as fp:
        fp.write(cal.serialize())
 
    return os.path.basename(temp_file_path)
 
@app.route('/export/<path:filename>')
def export_file(filename):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    filepath = os.path.join(temp_dir, filename)
    if os.path.exists(filepath):
        content = send_file(filepath, as_attachment=True)
        shutil.rmtree(temp_dir)
        return content
    else:
        shutil.rmtree(temp_dir)
        return "File not found", 404
 
def validate_login():
    try:
        (email, current_role), status_code = verify_token()
        if email and status_code == 200 and current_role == "administrator":
            return current_role
        elif email and status_code == 200:
            return email
        else:
            raise Exception("Invalid token")
    except Exception as e:
        return None
 
 
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
        validation = validate_login()
        if validation is None:
            return redirect(url_for('login'))
        elif validation == "administrator":
            return redirect(url_for('admindashboard'))
 
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "SELECT appointment_id, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s"
                cursor.execute(sql, (validation,))
                connection.commit()
                appointments = cursor.fetchall()
                appointments_sorted = sorted(appointments, key=lambda x: x['appointment_id'])
 
        finally:
            connection.close()
 
        return render_template('dashboard.html', appointments=appointments_sorted)
 
@app.route('/delete/<appointID>')
def delete_file(appointID):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
    elif validation == "administrator":
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM appointments where appointment_id= %s;"
                cursor.execute(sql, (appointID,))
                connection.commit()
 
                sql = "SELECT * from appointments"
                cursor.execute(sql)
                connection.commit()
                appointments = cursor.fetchall()
        finally:
            connection.close()
            flash("Reservation deleted successfully","success")
            return redirect(url_for("admindashboard"))
    else:
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM appointments WHERE appointment_id = %s AND appointment_email = %s;"
                cursor.execute(sql, (appointID, validation))
                connection.commit()
 
                sql = "SELECT appointment_id, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s"
                cursor.execute(sql, (validation,))
                connection.commit()
                appointments = cursor.fetchall()
        finally:
            connection.close()
            flash("Reservation deleted successfully","success")
            return redirect(url_for("dashboard"))
        flash("Something went wrong!","error")
        return redirect(url_for("dashboard"))
 
@app.route('/reminder/<appointID>')
def reminder_file(appointID):
    validation = validate_login()
    if validation is None:
        return redirect(url_for('login'))
 
    connection = pymysql.connect(**db_config)
    try:
        with connection.cursor() as cursor:
            sql = "SELECT appointment_id, appointment_name, appointment_email, appointment_date, appointment_time, appointment_people, appointment_message FROM appointments WHERE appointment_email = %s AND appointment_id = %s"
            result = cursor.execute(sql, (validation, appointID))
            if result != 0:
                connection.commit()
                appointments = cursor.fetchone()
                filename = generate_ics_file(appointments['appointment_name'], appointments['appointment_date'], appointments['appointment_time'], appointments['appointment_email'], appointments['appointment_people'], appointments['appointment_message'])
                connection.close()
                flash("Reservation downloaded successfully","success")
                return redirect(url_for('export_file', filename=filename))
            else:
                flash("Something went wrong!","error")
    except:
        flash("Something went wrong!","error")
 
    return redirect(url_for("dashboard"))
 
@app.route('/admindashboard', methods=['GET', 'POST'])
def admindashboard():
        validation = validate_login()
        if validation != "administrator":
            return redirect(url_for('login'))
 
        try:
            connection = pymysql.connect(**db_config)
            with connection.cursor() as cursor:
                sql = "SELECT * from appointments"
                cursor.execute(sql)
                connection.commit()
                appointments = cursor.fetchall()
 
                search_query = request.args.get('s', '')
 
                # added option to order the reservations
                order_query = request.args.get('o', '')
 
                sql = f"SELECT * FROM appointments WHERE appointment_email LIKE %s order by appointment_date {order_query}"
                cursor.execute(sql, ('%' + search_query + '%',))
                connection.commit()
                appointments = cursor.fetchall()
            connection.close()
 
            return render_template('admindashboard.html', appointments=appointments)
        except Exception as e:
            flash(str(e), 'error')
            return render_template('admindashboard.html', appointments=appointments)
 
 
 
if __name__ == '__main__':
    app.run(threaded=True, debug=False, host='0.0.0.0', port=3000)
  • There is a DB credential disclosure; chef:3wDo7gSRZIwIHRxZ!
    • the DB is yummy_db
  • JWT uses a custom signature verification; from config import signature
  • There is the export_file function that is vulnerable to LFI as there is no input sanitization in place
    • It only checks the file by the os.path.exists method and sends out the file if the file exist
  • There is a hidden endpoint, /admindashboard, only available to administrator
    • This is done by checking the role attribute in the JWT
    • This endpoint has a SQLi vulnerability in the order_query part of the SQL query
config/signature.py

┌──(kali㉿kali)-[~/…/labs/yummy/opt/app]
└─$ cat config/signature.py
#!/usr/bin/python3
 
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import sympy
 
 
# Generate RSA key pair
q = sympy.randprime(2**19, 2**20)
n = sympy.randprime(2**1023, 2**1024) * q
e = 65537
p = n // q
phi_n = (p - 1) * (q - 1)
d = pow(e, -1, phi_n)
key_data = {'n': n, 'e': e, 'd': d, 'p': p, 'q': q}
key = RSA.construct((key_data['n'], key_data['e'], key_data['d'], key_data['p'], key_data['q']))
private_key_bytes = key.export_key()
 
private_key = serialization.load_pem_private_key(
    private_key_bytes,
    password=None,
    backend=default_backend()
)
public_key = private_key.public_key()

RSA’s security relies heavily on the difficulty of factoring large numbers. If one of the primes used in generating the modulus is weak (too small), the modulus becomes easy to factor, defeating the entire purpose of RSA encryption/signing. In code above, since q is so small, the modulus n can be factored very quickly using even simple methods like trial division.

  • Weak Prime Generation (q):
    • q is generated as a prime in the range of to , which is quite small for cryptographic purposes.
    • A prime in this range is far too small to provide adequate security. When one prime is this small, the modulus n = p * q can be factored easily, especially since the other prime p = n // q will also be large but can be easily derived once q is known.
  • Factoring the Modulus (n):
    • With the modulus n being the product of a small prime q and a larger prime p, an attacker can quickly factor n by trial division or more sophisticated factoring methods.
    • Once an attacker has factored n, they can recover both p and q. Using these, they can compute the private key d using the formula .
    • With the private key d, they can sign any data, including forging an administrative JWT (JSON Web Token), which could be disastrous for systems relying on that RSA key pair for authentication.

If an attacker knows the modulus n and the public exponent e (which are public in RSA), they can:

  • Factor n to find p and q (since q is small).
  • Calculate the private exponent d as .
  • Once they have d, they can sign arbitrary data (such as an administrative JWT) and impersonate an admin.

To make the RSA key pair secure:

  • Both p and q must be large enough (typically at least 1024 bits each for a 2048-bit modulus).
  • In the case abov, q is only about 20 bits, which is drastically smaller than what it should be. This makes n insecure.

Moving on to JWT Forgery

table_cleanup.sh


#!/bin/sh
 
/usr/bin/mysql -h localhost -u chef yummy_db -p'3wDo7gSRZIwIHRxZ!' < /data/scripts/sqlappointments.sql

The table_cleanup.sh file is a Bash script that seems to reset the database to its original state;/data/scripts/sqlappointments.sql It contains the same DB credential found earlier

dbmonitor.sh


#!/bin/bash
 
timestamp=$(/usr/bin/date)
service=mysql
response=$(/usr/bin/systemctl is-active mysql)
 
if [ "$response" != 'active' ]; then
    /usr/bin/echo "{\"status\": \"The database is down\", \"time\": \"$timestamp\"}" > /data/scripts/dbstatus.json
    /usr/bin/echo "$service is down, restarting!!!" | /usr/bin/mail -s "$service is down!!!" root
    latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
    /bin/bash "$latest_version"
else
    if [ -f /data/scripts/dbstatus.json ]; then
        if grep -q "database is down" /data/scripts/dbstatus.json 2>/dev/null; then
            /usr/bin/echo "The database was down at $timestamp. Sending notification."
            /usr/bin/echo "$service was down at $timestamp but came back up." | /usr/bin/mail -s "$service was down!" root
            /usr/bin/rm -f /data/scripts/dbstatus.json
        else
            /usr/bin/rm -f /data/scripts/dbstatus.json
            /usr/bin/echo "The automation failed in some way, attempting to fix it."
            latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
            /bin/bash "$latest_version"
        fi
    else
        /usr/bin/echo "Response is OK."
    fi
fi
 
[ -f dbstatus.json ] && /usr/bin/rm -f dbstatus.json

The script monitors the status of the MySQL service, performs actions if it detects that the service is down, and attempts to restart it using a script named fixer-v*. Additionally, it logs and sends notifications when the service status changes.

Step-by-Step Breakdown:

  1. Shebang Line:
#!/bin/bash

This indicates that the script is executed using the Bash shell.

  1. Capturing Timestamp and Service Status:
timestamp=$(/usr/bin/date)
service=mysql
response=$(/usr/bin/systemctl is-active mysql)
  • timestamp: Stores the current date and time using the /usr/bin/date command.
  • service: Stores the name of the service being monitored (mysql).
  • response: Stores the output of /usr/bin/systemctl is-active mysql, which checks if the MySQL service is running. The possible values can be active, inactive, etc.
  1. Checking if MySQL is Down:
if [ "$response" != 'active' ]; then

If the response is anything other than active, this condition triggers the following actions (MySQL service is considered down).

  1. Log the Down Status and Attempt to Restart:
/usr/bin/echo "{\"status\": \"The database is down\", \"time\": \"$timestamp\"}" > /data/scripts/dbstatus.json
/usr/bin/echo "$service is down, restarting!!!" | /usr/bin/mail -s "$service is down!!!" root
latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
/bin/bash "$latest_version"
  • The script logs the MySQL down status with a timestamp in /data/scripts/dbstatus.json.
  • It sends an email to root notifying that the MySQL service is down, using the mail command.
  • The script finds the latest version of a script named fixer-v* located in /data/scripts/. This is done by:
    • Listing all files matching the pattern fixer-v* using ls -1.
    • Sorting them using version sorting (/usr/bin/sort -V).
    • Picking the latest file using tail -n 1.
  • The latest version of the fixer script is executed using /bin/bash "$latest_version" to attempt to restart or fix the service.
  1. Handling the Case Where MySQL is Up:
else
if [ -f /data/scripts/dbstatus.json ]; then

If the MySQL service is active, the script checks if the dbstatus.json file exists (which would indicate that the service was previously down).

  1. Database Recovery After Being Down:
if grep -q "database is down" /data/scripts/dbstatus.json 2>/dev/null; then
    /usr/bin/echo "The database was down at $timestamp. Sending notification."
    /usr/bin/echo "$service was down at $timestamp but came back up." | /usr/bin/mail -s "$service was down!" root
    /usr/bin/rm -f /data/scripts/dbstatus.json

If the dbstatus.json contains “database is down” (indicating the MySQL service was previously down), the script:

  • Logs that the database was down but has come back up.
  • Sends an email to root informing them that the MySQL service has recovered.
  • Deletes the dbstatus.json file since the issue has been resolved.
  1. Handling a Failed Automation Fix:
else
    /usr/bin/rm -f /data/scripts/dbstatus.json
    /usr/bin/echo "The automation failed in some way, attempting to fix it."
    latest_version=$(/usr/bin/ls -1 /data/scripts/fixer-v* 2>/dev/null | /usr/bin/sort -V | /usr/bin/tail -n 1)
    /bin/bash "$latest_version"
  • If the dbstatus.json file exists but does not contain “database is down,” the script assumes the previous attempt to fix the issue failed.
  • It tries to rerun the fixer-v* script again to attempt another fix.
  • It also removes the dbstatus.json file, possibly to ensure a clean state for future operations.
  1. MySQL is Running Normally:
else
    /usr/bin/echo "Response is OK."

If there is no dbstatus.json file and MySQL is running (response = active), the script logs that the service is running normally.

  1. Cleaning Up the dbstatus.json File:
[ -f dbstatus.json ] && /usr/bin/rm -f dbstatus.json

This final line ensures that any lingering dbstatus.json file in the current working directory (if it exists) is removed.