opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
dns
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
[DIR] dnssecalgs
N/A
[DIR] quic
N/A
[DIR] rdtypes
N/A
__init__.py
1.62 KB
Rename
Delete
_asyncbackend.py
2.34 KB
Rename
Delete
_asyncio_backend.py
8.84 KB
Rename
Delete
_ddr.py
5.12 KB
Rename
Delete
_features.py
2.43 KB
Rename
Delete
_immutable_ctx.py
2.40 KB
Rename
Delete
_trio_backend.py
8.27 KB
Rename
Delete
asyncbackend.py
2.73 KB
Rename
Delete
asyncquery.py
30.10 KB
Rename
Delete
asyncresolver.py
17.43 KB
Rename
Delete
dnssec.py
40.74 KB
Rename
Delete
dnssectypes.py
1.76 KB
Rename
Delete
e164.py
3.88 KB
Rename
Delete
edns.py
16.69 KB
Rename
Delete
entropy.py
4.14 KB
Rename
Delete
enum.py
3.60 KB
Rename
Delete
exception.py
5.81 KB
Rename
Delete
flags.py
2.69 KB
Rename
Delete
grange.py
2.09 KB
Rename
Delete
immutable.py
1.97 KB
Rename
Delete
inet.py
5.64 KB
Rename
Delete
ipv4.py
2.49 KB
Rename
Delete
ipv6.py
6.40 KB
Rename
Delete
message.py
66.59 KB
Rename
Delete
name.py
41.78 KB
Rename
Delete
namedict.py
3.91 KB
Rename
Delete
nameserver.py
9.88 KB
Rename
Delete
node.py
12.37 KB
Rename
Delete
opcode.py
2.67 KB
Rename
Delete
py.typed
0 bytes
Rename
Delete
query.py
54.98 KB
Rename
Delete
rcode.py
4.06 KB
Rename
Delete
rdata.py
30.29 KB
Rename
Delete
rdataclass.py
2.91 KB
Rename
Delete
rdataset.py
16.27 KB
Rename
Delete
rdatatype.py
7.27 KB
Rename
Delete
renderer.py
10.99 KB
Rename
Delete
resolver.py
72.00 KB
Rename
Delete
reversename.py
3.74 KB
Rename
Delete
rrset.py
8.96 KB
Rename
Delete
serial.py
3.52 KB
Rename
Delete
set.py
9.00 KB
Rename
Delete
tokenizer.py
23.03 KB
Rename
Delete
transaction.py
22.06 KB
Rename
Delete
tsig.py
11.15 KB
Rename
Delete
tsigkeyring.py
2.57 KB
Rename
Delete
ttl.py
2.91 KB
Rename
Delete
update.py
11.96 KB
Rename
Delete
version.py
1.88 KB
Rename
Delete
versioned.py
11.49 KB
Rename
Delete
win32util.py
8.67 KB
Rename
Delete
wire.py
2.76 KB
Rename
Delete
xfr.py
12.96 KB
Rename
Delete
zone.py
50.87 KB
Rename
Delete
zonefile.py
27.27 KB
Rename
Delete
zonetypes.py
690 bytes
Rename
Delete
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license """asyncio library query support""" import asyncio import socket import sys import dns._asyncbackend import dns._features import dns.exception import dns.inet _is_win32 = sys.platform == "win32" def _get_running_loop(): try: return asyncio.get_running_loop() except AttributeError: # pragma: no cover return asyncio.get_event_loop() class _DatagramProtocol: def __init__(self): self.transport = None self.recvfrom = None def connection_made(self, transport): self.transport = transport def datagram_received(self, data, addr): if self.recvfrom and not self.recvfrom.done(): self.recvfrom.set_result((data, addr)) def error_received(self, exc): # pragma: no cover if self.recvfrom and not self.recvfrom.done(): self.recvfrom.set_exception(exc) def connection_lost(self, exc): if self.recvfrom and not self.recvfrom.done(): if exc is None: # EOF we triggered. Is there a better way to do this? try: raise EOFError("EOF") except EOFError as e: self.recvfrom.set_exception(e) else: self.recvfrom.set_exception(exc) def close(self): self.transport.close() async def _maybe_wait_for(awaitable, timeout): if timeout is not None: try: return await asyncio.wait_for(awaitable, timeout) except asyncio.TimeoutError: raise dns.exception.Timeout(timeout=timeout) else: return await awaitable class DatagramSocket(dns._asyncbackend.DatagramSocket): def __init__(self, family, transport, protocol): super().__init__(family, socket.SOCK_DGRAM) self.transport = transport self.protocol = protocol async def sendto(self, what, destination, timeout): # pragma: no cover # no timeout for asyncio sendto self.transport.sendto(what, destination) return len(what) async def recvfrom(self, size, timeout): # ignore size as there's no way I know to tell protocol about it done = _get_running_loop().create_future() try: assert self.protocol.recvfrom is None self.protocol.recvfrom = done await _maybe_wait_for(done, timeout) return done.result() finally: self.protocol.recvfrom = None async def close(self): self.protocol.close() async def getpeername(self): return self.transport.get_extra_info("peername") async def getsockname(self): return self.transport.get_extra_info("sockname") async def getpeercert(self, timeout): raise NotImplementedError class StreamSocket(dns._asyncbackend.StreamSocket): def __init__(self, af, reader, writer): super().__init__(af, socket.SOCK_STREAM) self.reader = reader self.writer = writer async def sendall(self, what, timeout): self.writer.write(what) return await _maybe_wait_for(self.writer.drain(), timeout) async def recv(self, size, timeout): return await _maybe_wait_for(self.reader.read(size), timeout) async def close(self): self.writer.close() async def getpeername(self): return self.writer.get_extra_info("peername") async def getsockname(self): return self.writer.get_extra_info("sockname") async def getpeercert(self, timeout): return self.writer.get_extra_info("peercert") if dns._features.have("doh"): import anyio import httpcore import httpcore._backends.anyio import httpx _CoreAsyncNetworkBackend = httpcore.AsyncNetworkBackend _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream from dns.query import _compute_times, _expiration_for_this_attempt, _remaining class _NetworkBackend(_CoreAsyncNetworkBackend): def __init__(self, resolver, local_port, bootstrap_address, family): super().__init__() self._local_port = local_port self._resolver = resolver self._bootstrap_address = bootstrap_address self._family = family if local_port != 0: raise NotImplementedError( "the asyncio transport for HTTPX cannot set the local port" ) async def connect_tcp( self, host, port, timeout, local_address, socket_options=None ): # pylint: disable=signature-differs addresses = [] _, expiration = _compute_times(timeout) if dns.inet.is_address(host): addresses.append(host) elif self._bootstrap_address is not None: addresses.append(self._bootstrap_address) else: timeout = _remaining(expiration) family = self._family if local_address: family = dns.inet.af_for_address(local_address) answers = await self._resolver.resolve_name( host, family=family, lifetime=timeout ) addresses = answers.addresses() for address in addresses: try: attempt_expiration = _expiration_for_this_attempt(2.0, expiration) timeout = _remaining(attempt_expiration) with anyio.fail_after(timeout): stream = await anyio.connect_tcp( remote_host=address, remote_port=port, local_host=local_address, ) return _CoreAnyIOStream(stream) except Exception: pass raise httpcore.ConnectError async def connect_unix_socket( self, path, timeout, socket_options=None ): # pylint: disable=signature-differs raise NotImplementedError async def sleep(self, seconds): # pylint: disable=signature-differs await anyio.sleep(seconds) class _HTTPTransport(httpx.AsyncHTTPTransport): def __init__( self, *args, local_port=0, bootstrap_address=None, resolver=None, family=socket.AF_UNSPEC, **kwargs, ): if resolver is None and bootstrap_address is None: # pylint: disable=import-outside-toplevel,redefined-outer-name import dns.asyncresolver resolver = dns.asyncresolver.Resolver() super().__init__(*args, **kwargs) self._pool._network_backend = _NetworkBackend( resolver, local_port, bootstrap_address, family ) else: _HTTPTransport = dns._asyncbackend.NullTransport # type: ignore class Backend(dns._asyncbackend.Backend): def name(self): return "asyncio" async def make_socket( self, af, socktype, proto=0, source=None, destination=None, timeout=None, ssl_context=None, server_hostname=None, ): loop = _get_running_loop() if socktype == socket.SOCK_DGRAM: if _is_win32 and source is None: # Win32 wants explicit binding before recvfrom(). This is the # proper fix for [#637]. source = (dns.inet.any_for_af(af), 0) transport, protocol = await loop.create_datagram_endpoint( _DatagramProtocol, source, family=af, proto=proto, remote_addr=destination, ) return DatagramSocket(af, transport, protocol) elif socktype == socket.SOCK_STREAM: if destination is None: # This shouldn't happen, but we check to make code analysis software # happier. raise ValueError("destination required for stream sockets") (r, w) = await _maybe_wait_for( asyncio.open_connection( destination[0], destination[1], ssl=ssl_context, family=af, proto=proto, local_addr=source, server_hostname=server_hostname, ), timeout, ) return StreamSocket(af, r, w) raise NotImplementedError( "unsupported socket " + f"type {socktype}" ) # pragma: no cover async def sleep(self, interval): await asyncio.sleep(interval) def datagram_connection_required(self): return False def get_transport_class(self): return _HTTPTransport async def wait_for(self, awaitable, timeout): return await _maybe_wait_for(awaitable, timeout)
Save