题目源码
# -*- coding:utf-8 -*-
from Crypto.Util.number import isPrime, long_to_bytes, getStrongPrime, bytes_to_long
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import os
import binascii
import random
import string
import hashlib
import socketserver
= '**********'
FLAG = b'****************'
KEY = b'****************'
IV
def cbc_decrypt(c, iv):
= AES.new(KEY, AES.MODE_CBC, iv=iv)
aes return aes.decrypt(c)
def encrypt():
= ''.join([random.choice(string.ascii_letters) for _ in range(2)]) + FLAG
plain_text = AES.new(KEY, AES.MODE_CBC, iv=IV)
aes = pad(plain_text.encode(), AES.block_size)
plain_text = aes.encrypt(plain_text)
cipher return IV.hex() + cipher.hex()
def asserts(pt: bytes):
= pt[-1]
num if len(pt) == 16:
= pt[::-1]
result = 0
count for i in result:
if i == num:
+= 1
count else:
break
if count == num:
return True
else:
return False
else:
return False
def decrypt(c):
= c[:32]
iv = c[32:]
cipher = cbc_decrypt(binascii.unhexlify(cipher), binascii.unhexlify(iv))
plain_text if asserts(plain_text):
return True
else:
return False
class MyServer(socketserver.BaseRequestHandler):
def proof(self):
8))
random.seed(os.urandom(= ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(20)])
random_str = hashlib.sha256(random_str.encode()).hexdigest()
str_sha256 self.request.sendall(('SHA256(XXXX + %s):%s\n' % (random_str[4:], str_sha256)).encode())
self.request.sendall('Give Me XXXX:\n'.encode())
= self.request.recv(2048).strip()
XXXX
if hashlib.sha256((XXXX + random_str[4:].encode())).hexdigest() != str_sha256:
return False
return True
def handle(self):
if not self.proof():
self.request.sendall(b'Error Hash!')
return
= encrypt()
cipher self.request.sendall('Welcome to AES System, please choose the following options:\n1. encrypt the flag\n2. decrypt the flag\n'.encode())
= 0
n while n < 65536:
= self.request.recv(512).strip().decode()
options if options == '1':
self.request.sendall(('This is your flag: %s\n' % cipher).encode())
elif options == '2':
self.request.sendall('Please enter ciphertext:\n'.encode())
= self.request.recv(512).strip().decode()
recv_cipher if decrypt(recv_cipher):
self.request.sendall('True\n'.encode())
else:
self.request.sendall('False\n'.encode())
else:
self.request.sendall('Input wrong! Please re-enter\n'.encode())
+= 1
n return
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == '__main__':
= socketserver.ThreadingTCPServer(('0.0.0.0', 10010), MyServer)
sever = True
ThreadedTCPServer.allow_reuse_address = True
ThreadedTCPServer.allow_reuse_port sever.serve_forever()
分析
如果明文只有一个块,则CBC解密过程如下
\(middle\ \oplus\ iv = plaintext\),即使已知iv和ciphertext,但是加密器的key未知,所以不能算出middle,也就解不出plaintext了
这一题的核心思路就是利用Padding Oracle Attack爆破出middle,但是这个攻击是有条件的,我们先分析一下题目的关键代码
plaintext(FLAG)经过pad,方式是pkcs7
from Crypto.Util.Padding import pad
= ''.join([random.choice(string.ascii_letters) for _ in range(2)]) + FLAG
plain_text = pad(plain_text.encode(), AES.block_size) plain_text
已知iv和ciphertext
if options == '1':
self.request.sendall(('This is your flag: %s\n' % cipher).encode())
服务端会用key以及我们发过去的iv和ciphertext解密得到一个plaintext,然后用asserts
函数检查该plaintext的padding字符,如果符合pkcs7会输出"True",不符合则输出"False"
elif options == '2':
self.request.sendall('Please enter ciphertext:\n'.encode())
= self.request.recv(512).strip().decode()
recv_cipher if decrypt(recv_cipher):
self.request.sendall('True\n'.encode())
else:
self.request.sendall('False\n'.encode())
总结一下,我们现在已知
- plaintext经过pad
- 已知iv和ciphertext(其实只要有iv就足够了)
- 解密的iv和ciphertext可控,解密得到plaintext,检查其padding是否正确并输出检查结果
符合上面这些条件,就可以用Padding Oracle Attack来恢复plaintext了
这里做个小小的拓展:在Shiro721(CVE-2019-12422)中,就有用到Padding Oracle Attack,不同于这一题,攻击者是通过状态码来推断解密和检验是否成功的
- 如果data值没有被篡改,则解密成功,并且业务校验成功,响应200
- 如果data值被篡改,服务端无法完成解密,解密校验失败,则响应500
- 如果data值被篡改,但是服务端解密成功,但业务逻辑校验失败,则可能返回200或302等响应码,而不是响应500
Padding Oracle Attack
下面是从网上找来的一张攻击原理图
就用上图里的例子
首先爆破my_iv的最后一位,当我们爆破到0x2e时,因为0x2e ^ 0x2f = 0x01
,解出来的plaintext最后一位是0x01,符合padding,服务端就会返回True,我们就能确定middle的最后一位是0x2f
接着爆破my_iv的倒数第二位,这里需要注意,my_iv的最后一位不是0x2e了,因为我们的目标是使plaintext符合padding,两位的情况下plaintext最后两位应该是0x0202,0x2f ^ 0x02 = 0x2d
,所以最后一位应该设置为0x2d。然后进行爆破,当爆破到0x1c2d时,因为0x1c2d ^ 0x1e2f = 0x0202
,符合padding,返回True,确定middle的最后两位是0x1e2f
继续爆破my_iv的倒数第三位,0x1e2f ^ 0x0303 = 0x1d2c
,最后两位设置为0x1d2c,当爆破到0x041d2c时,因为0x041d2c ^ 0x071e2f = 0x030303
,符合padding,返回True,确定middle的最后三位是0x071e2f
。。。
以此类推,爆破完16位,最多需要交互16 * 256 = 4096
次(题目限制了最多65536次),即可推出完整的middle,然后跟一开始题目给出的iv进行异或就可以得到flag
具体实现
验证Padding Oracle Attack
from pwn import *
from copy import deepcopy
from tqdm import trange
from Crypto.Cipher import AES
import os
import random
def AES_CBC_enc(m, key, iv):
= AES.new(key, AES.MODE_CBC, iv)
aes return aes.encrypt(m)
def AES_CBC_dec(c, key, iv):
= AES.new(key, AES.MODE_CBC, iv)
aes return aes.decrypt(c)
def padding_to_16(msg):
= 16 - (len(msg) % 16)
padding return msg + bytes([padding]) * padding
# unpadding and check padding characters
def unpadding(msg):
= msg[-1]
padding if padding == 0:
return msg, False
for i in range(padding):
if (msg[-i-1] != padding):
return msg, False
return msg[:-padding], True
def padding_oracle_attack(msg_enc, key, iv):
= [0] * 16
middle for i in trange(16):
= deepcopy(middle)
my_iv if i != 0:
-i:] = xor(my_iv[-i:], [i+1]*i)
my_iv[print(my_iv)
for j in range(256):
-i-1] = j
my_iv[
= AES_CBC_dec(msg_enc, key, bytes(my_iv))
msg_after_padding = unpadding(msg_after_padding)[1]
flag if flag == True:
-i-1] = j ^ (i+1)
middle[break
= xor(middle, bytearray(iv))
msg_after_padding print(msg_after_padding)
= unpadding(bytes(msg_after_padding))
msg, flag if flag == False:
return None
return msg
= os.urandom(random.randint(1,15))
msg = os.urandom(16)
key = os.urandom(16)
iv print(msg)
# padding msg
= padding_to_16(msg)
msg_after_padding = AES_CBC_enc(msg_after_padding, key, iv)
msg_enc
# padding_oracle_attack recover msg
= padding_oracle_attack(msg_enc, key, iv)
msg print(msg)
本地复现
直接用题目源码开服务即可
= 'flag{test}'
FLAG = os.urandom(16)
KEY = os.urandom(16) IV
from pwn import *
import itertools
from copy import deepcopy
from tqdm import trange
from string import ascii_letters, digits
import hashlib
import binascii
# context.log_level = "debug"
= remote("127.0.0.1", 10010)
p
def get_proof():
b"SHA256(XXXX + ")
p.recvuntil(= p.recvuntil(b"):", drop=True)
last = p.recvline()[:-1]
shav print(f"last = {last}")
print(f"shav = {shav}")
for cont in itertools.product(ascii_letters + digits, repeat=4):
= ''.join(cont).encode()
cont if hashlib.new("sha256", cont + last).hexdigest() == shav.decode():
print(cont)
break
b"Give Me XXXX:\n", cont)
p.sendlineafter(
# unpadding and check padding characters
def unpadding(msg):
= msg[-1]
padding if padding == 0:
return msg, False
for i in range(padding):
if (msg[-i-1] != padding):
return msg, False
return msg[:-padding], True
def padding_oracle_attack(iv, c):
= [0] * 16
solved_dec for i in trange(16):
= deepcopy(solved_dec)
new_iv if i != 0:
-i:] = xor(new_iv[-i:], [i+1]*i)
new_iv[for j in range(256):
-i-1] = j
new_iv[
b"2")
p.sendline(b"Please enter ciphertext:\n", (bytes(new_iv).hex() + c.hex()).encode())
p.sendlineafter(if p.recvline() != b"False\n":
-i-1] = j ^ (i+1)
solved_dec[break
= xor(solved_dec, bytearray(iv))
msg_after_padding print(msg_after_padding)
= unpadding(bytes(msg_after_padding))
msg, flag if flag == False:
return None
return msg
get_proof()
b"1. encrypt the flag\n2. decrypt the flag\n", b"1")
p.sendlineafter(= binascii.unhexlify(p.recvline().decode()[19:-1])
iv_and_c = iv_and_c[:16], iv_and_c[16:]
iv, c print(f"iv = {iv}")
print(f"c = {c}")
= padding_oracle_attack(iv, c)
msg print(msg)
"""
last = b'OfBypjLi4BizYvHW'
shav = b'c0786d83da7177ab64ac113343ad157e4a7784998fb2be1ff84ab1e378499375'
b'OTnk'
iv = b'`\xb9\x9c\xa7K>SfC|\xabz\x8b*\x00`'
c = b'#\xad\xfd\x84m\x16F\x17\xc3\xc9s\xce\x02\x86\x04\x88'
100%|██████████| 16/16 [00:00<00:00, 51.35it/s]
b'Obflag{test}\x04\x04\x04\x04'
b'Obflag{test}'
"""
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 skatexu@qq.com