Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'sshtunnel' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def _check_server_auth(self):
# Check if authentication to server was successfulZ
self.ssh_event.wait(sshtunnel.SSH_TIMEOUT) # wait for transport
self.assertTrue(self.ssh_event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual(self.ts.get_username(),
SSH_USERNAME)
self.assertTrue(self.ts.is_authenticated())
def _do_forwarding(self, timeout=sshtunnel.SSH_TIMEOUT):
self.log.debug('forward-server Start')
self.ssh_event.wait(THREADS_TIMEOUT) # wait for SSH server's transport
try:
schan = self.ts.accept(timeout=timeout)
info = "forward-server schan <> echo"
self.log.info(info + " accept()")
echo = socket.create_connection(
(self.eaddr, self.eport)
)
while self.is_server_working:
rqst, _, _ = select.select([schan, echo],
[],
[],
timeout)
if schan in rqst:
data = schan.recv(1024)
def test_show_running_version(self):
""" Test that _cli_main() function quits when Enter is pressed """
with capture_stdout_stderr() as (out, err):
with self.assertRaises(SystemExit):
sshtunnel._cli_main(args=['-V'])
if sys.version_info < (3, 4):
version = err.getvalue().split()[-1]
else:
version = out.getvalue().split()[-1]
self.assertEqual(version,
sshtunnel.__version__)
def test_show_running_version(self):
""" Test that _cli_main() function quits when Enter is pressed """
with capture_stdout_stderr() as (out, err):
with self.assertRaises(SystemExit):
sshtunnel._cli_main(args=['-V'])
if sys.version_info < (3, 4):
version = err.getvalue().split()[-1]
else:
version = out.getvalue().split()[-1]
self.assertEqual(version,
sshtunnel.__version__)
def sql_command(self, command):
"""Execute an sql command and return the results.
The db connection is established over an ssh tunnel
"""
try:
with sshtunnel.SSHTunnelForwarder(
(self._config['host'], 22),
ssh_username = self._config['ssh_user'],
ssh_password = self._config['ssh_passwd'],
remote_bind_address=('127.0.0.1', 3306),
local_bind_address=('0.0.0.0', 8306)
) as tunnel:
db = MySQLdb.connect(
host = '127.0.0.1',
port = 8306,
user = 'root',
passwd = '',
db = 'chaipcr'
)
data = pd.read_sql(command+';', con=db)
db.close()
return data
ssh_username=SSH_USERNAME,
ssh_password=SSH_PASSWORD,
remote_bind_address=(self.eaddr, self.eport),
local_bind_address=('', self.randomize_eport()),
logger=self.log
) as server:
keys = server.get_keys()
self.assertIsInstance(keys, list)
self.assertFalse(any('keys loaded from agent' in l for l in
self.sshtunnel_log_messages['info']))
tmp_dir = tempfile.mkdtemp()
shutil.copy(get_test_data_path(PKEY_FILE),
os.path.join(tmp_dir, 'id_rsa'))
keys = sshtunnel.SSHTunnelForwarder.get_keys(
self.log,
host_pkey_directories=[tmp_dir, ]
)
self.assertIsInstance(keys, list)
self.assertTrue(
any('1 keys loaded from host directory' in l
for l in self.sshtunnel_log_messages['info'])
)
shutil.rmtree(tmp_dir)
def test_raise_fwd_ext(self):
""" Test that we can silence the exceptions on sshtunnel creation """
server = open_tunnel(
'10.10.10.10',
ssh_username=SSH_USERNAME,
ssh_password=SSH_PASSWORD,
remote_bind_address=('10.0.0.1', 8080),
mute_exceptions=True,
)
# This should not raise an exception
server._raise(sshtunnel.BaseSSHTunnelForwarderError, 'test')
server._raise_fwd_exc = True # now exceptions are not silenced
with self.assertRaises(sshtunnel.BaseSSHTunnelForwarderError):
server._raise(sshtunnel.BaseSSHTunnelForwarderError, 'test')
""" Test processing deprecated API attributes """
kwargs = {'ssh_host': '10.0.0.1',
'ssh_address': '10.0.0.1',
'ssh_private_key': 'testrsa.key',
'raise_exception_if_any_forwarder_have_a_problem': True}
for item in kwargs:
self.assertEqual(kwargs[item],
sshtunnel.SSHTunnelForwarder._process_deprecated(
None,
item,
kwargs.copy()
))
# use both deprecated and not None new attribute should raise exception
for item in kwargs:
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
item,
kwargs.copy())
# deprecated attribute not in deprecation list should raise exception
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
'item',
kwargs.copy())
def test_process_deprecations(self):
""" Test processing deprecated API attributes """
kwargs = {'ssh_host': '10.0.0.1',
'ssh_address': '10.0.0.1',
'ssh_private_key': 'testrsa.key',
'raise_exception_if_any_forwarder_have_a_problem': True}
for item in kwargs:
self.assertEqual(kwargs[item],
sshtunnel.SSHTunnelForwarder._process_deprecated(
None,
item,
kwargs.copy()
))
# use both deprecated and not None new attribute should raise exception
for item in kwargs:
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
item,
kwargs.copy())
# deprecated attribute not in deprecation list should raise exception
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
'item',
kwargs.copy())
for item in kwargs:
self.assertEqual(kwargs[item],
sshtunnel.SSHTunnelForwarder._process_deprecated(
None,
item,
kwargs.copy()
))
# use both deprecated and not None new attribute should raise exception
for item in kwargs:
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
item,
kwargs.copy())
# deprecated attribute not in deprecation list should raise exception
with self.assertRaises(ValueError):
sshtunnel.SSHTunnelForwarder._process_deprecated('some value',
'item',
kwargs.copy())