Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing the 'kubernetes' client library in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, Kubernetes API calls, and asynchronous operations with confidence and precision.
def run_test(args, test_case): # pylint: disable=too-many-branches,too-many-statements
"""Run a TFJob test: apply the ks component named by ``args.component`` and wait for the job.

NOTE(review): this excerpt is cut off -- the ``try`` below has no visible
``except``/``finally``, and ``test_case`` and ``results`` are not used in
the lines shown here.
"""
# Load the kube config and create the API client that is later passed to
# the wait call.
util.load_kube_config()
api_client = k8s_client.ApiClient()
# Test-case record: fixed class name, name taken from the base name of the
# job name returned by setup_ks_app.
t = test_util.TestCase()
t.class_name = "tfjob_test"
namespace, name, env = ks_util.setup_ks_app(args)
t.name = os.path.basename(name)
try: # pylint: disable=too-many-nested-blocks
# Apply the component with the `ks` CLI, run from args.app_dir.
util.run(["ks", "apply", env, "-c", args.component], cwd=args.app_dir)
logging.info("Created job %s in namespaces %s", name, namespace)
# NOTE(review): the message mentions only "Failed", but the wait below
# accepts either "Succeeded" or "Failed".
logging.info("Wait for conditions Failed")
results = tf_job_client.wait_for_condition(
api_client, namespace, name, ["Succeeded", "Failed"],
status_callback=tf_job_client.log_status)
def create_namespace(self, ns_name):
    """Create a namespace named ``ns_name`` through ``self.cluster``."""
    metadata = client.V1ObjectMeta(name=ns_name)
    namespace_body = client.V1Namespace(metadata=metadata)
    self.cluster.create_namespace(namespace_body)
def setUpClass(cls):
    """Store the shared e2e settings on the class and create the test namespace.

    Sets ``cls.config``, ``cls.path_prefix`` and ``cls.test_namespace``, then
    creates a namespace named ``cls.test_namespace`` through the core V1 API.
    """
    cls.config = base.get_e2e_configuration()
    cls.path_prefix = "kubernetes/e2e_test/test_yaml/"
    cls.test_namespace = "e2e-test-utils"

    # Build the API objects first, then the namespace body, then submit it.
    api = client.CoreV1Api(
        api_client=client.api_client.ApiClient(configuration=cls.config))
    namespace_meta = client.V1ObjectMeta(name=cls.test_namespace)
    api.create_namespace(body=client.V1Namespace(metadata=namespace_meta))
def refresh(self):
    """Re-read this pod's status from the API and store the result on ``self.obj``."""
    core_v1 = client.CoreV1Api()
    self.obj = core_v1.read_namespaced_pod_status(
        name=self.name, namespace=self.namespace)
# NOTE(review): the two lines below are the tail of a call whose opening is
# not part of this excerpt.
account=None,
namespace=namespace)
# Service component: generate it with `ks generate tf-serving-service`
# (run from app_dir), then hand it to ks_deploy.
serviceComponent = "modelServer-service"
generate_command = [ks, "generate", "tf-serving-service", serviceComponent]
util.run(generate_command, cwd=app_dir)
ks_deploy(
app_dir,
serviceComponent,
params,
env=None,
account=None,
namespace=namespace)
# Read the service named by args.deploy_name and require that it has a
# cluster IP before continuing.
core_api = k8s_client.CoreV1Api(api_client)
# NOTE(review): despite its name, `deploy` holds the result of
# read_namespaced_service, not a deployment.
deploy = core_api.read_namespaced_service(args.deploy_name, args.namespace)
cluster_ip = deploy.spec.cluster_ip
if not cluster_ip:
raise ValueError("inception service wasn't assigned a cluster ip.")
# Wait for the deployment of the same name (timeout_minutes=10).
# NOTE(review): this call uses `namespace` while the lookup above uses
# `args.namespace` -- confirm they refer to the same namespace.
util.wait_for_deployment(
api_client, namespace, args.deploy_name, timeout_minutes=10)
logging.info("Verified TF serving started.")
def test_create_pod_from_yaml(self):
    """
    Creating a pod from the core-pod YAML manifest should succeed.

    The pod is read back to confirm it exists and is then deleted.
    """
    pod_name = "myapp-pod"
    pod_namespace = "default"

    api_client = client.api_client.ApiClient(configuration=self.config)
    manifest_path = self.path_prefix + "core-pod.yaml"
    utils.create_from_yaml(api_client, manifest_path)

    core_v1 = client.CoreV1Api(api_client)
    created_pod = core_v1.read_namespaced_pod(
        name=pod_name, namespace=pod_namespace)
    self.assertIsNotNone(created_pod)

    # Clean up the pod created above.
    core_v1.delete_namespaced_pod(
        name=pod_name, namespace=pod_namespace, body={})
