Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'portalocker' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
client = request.client and request.client.replace(':', '.')
if response.session_id_name in request.cookies:
response.session_id = \
request.cookies[response.session_id_name].value
if regex_session_id.match(response.session_id):
response.session_filename = \
os.path.join(up(request.folder), masterapp,
'sessions', response.session_id)
else:
response.session_id = None
if response.session_id:
try:
response.session_file = \
open(response.session_filename, 'rb+')
try:
portalocker.lock(response.session_file,portalocker.LOCK_EX)
response.session_locked = True
self.update(cPickle.load(response.session_file))
response.session_file.seek(0)
oc = response.session_filename.split('/')[-1].split('-')[0]
if check_client and client!=oc:
raise Exception, "cookie attack"
finally:
pass
#This causes admin login to break. Must find out why.
#self._close(response)
except:
response.session_id = None
if not response.session_id:
uuid = web2py_uuid()
response.session_id = '%s-%s' % (client, uuid)
if separate:
def open_file_safe(file_path: str, mode: str = "r+"):
# Use portalocker package for file locking if available,
# otherwise print a message to install the package.
try:
import portalocker # type: ignore
file_open_func = portalocker.Lock
file_open_args = [file_path]
file_open_kwargs = {"mode": mode, "flags": portalocker.LOCK_EX}
file_handle = file_open_func(*file_open_args, **file_open_kwargs)
return file_handle
except ImportError:
print(
"The conversion script is missing a required package: portalocker. Please run "
"python -m pip install -r requirements.txt to install the missing dependency."
)
exit(1)
def findT(path, language=DEFAULT_LANGUAGE):
"""
must be run by the admin app
"""
lang_file = pjoin(path, 'languages', language + '.py')
sentences = read_dict(lang_file)
mp = pjoin(path, 'models')
cp = pjoin(path, 'controllers')
vp = pjoin(path, 'views')
mop = pjoin(path, 'modules')
for filename in \
listdir(mp, '^.+\.py$', 0) + listdir(cp, '^.+\.py$', 0)\
+ listdir(vp, '^.+\.html$', 0) + listdir(mop, '^.+\.py$', 0):
data = read_locked(filename)
items = regex_translate.findall(data)
for item in items:
try:
message = safe_eval(item)
except:
continue # silently ignore inproperly formatted strings
if not message.startswith('#') and not '\n' in message:
tokens = message.rsplit('##', 1)
else:
# this allows markmin syntax in translations
tokens = [message]
if len(tokens) == 2:
message = tokens[0].strip() + '##' + tokens[1].strip()
if message and not message in sentences:
sentences[message] = message
if not '!langcode!' in sentences:
def findT(path, language='en-us'):
"""
must be run by the admin app
"""
filename = os.path.join(path, 'languages', '%s.py' % language)
sentences = read_dict(filename)
mp = os.path.join(path, 'models')
cp = os.path.join(path, 'controllers')
vp = os.path.join(path, 'views')
for file in listdir(mp, '.+\.py', 0) + listdir(cp, '.+\.py', 0)\
+ listdir(vp, '.+\.html', 0):
fp = open(file, 'r')
portalocker.lock(fp, portalocker.LOCK_SH)
data = fp.read()
portalocker.unlock(fp)
fp.close()
items = regex_translate.findall(data)
for item in items:
try:
message = eval(item)
if not message.startswith('#') and not '\n' in message:
tokens = message.rsplit('##', 1)
else:
# this allows markmin syntax in translations
tokens = [message]
if len(tokens) == 2:
message = tokens[0].strip() + '##' + tokens[1].strip()
if message and not message in sentences:
sentences[message] = message
sys.exit(0)
try:
with TelemetryNote(config_dir) as telemetry_note:
telemetry_note.touch()
collection = RecordsCollection(telemetry_note.get_last_sent(), config_dir)
collection.snapshot_and_read()
client = CliTelemetryClient()
for each in collection:
client.add(each, flush=True)
client.flush(force=True)
telemetry_note.update_telemetry_note(collection.next_send)
except portalocker.AlreadyLocked:
# another upload process is running.
logger.info('Lock out from note file under %s which means another process is running. Exit 0.', config_dir)
sys.exit(0)
except IOError as err:
logger.warning('Unexpected IO Error %s. Exit 1.', err)
sys.exit(1)
except Exception as err: # pylint: disable=broad-except
logger.error('Unexpected Error %s. Exit 2.', err)
logger.exception(err)
sys.exit(2)
except IndexError:
sys.exit(1)
stop == 0 if job started but did not yet complete
if a cron job started within less than 60 seconds, acquire returns None
if a cron job started before 60 seconds and did not stop,
a warning is issue "Stale cron.master detected"
"""
if sys.platform == 'win32':
locktime = 59.5
else:
locktime = 59.99
if portalocker.LOCK_EX is None:
logger.warning('WEB2PY CRON: Disabled because no file locking')
return None
self.master = open(self.path, 'rb+')
try:
ret = None
portalocker.lock(self.master, portalocker.LOCK_EX)
try:
(start, stop) = pickle.load(self.master)
except:
(start, stop) = (0, 1)
if startup or self.now - start > locktime:
ret = self.now
if not stop:
# this happens if previous cron job longer than 1 minute
logger.warning('WEB2PY CRON: Stale cron.master detected')
logger.debug('WEB2PY CRON: Acquiring lock')
self.master.seek(0)
pickle.dump((self.now, 0), self.master)
self.master.flush()
finally:
portalocker.unlock(self.master)
if not ret:
def pex_lockfile(basepath, uniquedir):
# Acquire the lockfile.
lockfile_path = os.path.join(basepath, '.lock-%s' % uniquedir)
lockfile = open(lockfile_path, "a+")
# Block until we can acquire the lockfile.
portalocker.lock(lockfile, portalocker.LOCK_EX)
lockfile.seek(0)
yield lockfile
portalocker.lock(lockfile, portalocker.LOCK_UN)
def lock(self):
if not self.locked:
portalocker.lock(self._f, portalocker.LOCK_EX)
self.locked = True
return True
else:
return False
def get(self, count: int=1, readOnly: bool=False, recurlvl=0):
with open(self.FILE, "r+") as file:
portalocker.lock(file, portalocker.LOCK_EX)
ports = []
while len(ports) < count:
file.seek(0)
port = int(file.readline())
if readOnly:
return port
port += 1
if port > self.maxPort:
port = self.minPort
file.seek(0)
file.write(str(port))
try:
checkPortAvailable(("", port))
ports.append(port)
self.logger.debug("new port dispensed: {}".format(port))
except: