From b00d3c941ef415efc2f6f78a3251478be6870ca6 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 17 Apr 2026 10:31:42 +0000 Subject: [PATCH] feat(dns): add SRV, SVCB, HTTPS and CAA record type support Extends `netius.clients.dns.DNS_TYPES` with the IANA-assigned values for SRV (0x21, RFC 2782), SVCB (0x40, RFC 9460), HTTPS (0x41, RFC 9460) and CAA (0x101, RFC 8659) and adds matching `DNSResponse.parse_*` methods so responses carrying these record types can be decoded. Threads the previously unused RDATA size value through `parse_an` and `parse_payload` into the per-type parsers so that variable length payloads (SVCB/HTTPS `SvcParams`, CAA `value`) can be bounded to the end of the record. Existing parsers accept the new `size` keyword with a `None` default so behaviour is unchanged. https://claude.ai/code/session_014Af7V3EGtPvjaUkJoZ4jED --- CHANGELOG.md | 1 + src/netius/clients/dns.py | 52 +++++++++++++++++++++++++++------- src/netius/test/clients/dns.py | 45 +++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07c35651..448fd63c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Additional HTTP status codes to `CODE_STRINGS`: 226 (IM Used), 308 (Permanent Redirect), 418 (I'm a teapot), 422 (Unprocessable Entity) and 451 (Unavailable For Legal Reasons) * Module-level `GZIP_LEVEL` and `EMPTY_CODES` constants in `netius.servers.http` replacing previously inlined magic numbers +* Support for additional DNS record types (`SRV`, `SVCB`, `HTTPS` and `CAA`) in `netius.clients.dns`, including matching `DNSResponse.parse_*` methods ### Changed diff --git a/src/netius/clients/dns.py b/src/netius/clients/dns.py index d8c1e4c9..da011d21 100644 --- a/src/netius/clients/dns.py +++ b/src/netius/clients/dns.py @@ -67,6 +67,10 @@ MX=0x0F, TXT=0x10, AAAA=0x1C, + SRV=0x21, + SVCB=0x40, + HTTPS=0x41, + CAA=0x101, ) DNS_CLASSES = dict(IN=0x01) @@ -192,41 +196,69 @@ def parse_an(self, data, index): index, type = self.parse_short(data, index) index, cls = self.parse_short(data, index) index, ttl = self.parse_long(data, index) - index, _size = self.parse_short(data, index) + index, size = self.parse_short(data, index) type_s = DNS_TYPES_R.get(type, "undefined") cls_s = DNS_CLASSES_R.get(cls, "undefined") - index, payload = self.parse_payload(data, index, type_s) + index, payload = self.parse_payload(data, index, type_s, size=size) return (index, (name, type_s, cls_s, ttl, payload)) - def parse_payload(self, data, index, type_s): + def parse_payload(self, data, index, type_s, size=None): type_s = type_s.lower() method_name = "parse_" + type_s method = getattr(self, method_name) - return method(data, index) + return method(data, index, size=size) - def parse_a(self, data, index): + def parse_a(self, data, index, size=None): index, address = self.parse_ip4(data, index) return (index, address) - def parse_aaaa(self, data, index): + def parse_aaaa(self, data, index, size=None): index, address = self.parse_ip6(data, index) return (index, address) - def parse_mx(self, data, index): + def parse_mx(self, data, index, size=None): index, preference = self.parse_short(data, index) index, address = self.parse_label(data, index) return (index, (preference, address)) - def parse_cname(self, data, index): + def parse_cname(self, data, index, size=None): index, address = self.parse_label(data, index) return (index, address) - def parse_ns(self, data, index): + def parse_ns(self, data, index, size=None): pass - def parse_ar(self, data, index): + def parse_ar(self, data, index, size=None): pass + def parse_srv(self, data, index, size=None): + index, priority = self.parse_short(data, index) + index, weight = self.parse_short(data, index) + index, port = self.parse_short(data, index) + index, target = self.parse_label(data, index) + return (index, (priority, weight, port, target)) + + def parse_svcb(self, data, index, size=None): + start = index + index, priority = self.parse_short(data, index) + index, target = self.parse_label(data, index) + end = start + size if size else index + params = data[index:end] + return (end, (priority, target, params)) + + def parse_https(self, data, index, size=None): + return self.parse_svcb(data, index, size=size) + + def parse_caa(self, data, index, size=None): + start = index + index, flags = self.parse_byte(data, index) + index, tag_l = self.parse_byte(data, index) + tag = data[index : index + tag_l] + index += tag_l + end = start + size if size else index + value = data[index:end] + return (end, (flags, tag, value)) + def parse_label(self, data, index): buffer = [] diff --git a/src/netius/test/clients/dns.py b/src/netius/test/clients/dns.py index ad1f84d6..24175c96 100644 --- a/src/netius/test/clients/dns.py +++ b/src/netius/test/clients/dns.py @@ -28,6 +28,7 @@ __license__ = "Apache License, Version 2.0" """ The license for the module """ +import struct import unittest import netius.clients @@ -110,6 +111,50 @@ def close(self): netius.clients.DNSClient.protocol = MockProtocol +class DNSResponseParserTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + self.response = netius.clients.DNSResponse(b"") + + def test_extended_types(self): + self.assertEqual(netius.clients.dns.DNS_TYPES["SRV"], 0x21) + self.assertEqual(netius.clients.dns.DNS_TYPES["SVCB"], 0x40) + self.assertEqual(netius.clients.dns.DNS_TYPES["HTTPS"], 0x41) + self.assertEqual(netius.clients.dns.DNS_TYPES["CAA"], 0x101) + + def test_parse_srv(self): + rdata = struct.pack("!HHH", 10, 20, 443) + rdata += b"\x04_sip\x07example\x03com\x00" + index, payload = self.response.parse_srv(rdata, 0, size=len(rdata)) + self.assertEqual(index, len(rdata)) + self.assertEqual(payload, (10, 20, 443, b"_sip.example.com")) + + def test_parse_svcb(self): + target = b"\x03svc\x07example\x03com\x00" + params = b"\x00\x01\x00\x02h3" + rdata = struct.pack("!H", 1) + target + params + index, payload = self.response.parse_svcb(rdata, 0, size=len(rdata)) + self.assertEqual(index, len(rdata)) + self.assertEqual(payload, (1, b"svc.example.com", params)) + + def test_parse_https_matches_svcb(self): + target = b"\x03svc\x07example\x03com\x00" + params = b"\x00\x01\x00\x02h3" + rdata = struct.pack("!H", 1) + target + params + _index_svcb, svcb = self.response.parse_svcb(rdata, 0, size=len(rdata)) + _index_https, https = self.response.parse_https(rdata, 0, size=len(rdata)) + self.assertEqual(svcb, https) + + def test_parse_caa(self): + tag = b"issue" + value = b"letsencrypt.org" + rdata = struct.pack("!BB", 0x80, len(tag)) + tag + value + index, payload = self.response.parse_caa(rdata, 0, size=len(rdata)) + self.assertEqual(index, len(rdata)) + self.assertEqual(payload, (0x80, b"issue", b"letsencrypt.org")) + + class _MockTransport(object): def close(self):