Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'aiodns' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def start_dnscan(domain):
resolver = aiodns.DNSResolver()
loop = asyncio.get_event_loop()
tasks = list()
tasks.append(loop.create_task(resolve(domain_to_brute, 'NS')))
tasks.append(loop.create_task(resolve(domain_to_brute, 'MX')))
loop.run_until_complete(asyncio.wait(tasks))
futures = list()
for t in tasks:
for domain_name in t.result():
name = resolver.query(domain_name, 'A')
name.add_done_callback(error_checker_callback)
name.data = domain_name
futures.append(name)
result = loop.run_until_complete(asyncio.wait(futures))
def setUp(self) -> None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.set_debug(True)
self.timeout = int(os.environ.get('ASYNC_TIMEOUT', '5'))
self.timeout_handle = self.loop.call_later(
self.timeout, self.on_timeout)
self.normalizer = email_normalize.Normalizer()
self.resolver = aiodns.DNSResolver(loop=self.loop)
self.normalizer._resolver = self.resolver
async def test_async_resolver_negative_lookup(loop) -> None:
with patch('aiodns.DNSResolver') as mock:
mock().gethostbyname.side_effect = aiodns.error.DNSError()
resolver = AsyncResolver(loop=loop)
with pytest.raises(OSError):
await resolver.resolve('doesnotexist.bla')
async def _check(self):
if os.getenv('CI'):
# on CI we should always be online
# I assume that all CI services set the CI environment variable!
return False
resolver = aiodns.DNSResolver()
try:
with timeout(1):
await resolver.query('google.com', 'A')
except (aiodns.error.DNSError, asyncio.TimeoutError) as e:
print(f'\nnot online: {e.__class__.__name__} {e}\n', file=sys.stderr)
return True
else:
return False
def test_query_cancel(self):
f = self.resolver.query('google.com', 'A')
self.resolver.cancel()
try:
self.loop.run_until_complete(f)
except aiodns.error.DNSError as e:
self.assertEqual(e.args[0], aiodns.error.ARES_ECANCELLED)
def test_query_a_bad(self):
f = self.resolver.query('hgf8g2od29hdohid.com', 'A')
try:
self.loop.run_until_complete(f)
except aiodns.error.DNSError as e:
self.assertEqual(e.args[0], aiodns.error.ARES_ENOTFOUND)
def test_query_timeout(self):
self.resolver = aiodns.DNSResolver(timeout=0.1, loop=self.loop)
self.resolver.nameservers = ['1.2.3.4']
f = self.resolver.query('google.com', 'A')
try:
self.loop.run_until_complete(f)
except aiodns.error.DNSError as e:
self.assertEqual(e.args[0], aiodns.error.ARES_ETIMEOUT)
print('Invalid infohash.')
exit(1)
nodes = SortedQueue(args.infohash)
logger = logging.getLogger('ih2torrent')
handler = StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
try:
loop = asyncio.get_event_loop()
resolver = aiodns.DNSResolver(loop=loop)
loop.run_until_complete(ih2torrent(loop, args.infohash, args.file, args.bootstrap))
except KeyboardInterrupt:
print()
print('Letting the remaining tasks finish before termination.')
except Exception as e:
print('Unexpected error:', e)
pending = asyncio.Task.all_tasks()
for task in pending:
task.cancel()
try:
loop.run_until_complete(asyncio.gather(*pending))
except CancelledError:
pass
loop.close()
async def validate_email(email, loop):
"""
check an email is likely to exist
could do SMTP looks ups: https://gist.github.com/samuelcolvin/3652427c07fac775d0cdc8af127c0ed1
but not really worth it
"""
domain = email.split('@', 1)[1]
resolver = aiodns.DNSResolver(loop=loop)
try:
with timeout(2, loop=loop):
await resolver.query(domain, 'MX')
except (aiodns.error.DNSError, ValueError, asyncio.TimeoutError) as e:
logger.info('looking up "%s": error %s %s', email, e.__class__.__name__, e)
return False
else:
return True
async def query(self, sub):
"""
Query domain
:param sub:
:return:
"""
ret = None
# root domain
if sub == '@' or sub == '':
sub_domain = self.domain
else:
sub = ''.join(sub.rsplit(self.domain, 1)).rstrip('.')
sub_domain = '{sub}.{domain}'.format(sub=sub, domain=self.domain)
try:
ret = await self.resolver.query(sub_domain, 'A')
except aiodns.error.DNSError as e:
err_code, err_msg = e.args[0], e.args[1]
# 1: DNS server returned answer with no data
# 4: Domain name not found
# 11: Could not contact DNS servers
# 12: Timeout while contacting DNS servers
if err_code not in [1, 4, 11, 12]:
logger.warning('{domain} {exception}'.format(domain=sub_domain, exception=e))
except Exception as e:
logger.info(sub_domain)
logger.warning(traceback.format_exc())
else:
ret = [r.host for r in ret]
domain_ips = [s for s in ret]
# It is a wildcard domain name and
# the subdomain IP that is burst is consistent with the IP