Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'docker' in Python code. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
base_url, timeout, pool_connections=num_pools
)
self.mount('http+docker://', self._custom_adapter)
self._unmount('http://', 'https://')
self.base_url = 'http+docker://localunixsocket'
elif base_url.startswith('npipe://'):
if not IS_WINDOWS_PLATFORM:
raise DockerException(
'The npipe:// protocol is only supported on Windows'
)
try:
self._custom_adapter = NpipeAdapter(
base_url, timeout, pool_connections=num_pools
)
except NameError:
raise DockerException(
'Install pypiwin32 package to enable npipe:// support'
)
self.mount('http+docker://', self._custom_adapter)
self.base_url = 'http+docker://localnpipe'
else:
# Use SSLAdapter for the ability to specify SSL version
if isinstance(tls, TLSConfig):
tls.configure_client(self)
elif tls:
self._custom_adapter = SSLAdapter(pool_connections=num_pools)
self.mount('https://', self._custom_adapter)
self.base_url = base_url
# version detection needs to be after unix adapter mounting
if version is None:
self._version = DEFAULT_DOCKER_API_VERSION
# Entry point: obtains a Linux AppImage URL, downloads it, builds the
# 'status_desktop' Docker image from the current directory and runs each
# collected test inside a container started from that image.
# Keys read from kwargs: 'test_results_path', 'linux_app_url', 'mark',
# 'testrail_report'.
# NOTE(review): indentation was lost in this excerpt and the function is cut
# off inside its final try/except, so statement nesting is not certain here --
# confirm against the full file.
def main(**kwargs):
# Make sure the results directory exists before it is bind-mounted below.
if not os.path.exists(kwargs['test_results_path']):
os.makedirs(kwargs['test_results_path'])
# Use the caller-supplied AppImage URL when given; otherwise take the first
# matching link found in the nightly page's HTML.
if kwargs['linux_app_url']:
linux_app_url = kwargs['linux_app_url']
else:
# NOTE(review): the pattern is a non-raw string containing `\S`; the `[0]`
# raises IndexError when the page contains no match.
linux_app_url = re.findall('https\S*AppImage', requests.get('https://status.im/nightly/').text)[0]
# The first URL path segment containing '.AppImage' is used as the version label.
app_version = ([i for i in [i for i in linux_app_url.split('/') if '.AppImage' in i]])[0]
# Save the AppImage as 'nightly.AppImage' in the working directory.
urlretrieve(linux_app_url, 'nightly.AppImage')
client = docker.from_env()
client.images.build(tag='status_desktop', path='.')
# Collection-only pytest run restricted to the requested marker.
# Presumably collection is what fills test_data.tests_to_run, which is why the
# import follows it -- TODO confirm.
pytest.main(['--collect-only', '-m %s' % kwargs['mark']])
from tests import test_data
# Optional TestRail reporting: the run is named after the app version, or the
# current timestamp when the version label is empty.
if kwargs['testrail_report']:
testrail_report = TestrailReportDesktop(kwargs['test_results_path'])
testrail_report.add_run(app_version if app_version else '%s' % datetime.now().strftime("%Y-%m-%d %H:%M"))
# One container per test.
for test in test_data.tests_to_run:
print(test)
try:
# Detached container with a TTY, container port 5900/tcp mapped to host port
# 5900, and the results directory bind-mounted read-write at
# TEST_REPORT_DIR_CONTAINER.
container = client.containers.run("status_desktop",
detach=True, tty=True, ports={'5900/tcp': 5900},
volumes={kwargs['test_results_path']: {'bind': TEST_REPORT_DIR_CONTAINER,
'mode': 'rw'}})
# Run the test file with pytest under Jython inside the container; stdout is
# requested, stderr is not.
outcome = container.exec_run(['/jython-2.7.1/bin/jython', '-m', 'pytest', '/home/tests/' + test],
stdout=True, stderr=False, stdin=True)
# NOTE(review): the handler body is not visible in this excerpt.
except Exception:
try:
self.docker_client.swarm.init()
except docker.errors.APIError:
pass
# docker network
self.network_name = "test-network-dfple"
self.network = self.docker_client.networks.create(name=self.network_name, driver='overlay')
# docker-flow-proxy service
# dfp_image = self.docker_client.images.pull('vfarcic/docker-flow-proxy')
dfp_service = {
'name': 'proxy_{}'.format(self.test_name),
'image': 'vfarcic/docker-flow-proxy',
'constraints': [],
'endpoint_spec': docker.types.EndpointSpec(
ports={80: 80, 443: 443, 8080: 8080}),
'env': [
"LISTENER_ADDRESS=swarm_listener_{}".format(self.test_name),
"MODE=swarm",
"DEBUG=true",
"SERVICE_NAME=proxy_{}".format(self.test_name) ],
'networks': [self.network_name]
}
# docker-flow-swarm-listener service
# dfsl_image = self.docker_client.images.pull('vfarcic/docker-flow-swarm-listener')
dfsl_service = {
'name': 'swarm_listener_{}'.format(self.test_name),
'image': 'vfarcic/docker-flow-swarm-listener',
'constraints': ["node.role == manager"],
'endpoint_spec': docker.types.EndpointSpec(
def _create_tagged_image(base_image_tag, new_image_tag, app_or_lib_name):
    """Run the test-image setup command in a container and tag the result.

    A container is created from ``base_image_tag`` with the setup command and
    volume mounts computed for ``app_or_lib_name``, started, and its combined
    stdout/stderr is streamed to the client line by line.  If the container
    exits with status zero it is committed, the commit is tagged as
    ``new_image_tag`` (replacing any previous image with that tag), and the
    container is removed together with its volumes.

    Args:
        base_image_tag: Image the setup container is created from.
        new_image_tag: Repository/tag applied to the committed image.
        app_or_lib_name: App or lib whose setup command and mounts are used.

    Raises:
        ImageCreationError: If the setup container exits with a non-zero
            status.  The container is not removed in that case.
    """
    docker_client = get_docker_client()

    command = _get_test_image_setup_command(app_or_lib_name)
    split_volumes = _get_split_volumes(
        get_volume_mounts(app_or_lib_name, get_expanded_libs_specs(), test=True))
    create_container_volumes = _get_create_container_volumes(split_volumes)
    create_container_binds = _get_create_container_binds(split_volumes)

    container = docker_client.create_container(
        image=base_image_tag,
        command=command,
        volumes=create_container_volumes,
        host_config=docker.utils.create_host_config(binds=create_container_binds))
    docker_client.start(container=container['Id'])

    log_to_client('Running commands to create new image:')
    for line in docker_client.logs(container['Id'], stdout=True, stderr=True, stream=True):
        log_to_client(line.strip())

    exit_code = docker_client.wait(container['Id'])
    if exit_code:
        raise ImageCreationError(exit_code)

    new_image = docker_client.commit(container=container['Id'])
    try:
        docker_client.remove_image(image=new_image_tag)
    except Exception:
        # Best-effort removal of a previous image with this tag: failure is
        # logged and tagging proceeds (with force=True).  Catching Exception
        # rather than using a bare except lets KeyboardInterrupt and
        # SystemExit propagate.
        log_to_client('Not able to remove image {}'.format(new_image_tag))
    docker_client.tag(image=new_image['Id'], repository=new_image_tag, force=True)
    docker_client.remove_container(container=container['Id'], v=True)
def step_impl(context, image_name, basedir):
    """Build a Docker image named ``image_name`` from ``IMAGE_DIR/basedir``.

    Args:
        context: Step context object; not used by this step.
        image_name: Tag applied to the built image.
        basedir: Directory under ``IMAGE_DIR`` used as the build path.

    Returns:
        Always ``True``.
    """
    client = docker.from_env(assert_hostname=False)
    full_image_path = os.path.join(IMAGE_DIR, basedir)
    # Announce the build before it starts, naming the image and the directory
    # it is built from.
    print("Building image %s from %s" % (image_name, full_image_path))
    output = client.build(full_image_path, rm=True, tag=image_name)
    # Iterating the build output collects every line it yields before the
    # step returns.
    response = [" %s" % (line,) for line in output]
    print(response)
    return True
def exec_command(cls, command, project, service, timeout_seconds=60):
    """Run a command inside the container of a docker-compose service.

    The container is looked up through ``cls._get_container_id`` for the given
    project and service; an exec instance is then created for ``command`` in
    that container and started.

    Throws ContainerUnavailableError if the retry limit is reached.

    Args:
        command: The command to be executed in the container
        project: Name of the project the container is hosting
        service: Name of the service that the container is hosting
        timeout_seconds: Retry time limit to wait for containers to start
            Default in seconds: 60
    """
    client = Client(version='auto')
    target_container = cls._get_container_id(
        project, service, client, timeout_seconds)
    created_exec = client.exec_create(target_container, command)
    client.exec_start(created_exec['Id'])
def addif(self, ctn, ip_addr=''):
    """Connect container ``ctn`` to this network and record its address.

    Runs ``docker network connect`` for the container (passing ``--ip`` or,
    on an IPv6 subnet, ``--ip6`` when ``ip_addr`` is given), then inspects
    the network to read the address assigned to the container and appends
    an ``(interface name, address, network name)`` tuple to the container's
    ``ip_addrs`` (subnet version 4) or ``ip6_addrs`` (otherwise) list.

    Args:
        ctn: Container object to attach; it is also appended to ``self.ctns``.
        ip_addr: Address to request for the container; empty string means
            no address option is passed.
    """
    # The returned interface name is not used here; the call itself is kept.
    ctn.next_if_name()
    self.ctns.append(ctn)

    if ip_addr == '':
        ip_option = ''
    elif self.subnet.version == 6:
        ip_option = '--ip6 {0}'.format(ip_addr)
    else:
        ip_option = '--ip {0}'.format(ip_addr)
    local("docker network connect {0} {1} {2}".format(
        ip_option, self.name, ctn.docker_name()))

    # Find this container's entry among the network's attached containers.
    attached = Client(timeout=60, version='auto').inspect_network(self.id)['Containers']
    endpoint = [entry for entry in attached.values()
                if entry['Name'] == ctn.docker_name()][0]

    if self.subnet.version == 4:
        known_addrs, address_key = ctn.ip_addrs, 'IPv4Address'
    else:
        known_addrs, address_key = ctn.ip6_addrs, 'IPv6Address'
    # The interface index is the number of addresses already recorded.
    if_name = 'eth{0}'.format(len(known_addrs))
    known_addrs.append((if_name, endpoint[address_key], self.name))
# Checks that worker.create_container returns a JSON response whose 'code' is
# '0x3' when the Docker client's create_container raises docker.errors.APIError.
# NOTE(review): indentation was lost in this excerpt, so which of the
# statements following the `with` line belong to its body cannot be determined
# here -- confirm against the full file.
def test_create_container_failed(self):
# Create the user that the request below refers to.
create_user(dict(
username=test_cfg.USER_NAME,
password=test_cfg.USER_PASSWORD,
email=test_cfg.USER_EMAIL
))
# Keyword arguments passed to worker.create_container.
req = dict(
image_id=test_cfg.CONTAINER_IMAGE_ID,
user_name=test_cfg.USER_NAME,
container_name=test_cfg.CONTAINER_NAME
)
# Replace the client's create_container with a mock that raises APIError.
with mock.patch.object(self.cli, 'create_container') as mock_cli_create:
mock_cli_create.side_effect = docker.errors.APIError(mock.Mock(), mock.Mock())
response = worker.create_container(self.cli, **req)
# The default container_serial is ''
remove_instance_by_serial('')
remove_user_by_username(test_cfg.USER_NAME)
# The mocked client call must have happened exactly once, and the worker must
# have reported code '0x3'.
mock_cli_create.assert_called_once()
res_dict = json.loads(response)
self.assertEqual(res_dict.get('code'), '0x3')
def test_remove_volume_exception(self):
    """remove_volume re-raises APIError and reports an in-use volume.

    The Docker client's ``remove_volume`` is made to raise an ``APIError``
    whose response has status code 409; the worker is expected to propagate
    the error, mark itself changed and call ``fail_json`` once with the
    in-use message.
    """
    conflict_response = mock.MagicMock()
    conflict_response.status_code = 409
    in_use_error = docker_error.APIError('test error', conflict_response)

    worker_params = {'name': 'nova_compute', 'action': 'remove_volume'}
    self.dw = get_DockerWorker(worker_params)
    self.dw.dc.volumes.return_value = self.volumes
    self.dw.dc.remove_volume.side_effect = in_use_error

    with self.assertRaises(docker_error.APIError):
        self.dw.remove_volume()
    self.assertTrue(self.dw.changed)
    self.dw.module.fail_json.assert_called_once_with(
        failed=True,
        msg="Volume named 'nova_compute' is currently in-use"
    )
def test_should_handle_cleanup_error_when_removing_image(self, v2_image):
    """A failing image removal during cleanup is logged as a warning.

    ``remove_image`` on the Docker client is made to raise ``APIError``;
    running a squash with ``cleanup=True`` must still log the removal attempt
    at info level and the skip message at warn level.

    ``v2_image`` is not used in the body; presumably it is injected by a
    patch decorator -- TODO confirm.
    """
    client = self.docker_client
    client.inspect_image.return_value = {'Id': "abcdefgh"}
    client.remove_image.side_effect = docker.errors.APIError("Message")

    Squash(self.log, 'image', client, load_image=True, cleanup=True).run()

    self.log.info.assert_any_call("Removing old image image...")
    self.log.warn.assert_any_call(
        "Could not remove image image: Message, skipping cleanup after squashing")