Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'supervisor' in Python projects. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def test_clearLog_unreadable(self):
    """Expect ``clearLog`` to raise the ``NO_FILE`` fault on a default DummySupervisor."""
    rpc = self._makeOne(DummySupervisor())
    expected_fault = xmlrpc.Faults.NO_FILE
    self._assertRPCError(expected_fault, rpc.clearLog)
def shutdownSupervisor(self):
    """Send an XML-RPC ``supervisor.shutdown()`` call over ``self.supervisor_socket``."""
    # The HTTP URL is only a placeholder for ServerProxy; the transport is
    # given the unix socket URL built from self.supervisor_socket.
    socket_url = 'unix://%s' % self.supervisor_socket
    transport = SupervisorTransport('', '', socket_url)
    proxy = xmlrpclib.ServerProxy('http://127.0.0.1', transport)
    proxy.supervisor.shutdown()
self.assertEqual(options.nocleanup, True)
self.assertEqual(len(options.programs), 3)
cat = options.programs[0]
self.assertEqual(cat.name, 'cat')
self.assertEqual(cat.command, '/bin/cat')
self.assertEqual(cat.priority, 1)
self.assertEqual(cat.autostart, True)
self.assertEqual(cat.autorestart, True)
self.assertEqual(cat.startsecs, 5)
self.assertEqual(cat.startretries, 10)
self.assertEqual(cat.uid, 0)
self.assertEqual(cat.logfile, '/tmp/cat.log')
self.assertEqual(cat.stopsignal, signal.SIGKILL)
self.assertEqual(cat.stopwaitsecs, 5)
self.assertEqual(cat.logfile_maxbytes, datatypes.byte_size('50MB'))
self.assertEqual(cat.logfile_backups, 10)
self.assertEqual(cat.exitcodes, [0,2])
cat2 = options.programs[1]
self.assertEqual(cat2.name, 'cat2')
self.assertEqual(cat2.command, '/bin/cat')
self.assertEqual(cat2.priority, 999)
self.assertEqual(cat2.autostart, True)
self.assertEqual(cat2.autorestart, False)
self.assertEqual(cat2.uid, None)
self.assertEqual(cat2.logfile, '/tmp/cat2.log')
self.assertEqual(cat2.stopsignal, signal.SIGTERM)
self.assertEqual(cat2.logfile_maxbytes, 1024)
self.assertEqual(cat2.logfile_backups, 2)
self.assertEqual(cat2.exitcodes, [0,2])
self.assertEqual(cat2.uid, None)
self.assertEqual(cat2.logfile, '/tmp/cat2.log')
self.assertEqual(cat2.stopsignal, signal.SIGTERM)
self.assertEqual(cat2.logfile_maxbytes, 1024)
self.assertEqual(cat2.logfile_backups, 2)
self.assertEqual(cat2.exitcodes, [0,2])
cat3 = options.programs[2]
self.assertEqual(cat3.name, 'cat3')
self.assertEqual(cat3.command, '/bin/cat')
self.assertEqual(cat3.priority, 999)
self.assertEqual(cat3.autostart, True)
self.assertEqual(cat3.autorestart, True)
self.assertEqual(cat3.uid, None)
self.assertEqual(cat3.logfile, instance.AUTOMATIC)
self.assertEqual(cat3.logfile_maxbytes, datatypes.byte_size('50MB'))
self.assertEqual(cat3.logfile_backups, 10)
self.assertEqual(cat3.exitcodes, [0,1,127])
self.assertEqual(cat2.stopsignal, signal.SIGTERM)
here = os.path.abspath(os.getcwd())
self.assertEqual(instance.uid, 0)
self.assertEqual(instance.gid, 0)
self.assertEqual(instance.directory, '/tmp')
self.assertEqual(instance.umask, 022)
self.assertEqual(instance.logfile, os.path.join(here,'supervisord.log'))
self.assertEqual(instance.logfile_maxbytes, 1000 * 1024 * 1024)
self.assertEqual(instance.logfile_backups, 5)
self.assertEqual(instance.loglevel, 40)
self.assertEqual(instance.pidfile, os.path.join(here,'supervisord.pid'))
self.assertEqual(instance.nodaemon, True)
*@param* ``bool wait``: wait for the application to be fully started.
*@throws* ``RPCError``:
* with code ``Faults.BAD_SUPVISORS_STATE`` if **Supvisors** is not in state ``OPERATION``,
* with code ``Faults.BAD_STRATEGY`` if strategy is unknown to **Supvisors**,
* with code ``Faults.BAD_NAME`` if application_name is unknown to **Supvisors**,
* with code ``Faults.ALREADY_STARTED`` if application is ``STARTING``, ``STOPPING`` or ``RUNNING``,
* with code ``Faults.ABNORMAL_TERMINATION`` if application could not be started.
*@return* ``bool``: always ``True`` unless error or nothing to start.
"""
self._check_operating()
# check strategy
if strategy not in StartingStrategies._values():
raise RPCError(Faults.BAD_STRATEGY, '{}'.format(strategy))
# check application is known
if application_name not in self.context.applications.keys():
raise RPCError(Faults.BAD_NAME, application_name)
# check application is not already RUNNING
application = self.context.applications[application_name]
if application.state != ApplicationStates.STOPPED:
raise RPCError(Faults.ALREADY_STARTED, application_name)
# TODO: develop a predictive model to check if starting can be achieved
# if impossible due to a lack of resources, second try without optionals
# return false if still impossible
done = self.starter.start_application(strategy, application)
self.logger.debug('start_application {} done={}'.format(application_name, done))
# wait until application fully RUNNING or (failed)
if wait and not done:
def onwait():
# check starter
*@throws* ``RPCError``:
* with code ``Faults.BAD_SUPVISORS_STATE`` if **Supvisors** is not in state ``OPERATION`` or ``CONCILIATION``,
* with code ``Faults.BAD_NAME`` if application_name is unknown to **Supvisors**,
* with code ``Faults.NOT_RUNNING`` if application is ``STOPPED``.
*@return* ``bool``: always ``True`` unless error.
"""
self._check_operating_conciliation()
# check application is known
if application_name not in self.context.applications.keys():
raise RPCError(Faults.BAD_NAME, application_name)
# check application is not already STOPPED
application = self.context.applications[application_name]
if application.state == ApplicationStates.STOPPED:
raise RPCError(Faults.NOT_RUNNING, application_name)
# stop the application
done = self.stopper.stop_application(application)
self.logger.debug('stop_application {} done={}'.format(application_name, done))
# wait until application fully STOPPED
if wait and not done:
def onwait():
# check stopper
if self.stopper.in_progress():
return NOT_DONE_YET
if application.state != ApplicationStates.STOPPED:
raise RPCError(Faults.ABNORMAL_TERMINATION, application_name)
return True
onwait.delay = 0.5
return onwait # deferred
# if done is True, nothing to do
return not done
def traverse(ob, method, params):
# Resolve a dotted "namespace.method" RPC name against ``ob`` and validate it.
# Every validation failure below raises RPCError(Faults.UNKNOWN_METHOD).
# ``method`` must contain exactly one dot: the part before it names an
# attribute of ``ob``, the part after it names an attribute of that object.
dotted_parts = method.split('.')
# security (CVE-2017-11610, don't allow object traversal)
if len(dotted_parts) != 2:
raise RPCError(Faults.UNKNOWN_METHOD)
# NOTE: ``method`` is rebound here from the full dotted name to its last part.
namespace, method = dotted_parts
# security (don't allow methods that start with an underscore to
# be called remotely)
if method.startswith('_'):
raise RPCError(Faults.UNKNOWN_METHOD)
# A namespace that is missing on ``ob`` (or set to None) is reported as unknown.
rpcinterface = getattr(ob, namespace, None)
if rpcinterface is None:
raise RPCError(Faults.UNKNOWN_METHOD)
# Only attributes that are method objects are accepted; a missing attribute
# (None) or any other kind of object is rejected.
func = getattr(rpcinterface, method, None)
if not isinstance(func, types.MethodType):
raise RPCError(Faults.UNKNOWN_METHOD)
try:
priority = integer(get(section, 'priority', 999))
processes=self.processes_from_section(parser, section, program_name,
ProcessConfig)
groups.append(
ProcessGroupConfig(self, program_name, priority, processes)
)
# process "event listener" homogeneous groups
for section in all_sections:
if not section.startswith('eventlistener:'):
continue
pool_name = section.split(':', 1)[1]
# give listeners a "high" default priority so they are started first
# and stopped last at mainloop exit
priority = integer(get(section, 'priority', -1))
buffer_size = integer(get(section, 'buffer_size', 10))
result_handler = get(section, 'result_handler',
'supervisor.dispatchers:default_handler')
try:
result_handler = self.import_spec(result_handler)
except ImportError:
raise ValueError('%s cannot be resolved within [%s]' % (
result_handler, section))
pool_event_names = [x.upper() for x in
list_of_strings(get(section, 'events', ''))]
pool_event_names = set(pool_event_names)
if not pool_event_names:
raise ValueError('[%s] section requires an "events" line' %
section)
program_name = process_or_group_name(section.split(':', 1)[1])
host_node_name = platform.node()
common_expansions = {'here':self.here,
'program_name':program_name,
'host_node_name':host_node_name,
'group_name':group_name}
def get(section, opt, *args, **kwargs):
    """Look up ``opt`` in ``section`` via ``parser.saneget`` with common expansions applied.

    Any ``expansions`` mapping supplied by the caller is merged with
    ``common_expansions`` (the common values win on key clashes) and
    forwarded under the ``expansions`` keyword. All other positional and
    keyword arguments are passed through to ``parser.saneget`` unchanged.

    Returns whatever ``parser.saneget`` returns.
    """
    # Merge into a copy so the caller's dict is not mutated as a side
    # effect; a missing or None ``expansions`` is treated as empty.
    merged = dict(kwargs.get('expansions') or {})
    merged.update(common_expansions)
    kwargs['expansions'] = merged
    return parser.saneget(section, opt, *args, **kwargs)
