Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'gitdb' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def is_equal_canonical_sha(canonical_length, match, sha1):
    """Check whether a partial binary sha matches the start of a full binary sha.

    The comparison takes the canonical (hexadecimal) length of ``match`` into
    account: the ``canonical_length // 2`` complete bytes are compared
    directly, and for an uneven canonical length only the high 4 bits of the
    last byte of ``match`` are compared (the low 4 bits are ignored).

    :param canonical_length: number of hexadecimal characters of the partial sha
    :param match: less than 20 byte sha
    :param sha1: 20 byte sha
    :return: True if ``match`` equals the leading part of ``sha1``"""
    binary_length = canonical_length // 2
    if match[:binary_length] != sha1[:binary_length]:
        return False

    # Only an uneven canonical length leaves a trailing half byte to compare;
    # for even lengths the complete-byte comparison above is already exhaustive.
    if canonical_length % 2 and \
            (byte_ord(match[-1]) ^ byte_ord(sha1[len(match) - 1])) & 0xf0:
        return False
    # END handle uneven canonical length
    return True
def sha_to_index(self, sha):
    """Find the position of a sha in this pack index by bisection.

    :param sha: 20 byte sha to lookup
    :return: index usable with the ``offset`` or ``entry`` method, or None
        if the sha was not found in this pack index"""
    bucket = byte_ord(sha[0])
    sha_at = self.sha
    fanout = self._fanout_table

    # bisection bounds come from the fanout table, keyed by the first byte
    lower = fanout[bucket - 1] if bucket != 0 else 0
    upper = fanout[bucket]

    # narrow [lower, upper) until the sha is hit or the range is empty
    while lower < upper:
        middle = (lower + upper) // 2
        candidate = sha_at(middle)
        if sha == candidate:
            return middle
        if sha < candidate:
            upper = middle
        else:
            lower = middle + 1
    # END bisect
    return None
