Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'voluptuous' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
from ledfx.effects.audio import AudioReactiveEffect, MIN_MIDI, MAX_MIDI
from ledfx.effects.gradient import GradientEffect
from ledfx.effects import mix_colors
from ledfx.color import COLORS
import voluptuous as vol
import numpy as np
import aubio
class PitchSpectrumAudioEffect(AudioReactiveEffect, GradientEffect):
NAME = "PitchSpectrum"
CONFIG_SCHEMA = vol.Schema({
vol.Optional('blur', description='Amount to blur the effect', default = 1.0): vol.Coerce(float),
vol.Optional('mirror', description='Mirror the effect', default = True): bool,
vol.Optional('fade_rate', description='Rate at which notes fade', default = 0.15): vol.All(vol.Coerce(float), vol.Range(min=0.0, max=1.0)),
vol.Optional('responsiveness', description='Responsiveness of the note changes', default = 0.15): vol.All(vol.Coerce(float), vol.Range(min=0.0, max=1.0)),
})
def config_updated(self, config):
win_s = 1024
hop_s = 48000 // 60
tolerance = 0.8
# TODO: Move into the base audio effect class
self.pitch_o = aubio.pitch("schmitt", win_s, hop_s, 48000)
self.pitch_o.set_unit("midi")
self.pitch_o.set_tolerance(tolerance)
log_level = vol.In(["debug", "info", "warning", "error", "critical"])
def dns_url(url: str) -> str:
""" takes a DNS url (str) and validates that it matches the scheme dns://."""
if not url.lower().startswith("dns://"):
raise vol.Invalid("Doesn't start with dns://")
address: str = url[6:] # strip the dns:// off
try:
ipaddress.ip_address(address) # matches ipv4 or ipv6 addresses
except ValueError:
raise vol.Invalid("Invalid DNS URL: {}".format(url))
return url
dns_server_list = vol.All(vol.Length(max=8), [dns_url])
def validate_repository(repository: str) -> str:
"""Validate a valid repository."""
data = RE_REPOSITORY.match(repository)
if not data:
raise vol.Invalid("No valid repository format!")
# Validate URL
# pylint: disable=no-value-for-parameter
vol.Url()(data.group("url"))
return repository
# pylint: disable=no-value-for-parameter
ATTR_WATCHDOG,
CHANNEL_BETA,
CHANNEL_DEV,
CHANNEL_STABLE,
)
from .utils.validate import validate_timezone
RE_REPOSITORY = re.compile(r"^(?P[^#]+)(?:#(?P[\w\-]+))?$")
# pylint: disable=no-value-for-parameter
# pylint: disable=invalid-name
network_port = vol.All(vol.Coerce(int), vol.Range(min=1, max=65535))
wait_boot = vol.All(vol.Coerce(int), vol.Range(min=1, max=60))
docker_image = vol.Match(r"^[\w{}]+/[\-\w{}]+$")
alsa_device = vol.Maybe(vol.Match(r"\d+,\d+"))
channels = vol.In([CHANNEL_STABLE, CHANNEL_BETA, CHANNEL_DEV])
uuid_match = vol.Match(r"^[0-9a-f]{32}$")
sha256 = vol.Match(r"^[0-9a-f]{64}$")
token = vol.Match(r"^[0-9a-f]{32,256}$")
log_level = vol.In(["debug", "info", "warning", "error", "critical"])
def dns_url(url: str) -> str:
""" takes a DNS url (str) and validates that it matches the scheme dns://."""
if not url.lower().startswith("dns://"):
raise vol.Invalid("Doesn't start with dns://")
address: str = url[6:] # strip the dns:// off
try:
ipaddress.ip_address(address) # matches ipv4 or ipv6 addresses
except ValueError:
raise vol.Invalid("Invalid DNS URL: {}".format(url))
CHANNEL_DEV,
CHANNEL_STABLE,
)
from .utils.validate import validate_timezone
RE_REPOSITORY = re.compile(r"^(?P[^#]+)(?:#(?P[\w\-]+))?$")
# pylint: disable=no-value-for-parameter
# pylint: disable=invalid-name
network_port = vol.All(vol.Coerce(int), vol.Range(min=1, max=65535))
wait_boot = vol.All(vol.Coerce(int), vol.Range(min=1, max=60))
docker_image = vol.Match(r"^[\w{}]+/[\-\w{}]+$")
alsa_device = vol.Maybe(vol.Match(r"\d+,\d+"))
channels = vol.In([CHANNEL_STABLE, CHANNEL_BETA, CHANNEL_DEV])
uuid_match = vol.Match(r"^[0-9a-f]{32}$")
sha256 = vol.Match(r"^[0-9a-f]{64}$")
token = vol.Match(r"^[0-9a-f]{32,256}$")
log_level = vol.In(["debug", "info", "warning", "error", "critical"])
def dns_url(url: str) -> str:
""" takes a DNS url (str) and validates that it matches the scheme dns://."""
if not url.lower().startswith("dns://"):
raise vol.Invalid("Doesn't start with dns://")
address: str = url[6:] # strip the dns:// off
try:
ipaddress.ip_address(address) # matches ipv4 or ipv6 addresses
except ValueError:
raise vol.Invalid("Invalid DNS URL: {}".format(url))
return url
def sequence_of_sequences(min=None, max=None):
return All(
Any(
None,
[Any(list, tuple)],
tuple([Any(list, tuple)]),
),
Any(
None,
[Length(min=min, max=max)],
tuple([Length(min=min, max=max)]),
),
'seealso': Any(None, seealso_schema),
'requirements': list_string_types,
'todo': Any(None, list_string_types, *string_types),
'options': Any(None, *list_dict_option_schema),
'extends_documentation_fragment': Any(list_string_types, *string_types)
}
if version_added:
doc_schema_dict[Required('version_added')] = Any(float, *string_types)
else:
# Optional
doc_schema_dict['version_added'] = Any(float, *string_types)
if deprecated_module:
deprecation_required_scheme = {
Required('deprecated'): Any(deprecation_schema),
}
doc_schema_dict.update(deprecation_required_scheme)
return Schema(
doc_schema_dict,
extra=PREVENT_EXTRA
)
async def test_set_invalid_osc(hass, calls):
"""Test set invalid oscillating when fan has valid osc."""
await _register_components(hass)
# Turn on fan
await common.async_turn_on(hass, _TEST_FAN)
# Set fan's osc to True
await common.async_oscillate(hass, _TEST_FAN, True)
# verify
assert hass.states.get(_OSC_INPUT).state == "True"
_verify(hass, STATE_ON, None, True, None)
# Set fan's osc to None
with pytest.raises(vol.Invalid):
await common.async_oscillate(hass, _TEST_FAN, None)
# verify osc is unchanged
assert hass.states.get(_OSC_INPUT).state == "True"
_verify(hass, STATE_ON, None, True, None)
def test_schema_extend_overrides():
"""Verify that Schema.extend can override required/extra parameters."""
base = Schema({'a': int}, required=True)
extended = base.extend({'b': str}, required=False, extra=voluptuous.ALLOW_EXTRA)
assert base.required == True
assert base.extra == voluptuous.PREVENT_EXTRA
assert extended.required == False
assert extended.extra == voluptuous.ALLOW_EXTRA
def test_config_parameters(self):
w = wilson.Wilson({'qd1_1123': 1}, 1000, 'SMEFT', 'Warsaw')
with self.assertRaises(vol.MultipleInvalid):
# value must be dict
w.set_option('parameters', 4)
with self.assertRaises(vol.MultipleInvalid):
# dict value must be number
w.set_option('parameters', {'bla': 'blo'})
# int should be OK but corced to float
w.set_option('parameters', {'bla': 1})
self.assertTrue(type(w.get_option('parameters')['bla']), float)
self.assertEqual(w.get_option('parameters'), {'bla': 1.})
w.set_option('parameters', {'m_b': 4.0})
self.assertEqual(w.get_option('parameters'), {'m_b': 4.0})
self.assertEqual(w.parameters['m_b'], 4.0)
def test_schema_bad_schema(self):
"""Test bad customize schemas."""
for value in (
{'test.test': 10},
{'test.test': ['hello']},
{'entity_id': {'a': 'b'}},
{'entity_id': 10},
[{'test.test': 'value'}],
):
with pytest.raises(
MultipleInvalid,
message="{} should have raised MultipleInvalid".format(
value)):
customize.CUSTOMIZE_SCHEMA(value)