Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'traitlets' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
@validate('colors')
def _validate_colors(self, proposal):
if type(proposal['value']) is dict or type(self.vertices) is dict:
return proposal['value']
required = self.vertices.size // 3 # (x, y, z) triplet per 1 color
actual = proposal['value'].size
if actual != 0 and required != actual:
raise TraitError('colors has wrong size: %s (%s required)' % (actual, required))
return proposal['value']
found = Bool()
class ArgSpec(Reference):
args = List(Unicode)
varargs = Unicode()
varkw = Unicode()
defaults = List()
class Status(Reference):
execution_state = Enum((u'busy', u'idle', u'starting'))
class CompleteReply(Reference):
matches = List(Unicode)
cursor_start = Integer()
cursor_end = Integer()
status = Unicode()
class LanguageInfo(Reference):
name = Unicode()
version = Unicode(sys.version.split()[0])
class KernelInfoReply(Reference):
protocol_version = Version(min='5.0')
implementation = Unicode()
implementation_version = Version()
language_info = Dict()
banner = Unicode()
def check(self, d):
async def test_adaptive_scaling():
# XXX: we should be able to use `InProcessClusterManager` here, but due to
# https://github.com/dask/distributed/issues/3251 this results in periodic
# failures.
config = Config()
config.DaskGateway.backend_class = LocalTestingBackend
config.ClusterConfig.adaptive_period = 0.25
async with temp_gateway(config=config) as g:
async with g.gateway_client() as gateway:
async with gateway.new_cluster() as cluster:
# Turn on adaptive scaling
await cluster.adapt()
# Worker is automatically requested
async with cluster.get_client(set_as_default=False) as client:
res = await client.submit(lambda x: x + 1, 1)
assert res == 2
# Scales down automatically
await wait_for_workers(cluster, exact=0)
resume_epoch = Int(0, config=True, help="Epoch to resume (requires using also '--resume_path'.")
coco_path = Unicode(u"/tmp/aa/coco", config=True, help="path to local coco dataset path")
init_inception = Bool(True, config=True, help="Initialize the inception networks using the paper's base network.")
#
# Network hyper parameters
#
base_network_name = Unicode("Inception3", config=True, help="Name of base network to use.")
avgpool_kernel = Int(10, config=True,
help="Size of the last avgpool layer in the Resnet. Should match the cropsize.")
classifier_name = Unicode("Inception3Classifier", config=True, help="Name of classifier to use.")
sets_network_name = Unicode("SetOpsResModule", config=True, help="Name of setops module to use.")
sets_block_name = Unicode("SetopResBlock_v1", config=True, help="Name of setops network to use.")
sets_basic_block_name = Unicode("SetopResBasicBlock", config=True,
help="Name of the basic setops block to use (where applicable).")
ops_layer_num = Int(1, config=True, help="Ops Module layers num.")
ops_latent_dim = Int(1024, config=True, help="Ops Module inner latent dim.")
setops_dropout = Float(0, config=True, help="Dropout ratio of setops module.")
crop_size = Int(299, config=True, help="Size of input crop (Resnet 224, inception 299).")
scale_size = Int(350, config=True, help="Size of input scale for data augmentation. default: 350")
paper_reproduce = Bool(False, config=True, help="Use paper reproduction settings. default: False")
discriminator_name = Unicode("AmitDiscriminator", config=True,
help="Name of discriminator (unseen classifier) to use. default: AmitDiscriminator")
embedding_dim = Int(2048, config=True, help="Dimensionality of the LaSO space. default:2048")
classifier_latent_dim = Int(2048, config=True, help="Dimensionality of the classifier latent space. default:2048")
def run(self):
#
# Setup the model
#
base_model, classifier, setops_model = self.setup_model()
class IsCompleteReplyIncomplete(Reference):
indent = Unicode()
# IOPub messages
class ExecuteInput(Reference):
code = Unicode()
execution_count = Integer()
Error = ExecuteReplyError
class Stream(Reference):
name = Enum((u'stdout', u'stderr'))
text = Unicode()
class DisplayData(MimeBundle):
pass
class ExecuteResult(MimeBundle):
execution_count = Integer()
class HistoryReply(Reference):
history = List(List())
class ClearOutput(Reference):
wait = Bool()
def test_mixed_handlers_values():
handlers = [
logging.NullHandler(),
1
]
with pytest.raises(TraitError):
HasHandlers(
handlers=handlers
)
created_files,
'enable should create files in {}'.format(dirs['conf']))
# a bit of a hack to allow initializing a new app instance
for klass in app_classes:
reset_app_class(klass)
# do disable
main_app(argv=['disable'] + argv)
# check the config directory
conf_enabled = [
path for path in created_files
if path.startswith(conf_dir) and os.path.exists(path)]
for path in conf_enabled:
with open(path, 'r') as f:
conf = Config(json.load(f))
nbapp = conf.get('NotebookApp', {})
if 'server_extensions' in nbapp:
nt.assert_not_in(
'jupyter_nbextensions_configurator',
nbapp.server_extensions,
'conf after disable should empty'
'server_extensions list in file {}'.format(path))
nbservext = nbapp.get('nbserver_extensions', {})
nt.assert_false(
{k: v for k, v in nbservext.items() if v},
'disable command should disable all '
'nbserver_extensions in file {}'.format(path))
reset_app_class(DisableJupyterNbextensionsConfiguratorApp)
async def test_slurm_backend():
c = Config()
c.SlurmClusterConfig.scheduler_cmd = "/opt/miniconda/bin/dask-gateway-scheduler"
c.SlurmClusterConfig.worker_cmd = "/opt/miniconda/bin/dask-gateway-worker"
c.SlurmClusterConfig.scheduler_memory = "256M"
c.SlurmClusterConfig.worker_memory = "256M"
c.SlurmClusterConfig.scheduler_cores = 1
c.SlurmClusterConfig.worker_cores = 1
c.DaskGateway.backend_class = SlurmTestingBackend
async with temp_gateway(config=c) as g:
auth = BasicAuth(username="alice")
async with g.gateway_client(auth=auth) as gateway:
async with gateway.new_cluster() as cluster:
db_cluster = g.gateway.backend.db.get_cluster(cluster.name)
async def test_idle_timeout(tmpdir):
config = Config()
config.DaskGateway.cluster_manager_class = InProcessClusterManager
config.DaskGateway.temp_dir = str(tmpdir)
config.InProcessClusterManager.idle_timeout = 2
async with temp_gateway(config=config) as gateway_proc:
async with Gateway(
address=gateway_proc.public_urls.connect_url,
proxy_address=gateway_proc.gateway_urls.connect_url,
asynchronous=True,
) as gateway:
# Start a cluster
cluster = await gateway.new_cluster()
# Add some workers
await cluster.scale(2)
await wait_for_workers(cluster, atleast=1)
waited = 0
from .timeunit import TimeUnit
class RangeFilter(BaseObject):
"""Wrapper for Vega-Lite RangeFilter definition.
Attributes
----------
field: Unicode
Field to be filtered.
range: List(Union(CFloat, DateTime))
Array of inclusive minimum and maximum values for a field value of a data item to be included in the filtered data.
timeUnit: TimeUnit
time unit for the field to be filtered.
"""
field = T.Unicode(allow_none=True, default_value=None, help="""Field to be filtered.""")
range = T.List(T.Union([T.CFloat(allow_none=True, default_value=None), T.Instance(DateTime, allow_none=True, default_value=None)]), allow_none=True, default_value=None, maxlen=2, minlen=2, help="""Array of inclusive minimum and maximum values for a field value of a data item to be included in the filtered data.""")
timeUnit = TimeUnit(allow_none=True, default_value=None, help="""time unit for the field to be filtered.""")
def __init__(self, field=None, range=None, timeUnit=None, **kwargs):
kwds = dict(field=field, range=range, timeUnit=timeUnit)
kwargs.update({k:v for k, v in kwds.items() if v is not None})
super(RangeFilter, self).__init__(**kwargs)