Writing Tests¶
Comprehensive testing is crucial for validating lab functionality and ensuring student exploits work reliably. This guide covers the EmbSec testing framework and best practices.
Testing Philosophy¶
Goals of Lab Testing¶
- Validate vulnerability exists: Ensure the intended vulnerability is exploitable
- Verify deterministic behavior: Same input produces same output
- Check solution correctness: Reference exploit successfully gets flag
- Prevent regressions: Catch breaking changes early
- Support multiple platforms: Work across different host systems
Types of Tests¶
- Unit tests: Individual function behavior
- Integration tests: Full exploit chains
- Negative tests: Ensure non-vulnerable paths are secure
- Performance tests: Verify timing constraints
- Compatibility tests: Cross-platform validation
Test Framework Overview¶
Architecture¶
# Test framework inheritance hierarchy
unittest.TestCase
└── LabTestBase (common/test_framework.py)
    ├── BufferOverflowTestBase
    ├── FormatStringTestBase
    └── CustomLabTestBase
Key Components¶
- QEMU automation: Spawns and controls emulator
- I/O handling: Sends input and captures output
- Pattern matching: Extracts addresses and flags
- Timeout management: Prevents hanging tests
- Platform abstraction: Handles OS differences
Basic Test Structure¶
Minimal Test File¶
#!/usr/bin/env python3
"""Tests for Lab X: Name"""
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '../../common'))
from test_framework import LabTestBase, p32
class TestLabX(LabTestBase):
    """Template test case: lab configuration plus the three exploit hooks."""

    # Required configuration
    LAB_NAME = "0X-lab-name"
    TIMEOUT = 30

    # Expected menu options for validation
    EXPECTED_MENU_OPTIONS = [
        "Lab Menu",
        "1. Option One",
        "2. Option Two"
    ]

    # Required: Generate exploit payload
    def get_exploit_payload(self, **kwargs):
        """Return padding, then ``p32(target)``, then a trailing newline.

        Keyword Args:
            target: Address passed to ``p32`` (default ``0x1000``).
            offset: Number of ``b"A"`` padding bytes (default ``64``).
        """
        return_address = kwargs.get('target', 0x1000)
        padding_length = kwargs.get('offset', 64)
        return b"A" * padding_length + p32(return_address) + b"\n"

    # Required: Prepare exploit data
    def prepare_exploit(self):
        """Read the debug menu and return the target address as exploit kwargs."""
        debug_output = self.get_menu_choice("2")  # Get debug info
        address_pattern = r'Target function:\s*0x([0-9a-fA-F]+)'
        leaked_address = self.extract_address(debug_output, address_pattern)
        return {'target': leaked_address | 1}  # Thumb bit

    # Required: Send exploit to target
    def send_exploit(self, payload):
        """Select the vulnerable menu option, then deliver ``payload``."""
        self.get_menu_choice("1")  # Select vulnerable option
        self.send_input(payload)
if __name__ == "__main__":
    # Run every test in this file when it is executed directly.
    import unittest

    unittest.main()
Implementing Test Methods¶
Standard Test Sequence¶
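The framework provides standard tests. Because unittest runs test methods in alphabetical order of their names, the numeric prefixes make them run in this sequence: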
def test_01_binary_exists(self) # Verify build succeeded
def test_02_normal_execution(self) # Check normal behavior
def test_03_flag_not_accessible(self) # Ensure flag is protected
def test_04_vulnerability_exists(self) # Verify vulnerable behavior
def test_05_exploit_gets_flag(self) # Test complete exploit
def test_06_flag_deterministic(self) # Ensure consistent flags
Customizing Standard Tests¶
class TestAdvancedLab(LabTestBase):
    """Example of overriding one of the standard tests for a specific lab."""

    LAB_NAME = "advanced-lab"

    def test_04_vulnerability_exists(self):
        """Test for specific vulnerable behavior"""
        self.start_qemu()

        # Information leak: four %p specifiers should echo pointer values.
        self.get_menu_choice("1")
        self.send_input("%p %p %p %p\n")
        leak_output = self.read_output()
        leaked_pointers = re.findall(r'0x[0-9a-fA-F]{8}', leak_output)
        self.assertGreater(
            len(leaked_pointers),
            2,
            "Format string not leaking addresses",
        )

        # Write capability: %n must be processed without a reported fault.
        self.get_menu_choice("1")
        self.send_input("AAAA%n\n")
        write_output = self.read_output()
        self.assertNotIn("fault", write_output.lower())
Adding Custom Tests¶
def test_07_alternative_solution(self):
    """Test alternative exploitation path"""
    self.start_qemu()

    # Logic-bug route instead of memory corruption: open the hidden menu.
    debug_output = self.get_menu_choice("99")  # Hidden menu
    self.assertIn("Debug mode", debug_output)

    # The hidden menu output is expected to contain the secret.
    leaked_secret = self.extract_pattern(debug_output, r'Secret: (\w+)')
    self.assertIsNotNone(leaked_secret)

    # Authenticate with the leaked secret and look for the flag.
    self.get_menu_choice("3")  # Auth option
    self.send_input(leaked_secret + "\n")
    auth_output = self.read_output()
    self.assertIsNotNone(self.extract_flag(auth_output))
def test_08_timing_attack(self):
    """Test timing-based vulnerability.

    Submits PIN guesses with an increasing number of leading digits in
    common with "1234" and asserts that the last guess took longer to
    answer than the first.
    """
    self.start_qemu()
    timings = []

    # Try different inputs and measure response time
    for pin in ["0000", "1000", "1200", "1230", "1234"]:
        # perf_counter is the high-resolution clock intended for measuring
        # short intervals; time.time() is wall-clock and may be too coarse.
        start = time.perf_counter()
        self.get_menu_choice("1")
        self.send_input(pin + "\n")
        self.read_output()  # wait for the response; its content is not needed
        elapsed = time.perf_counter() - start
        timings.append((pin, elapsed))

    # Later inputs should take longer (more correct digits)
    self.assertGreater(timings[-1][1], timings[0][1],
                       "Timing attack not possible")
Advanced Testing Patterns¶
State Machine Testing¶
class TestStateMachine(LabTestBase):
    """Test complex state-based vulnerabilities"""

    def test_state_confusion(self):
        """Test state machine confusion attack"""
        self.start_qemu()

        # (menu choice, text expected in the response) for each step
        transitions = [
            ("1", "init"),     # Initialize
            ("2", "load"),     # Load data
            ("99", "debug"),   # Hidden state
            ("2", "load"),     # Load again (confusion!)
            ("3", "execute"),  # Execute with wrong state
        ]

        # Walk the sequence, checking each response names the expected state.
        for menu_choice, state_name in transitions:
            output = self.get_menu_choice(menu_choice)
            self.assertIn(
                state_name,
                output.lower(),
                f"State {state_name} not reached",
            )

        # The response to the final step should contain the flag.
        flag = self.extract_flag(output)
        self.assertIsNotNone(flag, "State confusion did not yield flag")
Race Condition Testing¶
def test_race_condition(self):
    """Test TOCTOU vulnerability"""
    import threading

    self.start_qemu()

    def poll_status():
        # Send the status command ten times, 10 ms apart.
        for _ in range(10):
            self.send_input("s\n")  # Status command
            time.sleep(0.01)

    def request_privileged():
        # Wait briefly, then send the privileged command once.
        time.sleep(0.05)  # Small delay
        self.send_input("p\n")  # Privileged command

    # Run both senders concurrently and wait for both to finish.
    workers = [
        threading.Thread(target=poll_status),
        threading.Thread(target=request_privileged),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Check if race was won
    self.assertIn("Privileged operation succeeded", self.read_output())
Heap Exploitation Testing¶
class TestHeapExploit(LabTestBase):
    """Test heap-based vulnerabilities"""

    def prepare_heap_exploit(self):
        """Set up heap for exploitation.

        Allocates five chunks (32, 64, 32, 128, 32 bytes), then frees the
        chunks at indices 1 and 3.

        Returns:
            A list of ``(size, address)`` tuples, one per allocation, where
            ``address`` is whatever ``extract_address`` returns.
        """
        self.start_qemu()

        # Allocate chunks in specific pattern
        chunks = []
        for size in [32, 64, 32, 128, 32]:
            self.get_menu_choice("1")  # Allocate
            self.send_input(f"{size}\n")
            # The allocation report has to be read after the size is
            # submitted; the output returned by get_menu_choice was captured
            # before the size was sent.
            # NOTE(review): assumes the firmware prints "Allocated at: 0x..."
            # in response to the size input -- confirm against the lab binary.
            output = self.read_output()
            addr = self.extract_address(output,
                                        r'Allocated at: 0x([0-9a-fA-F]+)')
            chunks.append((size, addr))

        # Free specific chunks to create holes
        for idx in [1, 3]:  # Free 64 and 128 byte chunks
            self.get_menu_choice("2")  # Free
            self.send_input(f"{idx}\n")

        return chunks

    def test_use_after_free(self):
        """Test UAF vulnerability"""
        self.prepare_heap_exploit()

        # Use freed chunk
        self.get_menu_choice("3")  # Use chunk
        self.send_input("1\n")  # Use freed chunk 1

        # Should be able to control freed memory
        self.send_input(p32(0x41414141) * 16)

        # Trigger use of controlled data
        self.get_menu_choice("4")  # Process chunks
        output = self.read_output()
        self.assertIn("embsec{", output,
                      "UAF exploit did not succeed")
Testing Best Practices¶
1. Make Tests Deterministic¶
# BAD: Time-dependent test
def test_bad_timing(self):
    """Anti-pattern: a random delay makes the captured output vary per run."""
    self.start_qemu()
    time.sleep(random.random())  # NO!
    output = self.read_output()
# GOOD: Predictable timing
def test_good_timing(self):
    """Preferred pattern: block until a known prompt appears."""
    self.start_qemu()
    # Wait for specific output
    ready_output = self.wait_for_output("Ready>")
    self.assertIsNotNone(ready_output)
2. Handle Platform Differences¶
def read_output_crossplatform(self):
    """Read output handling platform differences.

    On Windows the process's stdout descriptor is switched to binary mode
    with ``msvcrt``; on every other platform it is switched to non-blocking
    mode with ``fcntl``. Both paths then read via
    ``self._read_with_timeout(2.0)``.

    Returns:
        Whatever ``self._read_with_timeout(2.0)`` returns.
    """
    fd = self.proc.stdout.fileno()

    if sys.platform == "win32":
        # Windows-specific handling. fcntl does not exist on Windows, so it
        # must only be imported on the other branch.
        import msvcrt
        msvcrt.setmode(fd, os.O_BINARY)
    else:
        # Use non-blocking I/O
        import fcntl
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # Read with timeout
    return self._read_with_timeout(2.0)
3. Provide Clear Failure Messages¶
# BAD: Generic assertion -- on failure it carries no context about what
# was being checked or what the lab actually printed.
self.assertTrue(flag)
# GOOD: Descriptive failure -- the message includes the last 500
# characters of the captured output, or the actual chunk list.
self.assertIsNotNone(flag,
    f"No flag found in output. Output was:\n{output[-500:]}")
self.assertEqual(len(chunks), 5,
    f"Expected 5 chunks allocated, got {len(chunks)}. "
    f"Chunks: {chunks}")
4. Test Error Conditions¶
def test_error_handling(self):
    """Ensure errors don't crash the lab"""
    self.start_qemu()

    # An unknown menu option should be rejected with an "Invalid" message.
    invalid_response = self.get_menu_choice("999")
    self.assertIn("Invalid", invalid_response)

    # Feed an oversized line to option 1.
    self.get_menu_choice("1")
    oversized_line = "A" * 10000 + "\n"
    self.send_input(oversized_line)

    # The menu should still be shown afterwards.
    self.assertIn("Menu", self.get_menu_choice("2"))
5. Use Helper Methods¶
def assert_exploit_succeeds(self, payload, expected_output="embsec{"):
    """Helper for common exploit testing pattern.

    Starts QEMU, gathers exploit data, builds and sends the full payload,
    and asserts that ``expected_output`` appears in the response.

    Returns:
        The output read after the exploit was sent.
    """
    self.start_qemu()

    # Pass the caller's payload along with the gathered exploit data.
    exploit_kwargs = self.prepare_exploit()
    exploit_kwargs['payload'] = payload

    self.send_exploit(self.get_exploit_payload(**exploit_kwargs))
    response = self.read_output()

    self.assertIn(
        expected_output,
        response,
        f"Exploit failed. Payload: {payload.hex()}",
    )
    return response
Performance and Timing Tests¶
Measuring Execution Time¶
def test_performance_requirements(self):
    """Ensure lab meets performance requirements"""
    self.start_qemu()

    # Menu round-trip must finish in under one second.
    menu_started = time.time()
    self.get_menu_choice("1")
    menu_elapsed = time.time() - menu_started
    self.assertLess(menu_elapsed, 1.0, "Menu response too slow")

    # Sending the exploit and reading the result must take under five seconds.
    exploit_started = time.time()
    self.send_exploit(self.get_exploit_payload())
    self.read_output()
    exploit_elapsed = time.time() - exploit_started
    self.assertLess(exploit_elapsed, 5.0, "Exploit takes too long")
Testing Timing Attacks¶
def measure_timing_differential(self, input1, input2):
    """Measure timing difference between two inputs.

    Each input is timed five times, with ``tearDown``/``setUp`` called after
    every run.

    Returns:
        The absolute difference between the two median response times.
    """
    medians = []
    for candidate in (input1, input2):
        samples = []
        # Multiple measurements for accuracy
        for _ in range(5):
            self.start_qemu()
            began = time.perf_counter()
            self.get_menu_choice("1")
            self.send_input(candidate + "\n")
            self.read_output()
            samples.append(time.perf_counter() - began)
            self.tearDown()
            self.setUp()
        # Use median to reduce noise
        medians.append(statistics.median(samples))

    first_median, second_median = medians
    return abs(first_median - second_median)
Debugging Failed Tests¶
Enhanced Output Capture¶
def debug_test_failure(self):
    """Helper for debugging test failures.

    Starts QEMU with stdin, stdout and stderr all piped, performs a sample
    exchange, and writes the command line, binary path, the exchange and the
    captured stderr to ``test_debug_<LAB_NAME>.log`` in the current
    directory.

    The QEMU process is finished (killed if necessary) before this method
    returns, because its stderr can only be collected in full once it has
    exited.
    """
    # Enable verbose output
    self.proc = subprocess.Popen(
        self.qemu_cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # Capture stderr too
        bufsize=0
    )

    # Log all I/O
    with open(f"test_debug_{self.LAB_NAME}.log", "w") as f:
        f.write(f"QEMU Command: {' '.join(self.qemu_cmd)}\n")
        f.write(f"Binary path: {self.lab_binary}\n")
        f.write("="*50 + "\n")

        # Your test operations here
        self.send_input("test\n")
        output = self.read_output()
        f.write("Sent: test\\n\n")
        f.write(f"Received: {repr(output)}\n")

        # A plain stderr.read() blocks until the pipe reaches EOF, i.e. until
        # QEMU exits, which hangs the helper while the emulator is running.
        # Give the process a moment to exit, then kill it and drain the pipes.
        try:
            _, stderr_data = self.proc.communicate(timeout=1.0)
        except subprocess.TimeoutExpired:
            self.proc.kill()
            _, stderr_data = self.proc.communicate()
        f.write(f"Stderr: {stderr_data}\n")
Common Test Failures¶
Binary Not Found¶
def setUp(self):
    """Run the base setUp, then skip the test if the lab binary is missing."""
    super().setUp()

    if os.path.exists(self.lab_binary):
        return

    # Better error for missing binary
    self.skipTest(
        f"Lab binary not found at {self.lab_binary}. "
        f"Run: make {self.LAB_NAME}"
    )
QEMU Crashes¶
def test_with_crash_detection(self):
    """Fail with QEMU's error output if the emulator has already exited."""
    self.start_qemu()

    # Check if QEMU is still running
    if self.proc.poll() is not None:
        # Get any error output
        _, stderr = self.proc.communicate()
        # communicate() returns None for a stream that was not piped, and
        # emulator output is not guaranteed to be valid UTF-8.
        if stderr:
            detail = stderr.decode(errors="replace")
        else:
            detail = "<no stderr captured>"
        self.fail(f"QEMU crashed: {detail}")
Timeout Issues¶
def wait_for_output(self, expected, timeout=5.0):
    """Wait for specific output with timeout.

    Reads up to 1024 bytes at a time from the process's stdout, decodes
    them as latin-1, and accumulates them until ``expected`` appears.

    Args:
        expected: Substring to look for in the accumulated output.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        Everything read so far, as soon as it contains ``expected``.
        If the timeout elapses first, ``self.fail`` is called with the last
        200 characters that were read.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    buffer = ""

    while time.monotonic() < deadline:
        try:
            chunk = self.proc.stdout.read(1024)
        except OSError:
            # Covers BlockingIOError from a non-blocking pipe with no data
            # yet. Anything else (including KeyboardInterrupt) propagates
            # instead of being silently swallowed.
            chunk = None

        if chunk:
            buffer += chunk.decode('latin-1')
            if expected in buffer:
                return buffer

        time.sleep(0.1)

    self.fail(f"Timeout waiting for '{expected}'. "
              f"Got: {buffer[-200:]}")
Continuous Integration¶
GitLab CI Configuration¶
# CI job: build one lab and run its test file.
test-lab-X:
  stage: test
  script:
    - cd build-qemu
    - make 0X-lab-name
    - python3 ../labs/0X-lab-name/tests/test_lab.py -v
  # Keep debug logs and linker maps only when the job fails.
  artifacts:
    when: on_failure
    paths:
      - build-qemu/test_debug_*.log
      - build-qemu/labs/0X-lab-name/*.map
Test Organization¶
# Group related tests
class TestLabCore(LabTestBase):
    """Tests for the lab's core functionality."""

    # Basic tests here
class TestLabExploits(LabTestBase):
    """Tests that exercise the lab's exploits."""

    # Exploit tests here
class TestLabEdgeCases(LabTestBase):
    """Tests for edge cases and error handling."""

    # Edge cases here
# Run all test classes
def suite():
    """Build a single suite containing every test class in this file.

    Returns:
        A ``unittest.TestSuite`` holding the tests of ``TestLabCore``,
        ``TestLabExploits`` and ``TestLabEdgeCases``, in that order.
    """
    # unittest.makeSuite is deprecated since Python 3.11 and removed in 3.13;
    # TestLoader.loadTestsFromTestCase is the supported equivalent.
    loader = unittest.TestLoader()
    combined = unittest.TestSuite()
    for test_case in (TestLabCore, TestLabExploits, TestLabEdgeCases):
        combined.addTest(loader.loadTestsFromTestCase(test_case))
    return combined
if __name__ == "__main__":
    # Run the combined suite with per-test output (verbosity 2).
    verbose_runner = unittest.TextTestRunner(verbosity=2)
    verbose_runner.run(suite())
Test Checklist¶
Before releasing a lab, ensure tests cover:
- Binary builds successfully
- Normal execution works without exploit
- Flag is not accessible through normal paths
- Vulnerability exists and is exploitable
- Reference exploit successfully gets flag
- Flag is deterministic (same every time)
- Alternative solutions work (if applicable)
- Error conditions don't crash lab
- Performance meets requirements
- Works on Linux, macOS, and Windows (WSL)
- CI/CD tests pass
- Test output is informative on failure