Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Additional HTTP status codes to `CODE_STRINGS`: 226 (IM Used), 308 (Permanent Redirect), 418 (I'm a teapot), 422 (Unprocessable Entity) and 451 (Unavailable For Legal Reasons)
* Module-level `GZIP_LEVEL` and `EMPTY_CODES` constants in `netius.servers.http` replacing previously inlined magic numbers
* Support for additional DNS record types (`SRV`, `SVCB`, `HTTPS` and `CAA`) in `netius.clients.dns`, including matching `DNSResponse.parse_*` methods

### Changed

Expand Down
52 changes: 42 additions & 10 deletions src/netius/clients/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
MX=0x0F,
TXT=0x10,
AAAA=0x1C,
SRV=0x21,
SVCB=0x40,
HTTPS=0x41,
CAA=0x101,
)

DNS_CLASSES = dict(IN=0x01)
Expand Down Expand Up @@ -192,41 +196,69 @@ def parse_an(self, data, index):
index, type = self.parse_short(data, index)
index, cls = self.parse_short(data, index)
index, ttl = self.parse_long(data, index)
index, _size = self.parse_short(data, index)
index, size = self.parse_short(data, index)
type_s = DNS_TYPES_R.get(type, "undefined")
cls_s = DNS_CLASSES_R.get(cls, "undefined")
index, payload = self.parse_payload(data, index, type_s)
index, payload = self.parse_payload(data, index, type_s, size=size)

Copilot AI Apr 17, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new size plumbing is introduced in parse_an() (reading RDLENGTH and passing it into parse_payload()), but the added tests only exercise the parse_* methods directly. Consider adding a unit test that builds a minimal DNS answer and asserts DNSResponse.parse_an() (or DNSResponse.parse()) advances by RDLENGTH and produces the expected payload for one of the new types (e.g., SVCB/CAA), so the size-threading path is covered.

Suggested change
index, payload = self.parse_payload(data, index, type_s, size=size)
payload_index = index
_index, payload = self.parse_payload(data, index, type_s, size=size)
index = payload_index + size

Copilot uses AI. Check for mistakes.
return (index, (name, type_s, cls_s, ttl, payload))

def parse_payload(self, data, index, type_s):
def parse_payload(self, data, index, type_s, size=None):
type_s = type_s.lower()
method_name = "parse_" + type_s
method = getattr(self, method_name)
return method(data, index)
return method(data, index, size=size)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep parse_ dispatch backward-compatible*

parse_payload now always calls parser methods with size as a keyword argument, which breaks existing DNSResponse subclasses that override parse_* methods using the previous (self, data, index) signature. In those deployments, any parsed record routed through an override will raise TypeError: ... got an unexpected keyword argument 'size', turning previously working custom decoders into runtime failures.

Useful? React with 👍 / 👎.

Comment on lines +205 to +209

Copilot AI Apr 17, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_payload() now always calls the per-type parser with a size= keyword argument. This is a backwards-incompatible change for any external subclasses/overrides of parse_* methods that previously only accepted (data, index) (and don’t accept **kwargs), as it will raise TypeError: got an unexpected keyword argument 'size'. Consider keeping compatibility by only passing size when the target method can accept it (e.g., via inspect.signature), or by falling back to calling the method without size when a signature mismatch is detected (while being careful not to swallow real TypeErrors raised inside the parser).

Copilot uses AI. Check for mistakes.

def parse_a(self, data, index):
def parse_a(self, data, index, size=None):
index, address = self.parse_ip4(data, index)
return (index, address)

def parse_aaaa(self, data, index):
def parse_aaaa(self, data, index, size=None):
index, address = self.parse_ip6(data, index)
return (index, address)

def parse_mx(self, data, index):
def parse_mx(self, data, index, size=None):
index, preference = self.parse_short(data, index)
index, address = self.parse_label(data, index)
return (index, (preference, address))

def parse_cname(self, data, index):
def parse_cname(self, data, index, size=None):
index, address = self.parse_label(data, index)
return (index, address)

def parse_ns(self, data, index):
def parse_ns(self, data, index, size=None):
pass

def parse_ar(self, data, index):
def parse_ar(self, data, index, size=None):
pass

def parse_srv(self, data, index, size=None):
index, priority = self.parse_short(data, index)
index, weight = self.parse_short(data, index)
index, port = self.parse_short(data, index)
index, target = self.parse_label(data, index)
return (index, (priority, weight, port, target))

def parse_svcb(self, data, index, size=None):
start = index
index, priority = self.parse_short(data, index)
index, target = self.parse_label(data, index)
end = start + size if size else index

Copilot AI Apr 17, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In parse_svcb(), end is computed from start + size and returned, but there’s no validation that parsing priority + target consumed <= size. With malformed RDLENGTH this can produce end < index and move the main parse index backwards, corrupting subsequent RR parsing. Consider validating start + size >= index and raising a parse error (or clamping end to index).

