16. Capstone: an OSPF area-0 simulator
1. Problem
This capstone brings together everything from the Routing: BGP, OSPF, Control Plane phase into a single working project. You will build a complete, functional implementation of an OSPF area-0 simulator that demonstrates mastery of all concepts covered so far.
Without integrating individual components into a complete system, you only have isolated pieces of knowledge. Real networking code must handle all edge cases, state transitions, and error conditions simultaneously. This is where theory meets practice.
2. Theory
The an OSPF area-0 simulator project integrates multiple components built in previous lessons. The architecture follows a layered design where each module handles one responsibility:
+------------------------------+
| Application Logic |
+------------------------------+
| Protocol Processing |
+------------------------------+
| Packet I/O & Buffering |
+------------------------------+
| Network Device (TUN/TAP) |
+------------------------------+
Each layer communicates through well-defined interfaces. Error handling is critical: network operations can fail at any layer, and the system must degrade gracefully. The implementation should be structured for testability.
3. Math / Spec
The protocol defines specific algorithms and data formats that must be implemented exactly for interoperability:
- Header format: fixed and variable-length fields with specific byte ordering
- Checksum: error detection method (one's complement sum or CRC)
- State transitions: valid sequences of operations and responses
- Timer values: retransmission timeouts, keepalive intervals, expiration times
All multi-byte integer fields are in network byte order (big-endian) unless explicitly stated otherwise.
4. Code
"""
capstone_an_ospf_area0_simulator.py -- Capstone: Capstone: an OSPF area-0 simulator
Integrates all phase components into a complete working system.
"""
import socket
import struct
import select
import logging
import argparse
from dataclasses import dataclass, field
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger(__name__)
@dataclass
class Stats:
rx: int = 0
tx: int = 0
errors: int = 0
class ProtocolHandler:
"""Base protocol handler with parse/build/process pipeline."""
def __init__(self):
self.state = {}
self.stats = Stats()
def parse(self, data: bytes) -> dict:
if len(data) < 8:
raise ValueError(f"Packet too short: {len(data)} bytes")
ver_type, flags, length, pkt_id = struct.unpack('!BBHI', data[:8])
version = (ver_type >> 4) & 0xF
pkt_type = ver_type & 0xF
payload = data[8:8 + length - 8] if length > 8 else b''
return {
'version': version, 'type': pkt_type, 'flags': flags,
'length': length, 'id': pkt_id, 'payload': payload,
}
def process(self, msg: dict) -> bytes | None:
self.stats.rx += 1
log.info("rx: type=%d id=%d len=%d", msg['type'], msg['id'], msg['length'])
return None
def build(self, pkt_type: int, flags: int, payload: bytes, pkt_id: int = 0) -> bytes:
ver_type = (1 << 4) | (pkt_type & 0xF)
length = 8 + len(payload)
header = struct.pack('!BBHI', ver_type, flags, length, pkt_id)
self.stats.tx += 1
return header + payload
class Server:
def __init__(self, handler: ProtocolHandler, host='0.0.0.0', port=9000):
self.handler = handler
self.host = host
self.port = port
def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((self.host, self.port))
sock.setblocking(False)
log.info("Listening on %s:%d", self.host, self.port)
try:
while True:
readable, _, _ = select.select([sock], [], [], 1.0)
for s in readable:
data, addr = s.recvfrom(65535)
try:
msg = self.handler.parse(data)
resp = self.handler.process(msg)
if resp:
s.sendto(resp, addr)
except Exception as e:
log.error("Error from %s: %s", addr, e)
self.handler.stats.errors += 1
except KeyboardInterrupt:
log.info("Shutdown. %s", self.handler.stats)
finally:
sock.close()
if __name__ == '__main__':
ap = argparse.ArgumentParser(description='Capstone: an OSPF area-0 simulator')
ap.add_argument('--port', type=int, default=9000)
args = ap.parse_args()
Server(ProtocolHandler(), port=args.port).run()
5. Tests
import pytest
import struct
def test_roundtrip():
payload = b"test data"
pkt = PacketBuilder.build(pkt_type=1, payload=payload, pkt_id=42)
parsed = PacketBuilder.parse(pkt)
assert parsed['version'] == 1
assert parsed['type'] == 1
assert parsed['payload'] == payload
assert parsed['id'] == 42
def test_truncated():
with pytest.raises(ValueError):
PacketBuilder.parse(b"\x10\x00")
def test_checksum_roundtrip():
data = b"\x00\x01\x00\x02"
cs = PacketBuilder.checksum(data)
combined = data + struct.pack('!H', cs)
assert PacketBuilder.checksum(combined) == 0
def test_empty_payload():
pkt = PacketBuilder.build(0, b"")
parsed = PacketBuilder.parse(pkt)
assert parsed['length'] == PacketBuilder.HDR_SIZE
assert parsed['payload'] == b""
def test_large_payload():
big = b"X" * 10000
pkt = PacketBuilder.build(1, big)
parsed = PacketBuilder.parse(pkt)
assert parsed['payload'] == big
6. Exercises
★ Build the project skeleton with a proper Makefile. Verify it compiles and runs with empty input.
★ Implement the packet parser for the primary protocol. Test with at least 3 known-good packet captures.
★★ Add comprehensive error handling: every failure path should log a clear message and increment an error counter.
★★ Implement the core protocol state machine with all required transitions. Draw the state diagram.
★★ Write integration tests using real network tools (
ping,curl,dig,tcpdump).★★★ Profile with
perfand optimize the hot path. Target 100,000+ packets/sec on a single core.★★★ Fuzz with AFL++ or libFuzzer. Fix all crashes.
★★★ Test interoperability with at least two existing implementations. Document any compatibility issues.