题目源码
# -*- coding:utf-8 -*-
from Crypto.Util.number import isPrime, long_to_bytes, getStrongPrime, bytes_to_long
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import os
import binascii
import random
import string
import hashlib
import socketserver
FLAG = '**********'
KEY = b'****************'
IV = b'****************'
def cbc_decrypt(c, iv):
aes = AES.new(KEY, AES.MODE_CBC, iv=iv)
return aes.decrypt(c)
def encrypt():
plain_text = ''.join([random.choice(string.ascii_letters) for _ in range(2)]) + FLAG
aes = AES.new(KEY, AES.MODE_CBC, iv=IV)
plain_text = pad(plain_text.encode(), AES.block_size)
cipher = aes.encrypt(plain_text)
return IV.hex() + cipher.hex()
def asserts(pt: bytes):
num = pt[-1]
if len(pt) == 16:
result = pt[::-1]
count = 0
for i in result:
if i == num:
count += 1
else:
break
if count == num:
return True
else:
return False
else:
return False
def decrypt(c):
iv = c[:32]
cipher = c[32:]
plain_text = cbc_decrypt(binascii.unhexlify(cipher), binascii.unhexlify(iv))
if asserts(plain_text):
return True
else:
return False
class MyServer(socketserver.BaseRequestHandler):
def proof(self):
random.seed(os.urandom(8))
random_str = ''.join([random.choice(string.ascii_letters + string.digits) for _ in range(20)])
str_sha256 = hashlib.sha256(random_str.encode()).hexdigest()
self.request.sendall(('SHA256(XXXX + %s):%s\n' % (random_str[4:], str_sha256)).encode())
self.request.sendall('Give Me XXXX:\n'.encode())
XXXX = self.request.recv(2048).strip()
if hashlib.sha256((XXXX + random_str[4:].encode())).hexdigest() != str_sha256:
return False
return True
def handle(self):
if not self.proof():
self.request.sendall(b'Error Hash!')
return
cipher = encrypt()
self.request.sendall('Welcome to AES System, please choose the following options:\n1. encrypt the flag\n2. decrypt the flag\n'.encode())
n = 0
while n < 65536:
options = self.request.recv(512).strip().decode()
if options == '1':
self.request.sendall(('This is your flag: %s\n' % cipher).encode())
elif options == '2':
self.request.sendall('Please enter ciphertext:\n'.encode())
recv_cipher = self.request.recv(512).strip().decode()
if decrypt(recv_cipher):
self.request.sendall('True\n'.encode())
else:
self.request.sendall('False\n'.encode())
else:
self.request.sendall('Input wrong! Please re-enter\n'.encode())
n += 1
return
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == '__main__':
sever = socketserver.ThreadingTCPServer(('0.0.0.0', 10010), MyServer)
ThreadedTCPServer.allow_reuse_address = True
ThreadedTCPServer.allow_reuse_port = True
sever.serve_forever()分析
如果明文只有一个块,则CBC解密过程如下
\(middle\ \oplus\ iv = plaintext\),即使已知iv和ciphertext,但是加密器的key未知,所以不能算出middle,也就解不出plaintext了
这一题的核心思路就是利用Padding Oracle Attack爆破出middle,但是这个攻击是有条件的,我们先分析一下题目的关键代码
plaintext(FLAG)经过pad,方式是pkcs7
from Crypto.Util.Padding import pad
plain_text = ''.join([random.choice(string.ascii_letters) for _ in range(2)]) + FLAG
plain_text = pad(plain_text.encode(), AES.block_size)已知iv和ciphertext
if options == '1':
self.request.sendall(('This is your flag: %s\n' % cipher).encode())服务端会用key以及我们发过去的iv和ciphertext解密得到一个plaintext,然后用asserts函数检查该plaintext的padding字符,如果符合pkcs7会输出"True",不符合则输出"False"
elif options == '2':
self.request.sendall('Please enter ciphertext:\n'.encode())
recv_cipher = self.request.recv(512).strip().decode()
if decrypt(recv_cipher):
self.request.sendall('True\n'.encode())
else:
self.request.sendall('False\n'.encode())总结一下,我们现在已知
- plaintext经过pad
- 已知iv和ciphertext(其实只要有iv就足够了)
- 解密的iv和ciphertext可控,解密得到plaintext,检查其padding是否正确并输出检查结果
符合上面这些条件,就可以用Padding Oracle Attack来恢复plaintext了
这里做个小小的拓展:在Shiro721(CVE-2019-12422)中,就有用到Padding Oracle Attack,不同于这一题,攻击者是通过状态码来推断解密和检验是否成功的
- 如果data值没有被篡改,则解密成功,并且业务校验成功,响应200
- 如果data值被篡改,服务端无法完成解密,解密校验失败,则响应500
- 如果data值被篡改,但是服务端解密成功,但业务逻辑校验失败,则可能返回200或302等响应码,而不是响应500
Padding Oracle Attack
下面是从网上找来的一张攻击原理图
就用上图里的例子
首先爆破my_iv的最后一位,当我们爆破到0x2e时,因为0x2e ^ 0x2f = 0x01,解出来的plaintext最后一位是0x01,符合padding,服务端就会返回True,我们就能确定middle的最后一位是0x2f
接着爆破my_iv的倒数第二位,这里需要注意,my_iv的最后一位不是0x2e了,因为我们的目标是使plaintext符合padding,两位的情况下plaintext最后两位应该是0x0202,0x2f ^ 0x02 = 0x2d,所以最后一位应该设置为0x2d。然后进行爆破,当爆破到0x1c2d时,因为0x1c2d ^ 0x1e2f = 0x0202,符合padding,返回True,确定middle的最后两位是0x1e2f
继续爆破my_iv的倒数第三位,0x1e2f ^ 0x0303 = 0x1d2c,最后两位设置为0x1d2c,当爆破到0x041d2c时,因为0x041d2c ^ 0x071e2f = 0x030303,符合padding,返回True,确定middle的最后三位是0x071e2f
。。。
以此类推,爆破完16位,最多需要交互16 * 256 = 4096次(题目限制了最多65536次),即可推出完整的middle,然后跟一开始题目给出的iv进行异或就可以得到flag
具体实现
验证Padding Oracle Attack
from pwn import *
from copy import deepcopy
from tqdm import trange
from Crypto.Cipher import AES
import os
import random
def AES_CBC_enc(m, key, iv):
aes = AES.new(key, AES.MODE_CBC, iv)
return aes.encrypt(m)
def AES_CBC_dec(c, key, iv):
aes = AES.new(key, AES.MODE_CBC, iv)
return aes.decrypt(c)
def padding_to_16(msg):
padding = 16 - (len(msg) % 16)
return msg + bytes([padding]) * padding
# unpadding and check padding characters
def unpadding(msg):
padding = msg[-1]
if padding == 0:
return msg, False
for i in range(padding):
if (msg[-i-1] != padding):
return msg, False
return msg[:-padding], True
def padding_oracle_attack(msg_enc, key, iv):
middle = [0] * 16
for i in trange(16):
my_iv = deepcopy(middle)
if i != 0:
my_iv[-i:] = xor(my_iv[-i:], [i+1]*i)
print(my_iv)
for j in range(256):
my_iv[-i-1] = j
msg_after_padding = AES_CBC_dec(msg_enc, key, bytes(my_iv))
flag = unpadding(msg_after_padding)[1]
if flag == True:
middle[-i-1] = j ^ (i+1)
break
msg_after_padding = xor(middle, bytearray(iv))
print(msg_after_padding)
msg, flag = unpadding(bytes(msg_after_padding))
if flag == False:
return None
return msg
msg = os.urandom(random.randint(1,15))
key = os.urandom(16)
iv = os.urandom(16)
print(msg)
# padding msg
msg_after_padding = padding_to_16(msg)
msg_enc = AES_CBC_enc(msg_after_padding, key, iv)
# padding_oracle_attack recover msg
msg = padding_oracle_attack(msg_enc, key, iv)
print(msg)
本地复现
直接用题目源码开服务即可
FLAG = 'flag{test}'
KEY = os.urandom(16)
IV = os.urandom(16)from pwn import *
import itertools
from copy import deepcopy
from tqdm import trange
from string import ascii_letters, digits
import hashlib
import binascii
# context.log_level = "debug"
p = remote("127.0.0.1", 10010)
def get_proof():
p.recvuntil(b"SHA256(XXXX + ")
last = p.recvuntil(b"):", drop=True)
shav = p.recvline()[:-1]
print(f"last = {last}")
print(f"shav = {shav}")
for cont in itertools.product(ascii_letters + digits, repeat=4):
cont = ''.join(cont).encode()
if hashlib.new("sha256", cont + last).hexdigest() == shav.decode():
print(cont)
break
p.sendlineafter(b"Give Me XXXX:\n", cont)
# unpadding and check padding characters
def unpadding(msg):
padding = msg[-1]
if padding == 0:
return msg, False
for i in range(padding):
if (msg[-i-1] != padding):
return msg, False
return msg[:-padding], True
def padding_oracle_attack(iv, c):
solved_dec = [0] * 16
for i in trange(16):
new_iv = deepcopy(solved_dec)
if i != 0:
new_iv[-i:] = xor(new_iv[-i:], [i+1]*i)
for j in range(256):
new_iv[-i-1] = j
p.sendline(b"2")
p.sendlineafter(b"Please enter ciphertext:\n", (bytes(new_iv).hex() + c.hex()).encode())
if p.recvline() != b"False\n":
solved_dec[-i-1] = j ^ (i+1)
break
msg_after_padding = xor(solved_dec, bytearray(iv))
print(msg_after_padding)
msg, flag = unpadding(bytes(msg_after_padding))
if flag == False:
return None
return msg
get_proof()
p.sendlineafter(b"1. encrypt the flag\n2. decrypt the flag\n", b"1")
iv_and_c = binascii.unhexlify(p.recvline().decode()[19:-1])
iv, c = iv_and_c[:16], iv_and_c[16:]
print(f"iv = {iv}")
print(f"c = {c}")
msg = padding_oracle_attack(iv, c)
print(msg)
"""
last = b'OfBypjLi4BizYvHW'
shav = b'c0786d83da7177ab64ac113343ad157e4a7784998fb2be1ff84ab1e378499375'
b'OTnk'
iv = b'`\xb9\x9c\xa7K>SfC|\xabz\x8b*\x00`'
c = b'#\xad\xfd\x84m\x16F\x17\xc3\xc9s\xce\x02\x86\x04\x88'
100%|██████████| 16/16 [00:00<00:00, 51.35it/s]
b'Obflag{test}\x04\x04\x04\x04'
b'Obflag{test}'
"""转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 skatexu@qq.com