priority = integer(get(section, 'priority', 999))
autostart = boolean(get(section, 'autostart', 'true'))
autorestart = auto_restart(get(section, 'autorestart', 'unexpected'))
startsecs = integer(get(section, 'startsecs', 1))
startretries = integer(get(section, 'startretries', 3))
stopsignal = signal_number(get(section, 'stopsignal', 'TERM'))
stopwaitsecs = integer(get(section, 'stopwaitsecs', 10))
stopasgroup = boolean(get(section, 'stopasgroup', 'false'))
killasgroup = boolean(get(section, 'killasgroup', stopasgroup))
exitcodes = list_of_exitcodes(get(section, 'exitcodes', '0,2'))
# see also redirect_stderr check in process_groups_from_parser()
redirect_stderr = boolean(get(section, 'redirect_stderr','false'))
numprocs = integer(get(section, 'numprocs', 1))
numprocs_start = integer(get(section, 'numprocs_start', 0))
environment_str = get(section, 'environment', '', do_expand=False)
stdout_cmaxbytes = byte_size(get(section,'stdout_capture_maxbytes','0'))
stdout_events = boolean(get(section, 'stdout_events_enabled','false'))
stderr_cmaxbytes = byte_size(get(section,'stderr_capture_maxbytes','0'))
stderr_events = boolean(get(section, 'stderr_events_enabled','false'))
serverurl = get(section, 'serverurl', None)
klass = ProcessConfig
programs = []
program_name = process_or_group_name(section.split(':', 1)[1])
host_node_name = platform.node()
common_expansions = {'here':self.here,
'program_name':program_name,
'host_node_name':host_node_name,
'group_name':group_name}
def get(section, opt, *args, **kwargs):
    """Fetch ``opt`` from ``section`` through ``parser.saneget``, adding the common expansions.

    The caller's optional ``expansions`` mapping is combined with
    ``common_expansions`` (common values take precedence when keys
    collide) and sent on as the ``expansions`` keyword. Remaining
    positional and keyword arguments are forwarded untouched.

    Returns whatever ``parser.saneget`` returns.
    """
    # Copy first: updating the caller's dict in place would leak the
    # common expansions back into the caller's data. None counts as empty.
    merged = dict(kwargs.get('expansions') or {})
    merged.update(common_expansions)
    kwargs['expansions'] = merged
    return parser.saneget(section, opt, *args, **kwargs)
priority = integer(get(section, 'priority', 999))
autostart = boolean(get(section, 'autostart', 'true'))
autorestart = auto_restart(get(section, 'autorestart', 'unexpected'))
startsecs = integer(get(section, 'startsecs', 1))
startretries = integer(get(section, 'startretries', 3))
stopsignal = signal_number(get(section, 'stopsignal', 'TERM'))
stopwaitsecs = integer(get(section, 'stopwaitsecs', 10))
stopasgroup = boolean(get(section, 'stopasgroup', 'false'))
killasgroup = boolean(get(section, 'killasgroup', stopasgroup))
exitcodes = list_of_exitcodes(get(section, 'exitcodes', '0,2'))
# see also redirect_stderr check in process_groups_from_parser()
redirect_stderr = boolean(get(section, 'redirect_stderr','false'))
numprocs = integer(get(section, 'numprocs', 1))
numprocs_start = integer(get(section, 'numprocs_start', 0))
environment_str = get(section, 'environment', '', do_expand=False)
stdout_cmaxbytes = byte_size(get(section,'stdout_capture_maxbytes','0'))
stdout_events = boolean(get(section, 'stdout_events_enabled','false'))