def partial_sha_to_index(self, partial_bin_sha, canonical_length):
"""
Look up a partial binary sha in this index file.

:return: index as in `sha_to_index` or None if the sha was not found in this
index file
:param partial_bin_sha: at least two bytes of a partial binary sha, as bytes
:param canonical_length: length of the original hexadecimal representation of the
given partial binary sha
:raise ValueError: if fewer than two bytes are given
:raise AmbiguousObjectName:"""
if len(partial_bin_sha) < 2:
raise ValueError("Require at least 2 bytes of partial sha")
# NOTE(review): this type check is an assert, so it is skipped under -O
assert isinstance(partial_bin_sha, bytes), "partial_bin_sha must be bytes"
first_byte = byte_ord(partial_bin_sha[0])
get_sha = self.sha
lo = 0 # lower index, the left bound of the bisection
if first_byte != 0:
lo = self._fanout_table[first_byte - 1]
hi = self._fanout_table[first_byte] # the upper, right bound of the bisection
# fill the partial to full 20 bytes
filled_sha = partial_bin_sha + NULL_BYTE * (20 - len(partial_bin_sha))
# find lowest
# NOTE(review): the listing ends inside this loop - only the
# "filled_sha < mid_sha" branch is visible; the branches that advance `lo`,
# the use of canonical_length and the return value are not shown here.
while lo < hi:
mid = (lo + hi) // 2
mid_sha = get_sha(mid)
if filled_sha < mid_sha:
hi = mid
# Benchmark excerpt: create and serialize `nc` commits and report the timing.
# NOTE(review): rwrepo, hc and make_object are defined outside this excerpt,
# and `commits` is not used within it.
commits = list()
nc = 5000
st = time()
# NOTE(review): xrange and the print-chevron below are Python 2 syntax
for i in xrange(nc):
# every commit reuses the fields of `hc`; only str(i) differs per iteration
cm = Commit( rwrepo, Commit.NULL_BIN_SHA, hc.tree,
hc.author, hc.authored_date, hc.author_tz_offset,
hc.committer, hc.committed_date, hc.committer_tz_offset,
str(i), parents=hc.parents, encoding=hc.encoding)
# serialize into an in-memory stream; its end position is the payload size
stream = StringIO()
cm._serialize(stream)
slen = stream.tell()
# rewind so the stream can be read from the start
stream.seek(0)
# the commit takes the binsha of the stream returned by make_object
cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha
# END commit creation
elapsed = time() - st
print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed)
# Excerpt from the body of a LockedFD test; lfd, my_file, orig_data, new_data
# and lockfilepath are set up before this excerpt.
# NOTE(review): presumably checks that extra commit/rollback calls are
# harmless - confirm against the preceding, non-visible part of the test
lfd.commit()
lfd.rollback()
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
# rfd is passed to os.read, i.e. it is used as a raw file descriptor
assert os.read(rfd, len(orig_data)) == orig_data
# the lock file is expected to exist while the file is open for reading
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
# constructing the instances alone is expected not to create the lock file
assert not os.path.isfile(lockfilepath)
wfdstream = lfd.open(write=True, stream=True) # this time as stream
assert os.path.isfile(lockfilepath)
# another one fails
# NOTE(review): failUnlessRaises is a deprecated unittest alias of assertRaises
self.failUnlessRaises(IOError, olfd.open)
wfdstream.write(new_data)
lfd.commit()
# after the commit the lock file is expected to be gone again
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
# could test automatic _end_writing on destruction
# NOTE(review): the 'try:' matching this 'finally:' lies outside this excerpt
finally:
os.remove(my_file)
# END final cleanup
# Excerpt from the end of a LockedFD test; lfd, olfd, wfdstream, my_file,
# new_data and lockfilepath are set up before this excerpt.
assert os.path.isfile(lockfilepath)
# another one fails
# NOTE(review): failUnlessRaises is a deprecated unittest alias of assertRaises
self.failUnlessRaises(IOError, olfd.open)
wfdstream.write(new_data)
lfd.commit()
# after the commit the lock file is expected to be gone again
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
# could test automatic _end_writing on destruction
# NOTE(review): the 'try:' matching this 'finally:' lies outside this excerpt
finally:
os.remove(my_file)
# END final cleanup
# try non-existing file for reading
# tempfile.mktemp() only returns a path name and does not create the file
lfd = LockedFD(tempfile.mktemp())
try:
lfd.open(write=False)
except OSError:
# the failed open is expected to leave no lock file behind
assert not os.path.exists(lfd._lockfilepath())
else:
self.fail("expected OSError")
# END handle exceptions
# Excerpt from the body of a LockedFD test; lfd, my_file, orig_data, new_data
# and lockfilepath are set up before this excerpt.
lfd.rollback()
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
# rfd is passed to os.read, i.e. it is used as a raw file descriptor
assert os.read(rfd, len(orig_data)) == orig_data
# the lock file is expected to exist while the file is open for reading
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
# constructing the instances alone is expected not to create the lock file
assert not os.path.isfile(lockfilepath)
wfdstream = lfd.open(write=True, stream=True) # this time as stream
assert os.path.isfile(lockfilepath)
# another one fails
# NOTE(review): failUnlessRaises is a deprecated unittest alias of assertRaises
self.failUnlessRaises(IOError, olfd.open)
wfdstream.write(new_data)
lfd.commit()
# after the commit the lock file is expected to be gone again
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
# could test automatic _end_writing on destruction
# NOTE(review): the 'try:' matching this 'finally:' lies outside this excerpt
finally:
os.remove(my_file)
# END final cleanup
# Excerpt from an object-writing test; my_istream, istream, sha, db, data,
# dry_run, ostream and str_blob_type are set up before this excerpt.
# NOTE(review): indentation was lost in this listing, so it is unclear
# whether the "DIRECT STREAM COPY" part below belongs to the else-branch.
assert my_istream is istream
# the object must exist in the database exactly when this is not a dry run
assert db.has_object(sha) != dry_run
assert len(sha) == 20
# verify data - the slow way, we want to run code
if not dry_run:
info = db.info(sha)
assert str_blob_type == info.type
assert info.size == len(data)
ostream = db.stream(sha)
assert ostream.read() == data
assert ostream.type == str_blob_type
assert ostream.size == len(data)
else:
# NOTE(review): failUnlessRaises is a deprecated unittest alias of assertRaises
self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.stream, sha)
# DIRECT STREAM COPY
# our data has been written in object format to the StringIO
# we passed as output stream. No physical database representation
# was created.
# Test direct stream copy of object streams, the result must be
# identical to what we fed in
ostream.seek(0)
istream.stream = ostream
assert istream.binsha is not None
prev_sha = istream.binsha
db.set_ostream(ZippedStoreShaWriter())
db.store(istream)
# storing the same stream again must leave its binsha unchanged
assert istream.binsha == prev_sha
def test_reading(self):
    """Check object access and partial sha resolution on a ``GitDB``."""
    git_db = GitDB(fixture_path('../../.git/objects'))

    # we have packs and loose objects, alternates doesn't necessarily exist
    assert 1 < len(git_db.databases()) < 4

    # access should be possible
    known_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
    assert isinstance(git_db.info(known_sha), OInfo)
    assert isinstance(git_db.stream(known_sha), OStream)
    assert git_db.size() > 200

    all_shas = list(git_db.sha_iter())
    assert len(all_shas) == git_db.size()

    # This is actually a test for compound functionality, but it doesn't
    # have a separate test module
    # test partial shas
    # this one as uneven and quite short
    assert (
        git_db.partial_to_complete_sha_hex('155b6')
        == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")
    )

    # mix even/uneven hexshas
    for position, full_sha in enumerate(all_shas):
        # 8 hex characters at even positions, 7 at odd positions
        prefix_length = 8 - (position % 2)
        hex_prefix = bin_to_hex(full_sha)[:prefix_length]
        assert git_db.partial_to_complete_sha_hex(hex_prefix) == full_sha
    # END for each sha
# Benchmark excerpt: write a large in-memory blob to a loose object database
# and read it back, once per data flavour, printing timings to stderr.
# NOTE(review): rwrepo, make_memory_file and self.large_data_size_bytes are
# defined outside this excerpt.
ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
# two passes: randomize is 0 on the first pass and 1 on the second
for randomize in range(2):
# 'random ' when randomize is truthy, otherwise the empty string
desc = (randomize and 'random ') or ''
# NOTE(review): the print-chevron form used below is Python 2 syntax
print >> sys.stderr, "Creating %s data ..." % desc
st = time()
size, stream = make_memory_file(self.large_data_size_bytes, randomize)
elapsed = time() - st
print >> sys.stderr, "Done (in %f s)" % elapsed
# writing - due to the compression it will seem faster than it is
st = time()
binsha = ldb.store(IStream('blob', size, stream)).binsha
elapsed_add = time() - st
assert ldb.has_object(binsha)
db_file = ldb.readable_db_object_path(bin_to_hex(binsha))
# NOTE(review): both values are divided by 1000 although the messages label
# them KiB (1 KiB = 1024 bytes)
fsize_kib = os.path.getsize(db_file) / 1000
size_kib = size / 1000
print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
# reading all at once
st = time()
ostream = ldb.stream(binsha)
shadata = ostream.read()
elapsed_readall = time() - st
# the data read back must equal the full content of the source stream
stream.seek(0)
assert shadata == stream.getvalue()
print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)