Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'ansible' in Python code. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def test_create_jks_success(self):
set_module_args(dict(
certificate='cert-foo',
private_key='private-foo',
dest='/path/to/keystore.jks',
name='foo',
password='changeit'
))
module = AnsibleModule(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode
)
module.exit_json = Mock()
with patch('os.remove', return_value=True):
self.run_commands.side_effect = lambda args, kwargs: (0, '', '')
create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit")
module.exit_json.assert_called_once_with(
changed=True,
cmd="keytool -importkeystore "
"-destkeystore '/path/to/keystore.jks' "
"-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' "
"-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt",
msg='',
meta = api.CollectionVersionMetadata('ansible_namespace', 'collection', '0.1.0', 'https://downloadme.com',
'myhash', {})
req = collection.CollectionRequirement('ansible_namespace', 'collection', None, galaxy_server,
['0.1.0'], '*', False, metadata=meta)
req.install(to_text(output_path), temp_path)
# Ensure the temp directory is empty, nothing is left behind
assert os.listdir(temp_path) == []
actual_files = os.listdir(collection_path)
actual_files.sort()
assert actual_files == [b'FILES.json', b'MANIFEST.json', b'README.md', b'docs', b'playbooks', b'plugins', b'roles']
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == "Installing 'ansible_namespace.collection:0.1.0' to '%s'" \
% to_text(collection_path)
assert mock_download.call_count == 1
assert mock_download.mock_calls[0][1][0] == 'https://downloadme.com'
assert mock_download.mock_calls[0][1][1] == temp_path
assert mock_download.mock_calls[0][1][2] == 'myhash'
assert mock_download.mock_calls[0][1][3] is True
raise pytest.UsageError(e)
# Return the host name as a string
# metafunc.parametrize("ansible_host", hosts.keys())
# Return a HostManager instance where pattern=host (e.g. ansible_host.all.shell('date'))
# metafunc.parametrize("ansible_host", iter(plugin.initialize(config=plugin.config, pattern=h) for h in
# hosts.keys()))
# Return a ModuleDispatcher instance representing `host` (e.g. ansible_host.shell('date'))
metafunc.parametrize("ansible_host", iter(hosts[h] for h in hosts.keys()))
if 'ansible_group' in metafunc.fixturenames:
# assert required --ansible-* parameters were used
PyTestAnsiblePlugin.assert_required_ansible_parameters(metafunc.config)
try:
plugin = metafunc.config.pluginmanager.getplugin("ansible")
hosts = plugin.initialize(config=plugin.config, pattern=metafunc.config.getoption('ansible_host_pattern'))
except ansible.errors.AnsibleError as e:
raise pytest.UsageError(e)
# FIXME: Eeew, this shouldn't be interfacing with `hosts.options`
groups = hosts.options['inventory_manager'].list_groups()
# Return the group name as a string
# metafunc.parametrize("ansible_group", groups)
# Return a ModuleDispatcher instance representing the group (e.g. ansible_group.shell('date'))
metafunc.parametrize("ansible_group", iter(hosts[g] for g in groups))
def list_all_resources_patch(mocker):
    """Patch ``list_all_resources`` on ``oci_utils`` via *mocker*.

    Returns whatever ``mocker.patch.object`` hands back for the patched
    attribute, so the caller can configure or inspect it.
    """
    patched = mocker.patch.object(oci_utils, "list_all_resources")
    return patched
))
# Configure the parameters that would be returned by querying the
# remote device
current = ApiParameters(params=load_fixture('load_sys_global_settings.json'))
module = AnsibleModule(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode
)
mm = ModuleManager(module=module)
# Override methods to force specific logic in the module to happen
mm.exists = Mock(return_value=False)
mm.read_current_from_device = Mock(return_value=current)
mm.update_on_device = Mock(return_value=True)
results = mm.exec_module()
assert results['changed'] is True
nitro_pass='pass',
nsip='1.1.1.1',
state='present',
))
from ansible.modules.network.netscaler import netscaler_server
client_mock = Mock()
m = Mock(return_value=client_mock)
server_proxy_mock = Mock()
with patch.multiple(
'ansible.modules.network.netscaler.netscaler_server',
get_nitro_client=m,
server_exists=Mock(side_effect=[False, True]),
ConfigProxy=Mock(return_value=server_proxy_mock),
diff_list=Mock(return_value={}),
do_state_change=Mock(return_value=Mock(errorcode=0))
):
self.module = netscaler_server
self.exited()
self.assertIn(call.save_config(), client_mock.mock_calls)
def test_disable_server_graceful(self):
set_module_args(dict(
nitro_user='user',
nitro_pass='pass',
nsip='1.1.1.1',
state='present',
disabled=True,
graceful=True
))
from ansible.modules.network.netscaler import netscaler_server
client_mock = Mock()
m = Mock(return_value=client_mock)
server_proxy_mock = Mock()
d = {
'graceful': True,
'delay': 20,
}
with patch.multiple(
'ansible.modules.network.netscaler.netscaler_server',
nitro_exception=self.MockException,
get_nitro_client=m,
diff_list=Mock(return_value=d),
get_immutables_intersection=Mock(return_value=[]),
server_exists=Mock(side_effect=[True, True]),
traffic_group='traffic-group-local-only',
vlan='net1',
password='password',
server='localhost',
user='admin'
))
module = AnsibleModule(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode
)
mm = ModuleManager(module=module)
# Override methods to force specific logic in the module to happen
mm.exists = Mock(side_effect=[False, True])
mm.create_on_device = Mock(return_value=True)
results = mm.exec_module()
assert results['changed'] is True
with patch.multiple(Parameters, **to_patch) as patched:
patched['_check_active_volume'].return_value = True
patched['_volume_exists_on_device'].return_value = True
mm = RemoteManager(module=module)
mm.exit_json = Mock(return_value=True)
mm.list_volumes_on_device = Mock(
return_value=self.loaded_volumes_2
)
mm.list_hotfixes_on_device = Mock(
return_value=self.loaded_hotfixes
)
mm.list_images_on_device = Mock(return_value=self.loaded_images)
mm.wait_for_software_install_on_device = Mock(return_value=True)
mm.run_command_on_device = Mock(side_effect=cmd_side_effect_md5_ok)
mm.install_image_on_device = Mock(return_value=True)
mm.wait_for_images = Mock(return_value=True)
results = mm.exec_module()
assert results['changed'] is True
assert results['force'] is False
assert results['reuse_inactive_volume'] is False
assert results['remote_src'] is True
assert results['software'] == \
'http://fake.com/BIGIP-12.1.2.0.0.249.iso'
assert results['software_md5sum'] == \
'http://fake.com/BIGIP-12.1.2.0.0.249.iso.md5'
assert results['state'] == 'installed'
assert results['version'] == '12.1.2'
assert results['build'] == '0.0.249'
assert results['volume'] == 'HD1.4'
def test_graceful_nitro_exception_state_present(self):
    """Expect a 'nitro exception' failure message when state is 'present'.

    ``action_exists`` is patched to raise the stand-in exception class, and
    the module's ``nitro_exception`` name is patched to that same class.
    The result obtained from ``self.failed()`` must carry a ``msg`` that
    starts with ``'nitro exception'``.
    """
    set_module_args(dict(
        nitro_user='user',
        nitro_pass='pass',
        nsip='1.1.1.1',
        state='present',
    ))
    # Imported locally, after the module arguments have been set.
    from ansible.modules.network.netscaler import netscaler_cs_action

    class MockException(Exception):
        """Stand-in exception exposing ``errorcode`` and ``message``."""

        def __init__(self, *args, **kwargs):
            self.errorcode = 0
            self.message = ''

    # Any call to the patched ``action_exists`` raises MockException.
    m = Mock(side_effect=MockException)
    with patch.multiple(
        'ansible.modules.network.netscaler.netscaler_cs_action',
        action_exists=m,
        ensure_feature_is_enabled=Mock(return_value=True),
        nitro_exception=MockException
    ):
        self.module = netscaler_cs_action
        result = self.failed()
        # The failure text names the state under test ('present'); the
        # previous text said 'absent', which did not match this test.
        self.assertTrue(
            result['msg'].startswith('nitro exception'),
            msg='Nitro exception not caught on operation present'
        )