Cloud Defense Logo

Products

Solutions

Company

Book A Live Demo

Top 10 Examples of "aiodns in functional component" in Python

Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'aiodns' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.

def start_dnscan(domain):
    """Resolve NS and MX names, then issue an A query for every name found.

    The NS and MX lookups run concurrently via ``resolve``; each name they
    return gets its own ``A`` query, tagged with the queried name in
    ``.data`` and reported through ``error_checker_callback``. The function
    blocks until every A query has completed.

    :param domain: currently unused.
        NOTE(review): the lookups target ``domain_to_brute``, presumably a
        module-level name defined elsewhere -- confirm whether ``domain``
        was meant to be used instead.
    """
    resolver = aiodns.DNSResolver()
    loop = asyncio.get_event_loop()
    tasks = [
        loop.create_task(resolve(domain_to_brute, 'NS')),
        loop.create_task(resolve(domain_to_brute, 'MX')),
    ]
    loop.run_until_complete(asyncio.wait(tasks))

    futures = []
    for t in tasks:
        for domain_name in t.result():
            name = resolver.query(domain_name, 'A')
            name.add_done_callback(error_checker_callback)
            name.data = domain_name
            # Track every query, not just the last one of each task, so the
            # loop below actually waits for all of them.
            futures.append(name)

    # asyncio.wait() raises ValueError on an empty collection, so skip the
    # wait when neither lookup produced any names.
    if futures:
        loop.run_until_complete(asyncio.wait(futures))
def setUp(self) -> None:
        """Prepare a fresh debug-enabled event loop, a timeout timer and a normalizer.

        The timeout comes from the ``ASYNC_TIMEOUT`` environment variable
        (default ``'5'``) and schedules ``self.on_timeout`` on the new loop.
        The normalizer's ``_resolver`` is replaced with a resolver bound to
        that same loop.
        """
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)
        loop.set_debug(True)

        delay = int(os.environ.get('ASYNC_TIMEOUT', '5'))
        self.timeout = delay
        self.timeout_handle = loop.call_later(delay, self.on_timeout)

        normalizer = email_normalize.Normalizer()
        resolver = aiodns.DNSResolver(loop=loop)
        self.normalizer = normalizer
        self.resolver = resolver
        normalizer._resolver = resolver
async def test_async_resolver_negative_lookup(loop) -> None:
    """Expect ``OSError`` from ``AsyncResolver.resolve`` when aiodns fails.

    ``aiodns.DNSResolver`` is patched so that ``gethostbyname`` raises
    ``aiodns.error.DNSError``.
    """
    with patch('aiodns.DNSResolver') as resolver_cls:
        failure = aiodns.error.DNSError()
        resolver_cls().gethostbyname.side_effect = failure
        resolver = AsyncResolver(loop=loop)
        with pytest.raises(OSError):
            await resolver.resolve('doesnotexist.bla')
async def _check(self):
        """Return True when the machine looks offline, False otherwise.

        Connectivity is probed with an ``A`` query for ``google.com`` under
        ``timeout(1)``. A ``DNSError`` or a timeout counts as offline and is
        reported on stderr.
        """
        # CI runs are treated as always online; this relies on the CI
        # service exporting the CI environment variable.
        if os.environ.get('CI'):
            return False

        dns = aiodns.DNSResolver()
        try:
            with timeout(1):
                await dns.query('google.com', 'A')
        except (asyncio.TimeoutError, aiodns.error.DNSError) as e:
            print(f'\nnot online: {e.__class__.__name__} {e}\n', file=sys.stderr)
            return True
        return False
def test_query_cancel(self):
        """Cancel the resolver right after issuing an ``A`` query.

        If the query then fails with ``DNSError``, its first argument must
        be ``ARES_ECANCELLED``.
        """
        pending = self.resolver.query('google.com', 'A')
        self.resolver.cancel()
        try:
            self.loop.run_until_complete(pending)
        except aiodns.error.DNSError as err:
            code = err.args[0]
            self.assertEqual(code, aiodns.error.ARES_ECANCELLED)
def test_query_a_bad(self):
        """Query the ``A`` record of a nonsense hostname.

        If the query fails with ``DNSError``, its first argument must be
        ``ARES_ENOTFOUND``.
        """
        pending = self.resolver.query('hgf8g2od29hdohid.com', 'A')
        try:
            self.loop.run_until_complete(pending)
        except aiodns.error.DNSError as err:
            code = err.args[0]
            self.assertEqual(code, aiodns.error.ARES_ENOTFOUND)
def test_query_timeout(self):
        """Query through nameserver ``1.2.3.4`` with a 0.1 timeout.

        ``self.resolver`` is replaced by a resolver configured that way. If
        the query fails with ``DNSError``, its first argument must be
        ``ARES_ETIMEOUT``.
        """
        resolver = aiodns.DNSResolver(timeout=0.1, loop=self.loop)
        self.resolver = resolver
        resolver.nameservers = ['1.2.3.4']
        pending = resolver.query('google.com', 'A')
        try:
            self.loop.run_until_complete(pending)
        except aiodns.error.DNSError as err:
            code = err.args[0]
            self.assertEqual(code, aiodns.error.ARES_ETIMEOUT)
