Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'uvloop' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
async def test():
port = tb.find_free_port()
srv = await self.loop.create_server(Proto, '127.0.0.1', port)
wsrv = weakref.ref(srv)
del srv
gc.collect()
gc.collect()
gc.collect()
s = socket.socket(socket.AF_INET)
with s:
s.setblocking(False)
await self.loop.sock_connect(s, ('127.0.0.1', port))
d = await self.loop.sock_recv(s, 100)
self.assertEqual(d, b'hello')
srv = wsrv()
srv.close()
await srv.wait_closed()
self.loop.run_until_complete(test())
self.assertTrue(eof)
self.assertIsNone(done)
self.assertEqual(recvd, b'a' * (SIZE + 1))
class Test_AIO_Unix(_TestUnix, tb.AIOTestCase):
pass
class _TestSSL(tb.SSLTestCase):
ONLYCERT = tb._cert_fullname(__file__, 'ssl_cert.pem')
ONLYKEY = tb._cert_fullname(__file__, 'ssl_key.pem')
def test_create_unix_server_ssl_1(self):
CNT = 0 # number of clients that were successful
TOTAL_CNT = 25 # total number of clients that test will create
TIMEOUT = 10.0 # timeout for this test
A_DATA = b'A' * 1024 * 1024
B_DATA = b'B' * 1024 * 1024
sslctx = self._create_server_ssl_context(self.ONLYCERT, self.ONLYKEY)
client_sslctx = self._create_client_ssl_context()
clients = []
async def handle_client(reader, writer):
@unittest.skipIf(
multiprocessing.get_start_method(False) == 'spawn',
'no need to test on macOS where spawn is used instead of fork')
def test_executors_process_pool_01(self):
self.run_pool_test(concurrent.futures.ProcessPoolExecutor)
def test_executors_process_pool_02(self):
self.run_pool_test(concurrent.futures.ThreadPoolExecutor)
class TestUVExecutors(_TestExecutors, tb.UVTestCase):
pass
class TestAIOExecutors(_TestExecutors, tb.AIOTestCase):
pass
loop.run_until_complete(server.wait_closed())
try:
loop.close()
except Exception as exc:
print(exc)
qout.put('stopped')
thread = threading.Thread(target=server_thread, daemon=True)
thread.start()
quin.get()
server_loop.call_soon_threadsafe(server_loop.stop)
thread.join(1)
class TestIssue39Regr(tb.UVTestCase):
"""See https://github.com/MagicStack/uvloop/issues/39 for details.
Original code to reproduce the bug is by Jim Fulton.
"""
def on_alarm(self, sig, fr):
if self.running:
raise FailedTestError
def run_test(self):
try:
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
'Python version must be < 3.7')
def test_transport_unclosed_warning(self):
async def test(sock):
return await self.loop.create_unix_connection(
asyncio.Protocol,
None,
sock=sock)
with self.assertWarnsRegex(ResourceWarning, 'unclosed'):
s1, s2 = socket.socketpair(socket.AF_UNIX)
with s1, s2:
self.loop.run_until_complete(test(s1))
self.loop.close()
class Test_UV_Unix(_TestUnix, tb.UVTestCase):
@unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath()')
def test_create_unix_connection_pathlib(self):
async def run(addr):
t, _ = await self.loop.create_unix_connection(
asyncio.Protocol, addr)
t.close()
with self.unix_server(lambda sock: time.sleep(0.01)) as srv:
addr = pathlib.Path(srv.addr)
self.loop.run_until_complete(run(addr))
@unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath()')
def test_create_unix_server_pathlib(self):
with self.unix_sock_name() as srv_path:
srv_path = pathlib.Path(srv_path)
with tempfile.NamedTemporaryFile('w') as stderr:
with contextlib.redirect_stdout(stdout):
with contextlib.redirect_stderr(stderr):
self.loop.run_until_complete(test())
stdout.flush()
stderr.flush()
with open(stdout.name, 'rb') as so:
self.assertEqual(so.read(), b'out\n')
with open(stderr.name, 'rb') as se:
self.assertEqual(se.read(), b'err\n')
class Test_AIO_Process(_TestProcess, tb.AIOTestCase):
pass
class TestAsyncio_UV_Process(_AsyncioTests, tb.UVTestCase):
pass
class TestAsyncio_AIO_Process(_AsyncioTests, tb.AIOTestCase):
pass
class Test_UV_Process_Delayed(tb.UVTestCase):
class TestProto:
def __init__(self):
self.lost = 0
yield from waiter
proof += 1
@asyncio.coroutine
def outer():
nonlocal proof
d, p = yield from asyncio.wait([inner()])
proof += 100
f = asyncio.ensure_future(outer())
tb.run_briefly(self.loop)
f.cancel()
self.assertRaises(
asyncio.CancelledError, self.loop.run_until_complete, f)
waiter.set_result(None)
tb.run_briefly(self.loop)
self.assertEqual(proof, 1)
await srv.wait_closed()
self.loop.run_until_complete(test())
self.assertTrue(eof)
self.assertIsNone(done)
self.assertEqual(recvd, b'a' * (SIZE + 1))
class Test_AIO_Unix(_TestUnix, tb.AIOTestCase):
pass
class _TestSSL(tb.SSLTestCase):
ONLYCERT = tb._cert_fullname(__file__, 'ssl_cert.pem')
ONLYKEY = tb._cert_fullname(__file__, 'ssl_key.pem')
def test_create_unix_server_ssl_1(self):
CNT = 0 # number of clients that were successful
TOTAL_CNT = 25 # total number of clients that test will create
TIMEOUT = 10.0 # timeout for this test
A_DATA = b'A' * 1024 * 1024
B_DATA = b'B' * 1024 * 1024
sslctx = self._create_server_ssl_context(self.ONLYCERT, self.ONLYKEY)
client_sslctx = self._create_client_ssl_context()
clients = []
async def handle_client(reader, writer):
nonlocal CNT
async def test():
port = tb.find_free_port()
srv = await self.loop.create_server(Proto, '127.0.0.1', port)
s = socket.socket(socket.AF_INET)
with s:
s.setblocking(False)
await self.loop.sock_connect(s, ('127.0.0.1', port))
await self.loop.sock_sendall(s, b'a' * SIZE)
d = await self.loop.sock_recv(s, 100)
self.assertEqual(d, b'hello')
srv.close()
await srv.wait_closed()
async def test(proto_factory, exc_type, exc_re):
port = tb.find_free_port()
proto = proto_factory()
srv = await self.loop.create_server(
lambda: proto, '127.0.0.1', port)
try:
s = socket.socket(socket.AF_INET)
with s:
s.setblocking(False)
await self.loop.sock_connect(s, ('127.0.0.1', port))
await self.loop.sock_sendall(s, b'a')
d = await self.loop.sock_recv(s, 100)
if not d:
raise ConnectionResetError
except ConnectionResetError:
pass
else: