Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'atomicwrites' in Python code. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
src_dir = options['directory'] / options['source_dir']
dest_dir = options['directory'] / options['build_dir']
if not dest_dir.exists():
dest_dir.mkdir()
default_args = ['-r']
if not tag:
pattern = '*{}'.format(options['extension'])
tag = (path.stem for path in src_dir.glob(pattern))
for tag_name in tag:
src = src_dir / ''.join((tag_name, options['extension']))
dest = dest_dir / '{}.txt'.format(tag_name)
console.info('building {}', click.format_filename(str(dest)))
args = default_args[:]
args += [str(src)]
args += list(pip_compile_options)
with atomicwrites.AtomicWriter(str(dest), 'w', True).open() as f:
f.write(reqwire.scaffold.MODELINES_HEADER)
with tempfile.NamedTemporaryFile() as temp_file:
args += ['-o', temp_file.name]
sh.pip_compile(*args, _out=f, _tty_out=False)
text, encoding = encode(text, encoding)
if 'a' in mode:
with open(filename, mode) as textfile:
textfile.write(text)
else:
# Based on the solution at untitaker/python-atomicwrites#42.
# Needed to fix file permissions being overwritten.
# See spyder-ide/spyder#9381.
try:
original_mode = os.stat(filename).st_mode
except OSError: # Change to FileNotFoundError for PY3
# Creating a new file, emulate what os.open() does
umask = os.umask(0)
os.umask(umask)
original_mode = 0o777 & ~umask
with atomic_write(filename,
overwrite=True,
mode=mode) as textfile:
textfile.write(text)
os.chmod(filename, original_mode)
return encoding
def _save_token(token):
    """Serialize ``token`` as JSON into ``token_file`` using an atomic overwrite.

    ``token_file`` is taken from the enclosing scope. Its parent directory is
    passed through ``expand_path`` and ``checkdir(..., create=True)`` before
    the write (presumably creating it when missing -- confirm against
    ``checkdir``).
    """
    token_dir = expand_path(os.path.dirname(token_file))
    checkdir(token_dir, create=True)
    # overwrite=True: replace any previously stored token file.
    with atomic_write(token_file, mode='w', overwrite=True) as token_fh:
        json.dump(token, token_fh)
upload_dircap_ro = upload_dircap_json[1]["ro_uri"]
yield self.link(admin_dircap, "admin", upload_dircap_ro)
yaml_path = os.path.join(self.nodedir, "private", "magic_folders.yaml")
try:
with open(yaml_path) as f:
yaml_data = yaml.safe_load(f)
except OSError:
yaml_data = {}
folders_data = yaml_data.get("magic-folders", {})
folders_data[os.path.basename(path)] = {
"directory": path,
"collective_dircap": collective_dircap,
"upload_dircap": upload_dircap,
"poll_interval": poll_interval,
}
with atomic_write(yaml_path, mode="w", overwrite=True) as f:
f.write(yaml.safe_dump({"magic-folders": folders_data}))
self.add_alias(alias, admin_dircap)
lambda: self.find_element_in_any_frame(
By.XPATH,
'//input[@type="image" and contains(@title, "download")]'
))
download_link.click()
logger.info('%s: Waiting to get download', document_str)
download_result, = self.wait_and_return(
self.get_downloaded_file)
name, data = download_result
if len(data) < 5000:
raise RuntimeError(
'Downloaded file size is invalid: %d' % len(data))
output_name = '%s.statement-%s.pdf' % (
pay_date.strftime('%Y-%m-%d'), document_number)
output_path = os.path.join(self.output_directory, output_name)
with atomic_write(output_path, mode='wb') as f:
f.write(data)
downloaded_statements.add((pay_date, document_number))
return True
else:
logger.info('%s: Just downloaded', document_str)
return False
def _fetch_html_pages(self, need_to_fetch: List[Tuple[str, str]]):
    """Load each purchase's detail page and save its HTML source atomically.

    Args:
        need_to_fetch: ``(purchase_id, html_path)`` pairs. For each pair the
            detail URL for ``purchase_id`` is opened in ``self.driver`` and
            the resulting page source is written to ``html_path``.
    """
    total = len(need_to_fetch)
    logger.info('Fetching details for %d purchases', total)
    # Count from 1 so progress reads "1/N" .. "N/N" rather than "0/N" .. "N-1/N".
    for i, (purchase_id, html_path) in enumerate(need_to_fetch, start=1):
        url = 'https://myaccount.google.com/purchases/detail?order_id=' + purchase_id
        logger.info('Fetching details %d/%d: %s', i, total, url)
        with self.wait_for_page_load():
            self.driver.get(url)
        # Grab the source once the page-load wait block has exited.
        content = self.driver.page_source
        with atomic_write(
                html_path, mode='w', encoding='utf-8', newline='\n') as f:
            # Write with Unicode Byte Order Mark to ensure content will be properly interpreted as UTF-8
            f.write('\ufeff' + content)
        logger.info('Write details %d/%d: %s', i, total, html_path)