def __init__(self, get_cmd, env, image, labels=None):
    """Set up a Kubernetes-backed server fixture.

    Loads the local kube config, initialises the base class, and records the
    namespace, a unique name, the image, the run command and the labels on
    the instance.

    Args:
        get_cmd: Callable returning the command to run; it is passed to the
            base class and also called once here to populate
            ``self._run_cmd``.
        env: Passed through to the base-class constructor.
        image: Stored on ``self._image``.
        labels: Optional mapping of extra labels, merged with the fixture's
            own labels. Defaults to no extra labels.
    """
    # The kubernetes package is imported here, not at module import time,
    # and published as module globals so other code in this module can use
    # `k8sclient` and `ApiException`.
    global ApiException
    global k8sclient
    from kubernetes import config
    from kubernetes import client as k8sclient
    from kubernetes.client.rest import ApiException

    config.load_kube_config()

    super(KubernetesServer, self).__init__(get_cmd, env)

    self._namespace = 'default'
    self._name = 'server-fixtures-%s' % uuid.uuid4()
    self._image = image
    self._run_cmd = get_cmd()

    # A None sentinel replaces the former mutable `{}` default so that no
    # dict object is shared between calls.
    extra_labels = {} if labels is None else labels
    self._labels = _merge_dicts(extra_labels, {
        'server-fixtures': 'kubernetes-server-fixtures',
        'server-fixtures/session-id': CONFIG.session_id,
    })

    self._v1api = k8sclient.CoreV1Api()
def new(cls, name: str) -> 'Namespace':
    """Create a new Namespace with object backing.

    NOTE(review): takes ``cls`` as its first argument; presumably decorated
    with ``@classmethod`` in the enclosing class -- confirm.

    Args:
        name: The name of the new Namespace.

    Returns:
        A new Namespace instance.
    """
    # The calls to cls(...) and V1Namespace(...) were left unclosed in the
    # original; both closing parentheses are restored here.
    return cls(client.V1Namespace(
        metadata=client.V1ObjectMeta(
            name=name
        )
    ))
def test_ns_create_new(self, mock_ns_read, mock_api, mock_log):
    """ns_create should create the namespace when reading it raises ApiException."""
    target_namespace = "a-namespace"

    # TODO: We should ideally replicate the correct API exception
    mock_ns_read.side_effect = ApiException()

    ns_create(target_namespace)

    mock_ns_read.assert_called_once_with(target_namespace)
    mock_api.create_namespace.assert_called_once()
    mock_log.info.assert_called_once_with('Created namespace "a-namespace"')
def wait_to_deployment_to_be_deleted(deployment_name, name_space, time_out=None):
    """Poll until reading the deployment fails with HTTP 404.

    The deployment is read once per iteration with a one-second sleep between
    attempts. When the read raises ``ApiException`` with status 404, the total
    wait time is printed and the function returns.

    Args:
        deployment_name: Name of the deployment to poll.
        name_space: Namespace the deployment is read from.
        time_out: Maximum number of one-second waits before giving up. A
            falsy value (``None`` or 0) disables the timeout.

    Raises:
        Exception: If ``time_out`` is set and more than ``time_out`` seconds
            of sleeping have accumulated without seeing a 404.
    """
    # The API object does not depend on the loop, so build it once.
    apps_api = client.AppsV1Api()
    total_sleep_time = 0
    while True:
        try:
            # Only the success/failure of the read matters; the response
            # itself is not used.
            apps_api.read_namespaced_deployment(name=deployment_name, namespace=name_space)
        except ApiException as e:
            if e.status == 404:
                print("Total time waiting for delete deployment {0}: {1} sec".format(deployment_name, total_sleep_time))
                break
            # Any other API error is not treated as fatal: keep polling
            # until the deployment is gone or the timeout is reached.
        time.sleep(1)
        total_sleep_time += 1
        if time_out and total_sleep_time > time_out:
            raise Exception("Timeout waiting to delete deployment")