Suggested change
end = start + size if size else index
end = start + size if not size == None else index
if end < index:
raise ValueError("Invalid SVCB RDLENGTH")

Copilot uses AI. Check for mistakes.
Comment on lines +241 to +245

Copilot AI Apr 17, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_svcb() uses end = start + size if size else index, so when called directly without size it will treat the SvcParams as empty even if data contains trailing bytes. For consistency with other parse_* methods (which can be called with just (data, index)), consider interpreting size=None as “consume the rest of data” (e.g., end = len(data)), or make size required for variable-length RDATA parsers.

Copilot uses AI. Check for mistakes.
params = data[index:end]
return (end, (priority, target, params))

def parse_https(self, data, index, size=None):
return self.parse_svcb(data, index, size=size)

def parse_caa(self, data, index, size=None):
start = index
index, flags = self.parse_byte(data, index)
index, tag_l = self.parse_byte(data, index)
tag = data[index : index + tag_l]
index += tag_l
end = start + size if size else index
Comment on lines +254 to +258

Copilot AI Apr 17, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_caa() slices tag using tag_l without checking that tag_l fits within the advertised size (RDLENGTH). If tag_l is too large, index can advance past start + size, and returning end = start + size will move the parse index backwards. Consider validating tag_l against size (and that start + size >= index) and raising a parse error for malformed RDATA.

Suggested change
index, flags = self.parse_byte(data, index)
index, tag_l = self.parse_byte(data, index)
tag = data[index : index + tag_l]
index += tag_l
end = start + size if size else index
end = start + size if size != None else None
if not end == None:
if size < 2:
raise ValueError("Malformed CAA RDATA")
if end > len(data):
raise ValueError("Malformed CAA RDATA")
index, flags = self.parse_byte(data, index)
index, tag_l = self.parse_byte(data, index)
if not end == None and index + tag_l > end:
raise ValueError("Malformed CAA RDATA")
tag = data[index : index + tag_l]
index += tag_l
end = end if not end == None else index
if index > end:
raise ValueError("Malformed CAA RDATA")

Copilot uses AI. Check for mistakes.
value = data[index:end]
return (end, (flags, tag, value))

def parse_label(self, data, index):
buffer = []

Expand Down
45 changes: 45 additions & 0 deletions src/netius/test/clients/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import struct
import unittest

import netius.clients
Expand Down Expand Up @@ -110,6 +111,50 @@ def close(self):
netius.clients.DNSClient.protocol = MockProtocol


class DNSResponseParserTest(unittest.TestCase):

def setUp(self):
unittest.TestCase.setUp(self)
self.response = netius.clients.DNSResponse(b"")

def test_extended_types(self):
self.assertEqual(netius.clients.dns.DNS_TYPES["SRV"], 0x21)
self.assertEqual(netius.clients.dns.DNS_TYPES["SVCB"], 0x40)
self.assertEqual(netius.clients.dns.DNS_TYPES["HTTPS"], 0x41)
self.assertEqual(netius.clients.dns.DNS_TYPES["CAA"], 0x101)

def test_parse_srv(self):
rdata = struct.pack("!HHH", 10, 20, 443)
rdata += b"\x04_sip\x07example\x03com\x00"
index, payload = self.response.parse_srv(rdata, 0, size=len(rdata))
self.assertEqual(index, len(rdata))
self.assertEqual(payload, (10, 20, 443, b"_sip.example.com"))

def test_parse_svcb(self):
target = b"\x03svc\x07example\x03com\x00"
params = b"\x00\x01\x00\x02h3"
rdata = struct.pack("!H", 1) + target + params
index, payload = self.response.parse_svcb(rdata, 0, size=len(rdata))
self.assertEqual(index, len(rdata))
self.assertEqual(payload, (1, b"svc.example.com", params))

def test_parse_https_matches_svcb(self):
target = b"\x03svc\x07example\x03com\x00"
params = b"\x00\x01\x00\x02h3"
rdata = struct.pack("!H", 1) + target + params
_index_svcb, svcb = self.response.parse_svcb(rdata, 0, size=len(rdata))
_index_https, https = self.response.parse_https(rdata, 0, size=len(rdata))
self.assertEqual(svcb, https)

def test_parse_caa(self):
tag = b"issue"
value = b"letsencrypt.org"
rdata = struct.pack("!BB", 0x80, len(tag)) + tag + value
index, payload = self.response.parse_caa(rdata, 0, size=len(rdata))
self.assertEqual(index, len(rdata))
self.assertEqual(payload, (0x80, b"issue", b"letsencrypt.org"))


class _MockTransport(object):

def close(self):
Expand Down