def _download_assembly_mapping_data(
self, destination, chroms, source_assembly, target_assembly, retries
):
with atomic_write(destination, mode="wb", overwrite=True) as f:
with tarfile.open(fileobj=f, mode="w:gz") as out_tar:
for chrom in chroms:
file = chrom + ".json"
map_endpoint = "/map/human/{}/{}/{}?".format(
source_assembly, chrom, target_assembly
)
# get assembly mapping data
response = None
retry = 0
while response is None and retry < retries:
response = self._ensembl_rest_client.perform_rest_action(
map_endpoint
)
retry += 1
newline='\n') as f:
f.write(json.dumps(transaction, indent=' '))
continue
details_url = (
'https://www.paypal.com/myaccount/transactions/details/' +
transaction_id)
inline_details_url = (
'https://www.paypal.com/myaccount/transactions/details/inline/'
+ transaction_id)
html_path = output_prefix + '.html'
json_path = output_prefix + '.json'
if not os.path.exists(html_path):
logging.info('Retrieving HTML %s', details_url)
html_resp = self.driver.request('GET', details_url)
html_resp.raise_for_status()
with atomic_write(
html_path, mode='w', encoding='utf-8',
newline='\n') as f:
# Write with Unicode Byte Order Mark to ensure content will be properly interpreted as UTF-8
f.write('\ufeff' + html_resp.text)
if not os.path.exists(json_path):
logging.info('Retrieving JSON %s', inline_details_url)
json_resp = self.make_json_request(inline_details_url)
json_resp.raise_for_status()
j = json_resp.json()
jsonschema.validate(j, transaction_details_schema)
with atomic_write(json_path, mode='wb') as f:
f.write(
json.dumps(j['data']['details'], indent=' ').encode())
def atomic_write_on_fs_tmp(path, **kwargs):
    """Open an atomic writer for ``path`` whose temp file lives in a temporary
    directory on the same filesystem as ``path``.

    Keyword arguments are forwarded unchanged to ``AtomicWriter``. The
    temporary directory is the one returned by
    ``get_tmpdir_on_same_filesystem(path)``.
    """
    # TODO(mgraczyk): This use of AtomicWriter relies on implementation details to set the temp
    #                 directory.
    atomic_writer = AtomicWriter(path, **kwargs)
    staging_dir = get_tmpdir_on_same_filesystem(path)
    make_fileobject = _get_fileobject_func(atomic_writer, staging_dir)
    return atomic_writer._open(make_fileobject)
def atomic_write_in_dir(path, **kwargs):
    """Open an atomic writer for ``path`` whose temp file is created in the
    same directory as the destination file.

    Keyword arguments are forwarded unchanged to ``AtomicWriter``. The
    directory is ``os.path.dirname(path)``.
    """
    atomic_writer = AtomicWriter(path, **kwargs)
    target_dir = os.path.dirname(path)
    make_fileobject = _get_fileobject_func(atomic_writer, target_dir)
    # Uses AtomicWriter's private ``_open`` hook to supply the file factory.
    return atomic_writer._open(make_fileobject)