Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'hvac' in Python code. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def get_hvac_client(vault_url, cacert=None):
    """Build an hvac client pointed at the given Vault URL.

    :param vault_url: Vault url to point client at
    :type vault_url: str
    :param cacert: Path to CA cert used for vaults api cert; forwarded to
                   the client unchanged as its ``verify`` argument.
    :type cacert: str
    :returns: hvac client for given url
    :rtype: hvac.Client
    """
    # Collect the constructor arguments first so the mapping from this
    # function's parameters to hvac's keyword names is visible in one place.
    client_kwargs = {
        'url': vault_url,
        'verify': cacert,
    }
    return hvac.Client(**client_kwargs)
"renewable": False,
"request_id": "e7c8b2e1-95e8-cb17-e98a-6c428201f1d5",
"warnings": None,
"wrap_info": None
}
mock_url = 'http://localhost:8200/v1/auth/{0}/role/{1}/custom-secret-id'.format(
'approle' if mount_point is None else mount_point,
role_name,
)
requests_mocker.register_uri(
method='POST',
url=mock_url,
status_code=expected_status_code,
json=mock_response,
)
client = Client()
if mount_point is None:
actual_response = client.create_role_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
)
else:
actual_response = client.create_role_custom_secret_id(
role_name=role_name,
secret_id=secret_id,
mount_point=mount_point,
)
self.assertEquals(
first=mock_response,
second=actual_response,
)
"request_id": "2310dc21-0fea-a2de-2d94-bb4edd59f1e9",
"warnings": None,
"wrap_info": None
}
mock_url = 'http://localhost:8200/v1/auth/{0}/role/{1}/secret-id'.format(
'approle' if mount_point is None else mount_point,
role_name,
)
requests_mocker.register_uri(
method='POST',
url=mock_url,
status_code=expected_status_code,
json=mock_response,
)
client = Client()
if mount_point is None:
actual_response = client.create_role_secret_id(
role_name=role_name,
)
else:
actual_response = client.create_role_secret_id(
role_name=role_name,
mount_point=mount_point,
)
self.assertEquals(
first=mock_response,
second=actual_response,
)
('incorrect tls version', dict(url=MockLdapServer.ldap_url, tls_min_version='cats'), exceptions.InvalidRequest,
"invalid 'tls_min_version'"),
])
def test_configure(self, test_label, parameters, raises=None, exception_message=''):
parameters.update({
'user_dn': MockLdapServer.ldap_users_dn,
'group_dn': MockLdapServer.ldap_groups_dn,
'mount_point': self.TEST_LDAP_PATH,
})
if raises:
with self.assertRaises(raises) as cm:
self.client.auth.ldap.configure(**parameters)
self.assertIn(
member=exception_message,
container=str(cm.exception),
)
else:
raises=exceptions.InvalidPath,
),
])
def test_read_config(self, label, configure_first=True, raises=None, exception_msg=''):
if configure_first:
configure_response = self.client.auth.okta.configure(
org_name=self.TEST_ORG_NAME,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('configure_response: %s' % configure_response)
if raises:
with self.assertRaises(raises) as cm:
self.client.auth.gcp.read_config(
mount_point=self.TEST_MOUNT_POINT,
)
self.assertIn(
pki_issue_response = self.client.write(
path='pki/issue/my-role',
common_name='test.hvac.com',
)
# Revoke the lease of our test cert that was just issued.
revoke_lease_response = self.client.sys.revoke_lease(
lease_id=pki_issue_response['lease_id'],
)
logging.debug('revoke_lease_response: %s' % revoke_lease_response)
self.assertEqual(
first=revoke_lease_response.status_code,
second=204,
)
with self.assertRaises(exceptions.InvalidPath):
self.client.sys.list_leases(
prefix='pki',
)
raises=exceptions.InvalidPath,
),
])
def test_read_role(self, label, role_name='hvac', configure_role_first=True, raises=None, exception_message=''):
bound_service_principal_ids = ['some-dummy-sp-id']
if configure_role_first:
create_role_response = self.client.auth.azure.create_role(
name=role_name,
bound_service_principal_ids=bound_service_principal_ids,
mount_point=self.TEST_MOUNT_POINT,
)
logging.debug('create_role_response: %s' % create_role_response)
if raises is not None:
with self.assertRaises(raises):
self.client.auth.azure.read_role(
name=role_name,
raises=exceptions.InvalidPath,
),
])
def test_list_roles(self, label, num_roles_to_create=1, write_config_first=True, raises=None):
if write_config_first:
self.client.auth.azure.configure(
tenant_id='my-tenant-id',
resource='my-resource',
mount_point=self.TEST_MOUNT_POINT,
)
roles_to_create = ['hvac%s' % n for n in range(0, num_roles_to_create)]
bound_service_principal_ids = ['some-dummy-sp-id']
logging.debug('roles_to_create: %s' % roles_to_create)
for role_to_create in roles_to_create:
create_role_response = self.client.auth.azure.create_role(
name=role_to_create,
bound_service_principal_ids=bound_service_principal_ids,
def test_configure_duo_access(self, test_label, mount_point, requests_mocker):
    # Registers a stubbed POST endpoint for the Duo access path under the
    # given mount point, calls Mfa.configure_duo_access against it, and
    # asserts that the response carries the stubbed status code.
    # ``test_label`` is accepted but not used in the body.
    expected_status_code = 204

    # Stub the endpoint the call is expected to hit.
    duo_access_url = 'http://localhost:8200/v1/auth/{mount_point}/duo/access'.format(mount_point=mount_point)
    requests_mocker.register_uri(method='POST', url=duo_access_url, status_code=expected_status_code)

    # Placeholder Duo settings sent with the request.
    duo_settings = dict(
        host='someapisubdomain.python-hvac.org',
        integration_key='ikey',
        secret_key='supersecret',
    )
    response = Mfa(adapter=Request()).configure_duo_access(mount_point=mount_point, **duo_settings)

    self.assertEqual(first=expected_status_code, second=response.status_code)
mock_response = {
"auth": {
"client_token": "f33f8c72-924e-11f8-cb43-ac59d697597c",
"accessor": "0e9e354a-520f-df04-6867-ee81cae3d42d",
"policies": test_policies,
"lease_duration": 2764800,
"renewable": True,
},
}
requests_mocker.register_uri(
method='POST',
url=mock_url,
status_code=expected_status_code,
json=mock_response,
)
azure = Azure(adapter=Request())
if raises is not None:
with self.assertRaises(raises):
azure.login(
role=role_name,
jwt='my-jwt',
mount_point=self.TEST_MOUNT_POINT,
**test_params
)
else:
login_response = azure.login(
role=role_name,
jwt='my-jwt',
mount_point=self.TEST_MOUNT_POINT,
**test_params
)
logging.debug('login_response: %s' % login_response)