# NOTE(review): this excerpt starts mid-function; the enclosing definition
# and the condition guarding this error exit are not visible here.
print('Invalid infohash.')
        exit(1)

    # SortedQueue is built from the requested infohash; its semantics are
    # defined elsewhere.
    nodes = SortedQueue(args.infohash)

    # Route the 'ih2torrent' logger to stdout at DEBUG level with a
    # "[time][level] message" format.
    logger = logging.getLogger('ih2torrent')
    handler = StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
    handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    # Drive the ih2torrent coroutine to completion on the current event loop.
    # NOTE(review): `resolver` (like `nodes` and `logger`) is assigned but not
    # passed on in the visible code -- presumably a global shared with other
    # functions; confirm.
    try:
        loop = asyncio.get_event_loop()
        resolver = aiodns.DNSResolver(loop=loop)
        loop.run_until_complete(ih2torrent(loop, args.infohash, args.file, args.bootstrap))
    except KeyboardInterrupt:
        print()
        print('Letting the remaining tasks finish before termination.')
    except Exception as e:
        # Any other failure is reported and execution continues to cleanup.
        print('Unexpected error:', e)

    # Cancel every outstanding task and run the loop until the cancellations
    # have been processed, then close the loop.
    # NOTE(review): asyncio.Task.all_tasks() was deprecated in Python 3.7 and
    # removed in 3.9; asyncio.all_tasks(loop) is the documented replacement.
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
    try:
        loop.run_until_complete(asyncio.gather(*pending))
    except CancelledError:
        # Expected: gathering cancelled tasks raises CancelledError.
        pass

    loop.close()
async def validate_email(email, loop):
    """
    Check that an email is likely to exist by querying its domain's MX records.

    Could do SMTP lookups: https://gist.github.com/samuelcolvin/3652427c07fac775d0cdc8af127c0ed1
    but not really worth it.

    :param email: address to check; the domain is everything after the first '@'
    :param loop: event loop passed to the DNS resolver and to ``timeout``
    :return: True if the MX query completes without error inside ``timeout(2)``;
        False if it fails, times out, or the address has no domain part
    """
    _, _, domain = email.partition('@')
    if not domain:
        # Without an '@' the old split()[1] raised IndexError, which was not
        # caught below; an empty domain cannot have MX records either.
        logger.info('looking up "%s": no domain part', email)
        return False

    resolver = aiodns.DNSResolver(loop=loop)
    try:
        with timeout(2, loop=loop):
            await resolver.query(domain, 'MX')
    except (aiodns.error.DNSError, ValueError, asyncio.TimeoutError) as e:
        logger.info('looking up "%s": error %s %s', email, e.__class__.__name__, e)
        return False
    else:
        return True
async def query(self, sub):
        """
        Resolve the A records of a name under ``self.domain``.

        NOTE(review): the body appears truncated in this excerpt -- the
        success branch ends without a visible return, so the return value
        is not documented here.

        :param sub: subdomain to query; ``'@'`` or ``''`` selects
            ``self.domain`` itself. Otherwise the last occurrence of
            ``self.domain`` in ``sub`` is removed and trailing dots are
            stripped before ``<sub>.<domain>`` is rebuilt.
        :return: not visible in this excerpt
        """
        ret = None
        # root domain
        if sub == '@' or sub == '':
            sub_domain = self.domain
        else:
            # Normalise input that may already contain the domain, so the
            # domain is not appended twice.
            sub = ''.join(sub.rsplit(self.domain, 1)).rstrip('.')
            sub_domain = '{sub}.{domain}'.format(sub=sub, domain=self.domain)
        try:
            ret = await self.resolver.query(sub_domain, 'A')
        except aiodns.error.DNSError as e:
            err_code, err_msg = e.args[0], e.args[1]
            # 1:  DNS server returned answer with no data
            # 4:  Domain name not found
            # 11: Could not contact DNS servers
            # 12: Timeout while contacting DNS servers
            # Only DNS errors outside the codes listed above are logged.
            if err_code not in [1, 4, 11, 12]:
                logger.warning('{domain} {exception}'.format(domain=sub_domain, exception=e))
        except Exception as e:
            # Any non-DNS failure: log the queried name and the full traceback.
            logger.info(sub_domain)
            logger.warning(traceback.format_exc())

        else:
            # Success: reduce the answer records to their host values.
            ret = [r.host for r in ret]
            # Separate list holding the same host values.
            domain_ips = [s for s in ret]
            # It is a wildcard domain name and
            # the subdomain IP that is burst is consistent with the IP

Is your System Free of Underlying Vulnerabilities?
Find Out Now