Cloud Defense Logo

Products

Solutions

Company

Book A Live Demo

Top 10 Examples of "gitdb" in Python

Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'gitdb' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering object lookups, stream handling, and file locking with gitdb with confidence and precision.

def is_equal_canonical_sha(canonical_length, match, sha1):
    """
    Compare a partial binary sha against a full binary sha.

    The comparison takes the canonical_length of the match sha into account:
    the first ``canonical_length // 2`` bytes have to be equal, and for an uneven
    canonical_length the upper 4 bits of the last byte of ``match`` are compared
    with the upper 4 bits of the byte at the same position in ``sha1``.

    :return: True if ``match`` and ``sha1`` are equal within the canonical length
    :param canonical_length: length of the hexadecimal representation of ``match``
    :param match: less than 20 byte sha
    :param sha1: 20 byte sha"""
    binary_length = canonical_length // 2
    if match[:binary_length] != sha1[:binary_length]:
        return False

    # Only an uneven canonical length leaves a trailing half byte to compare.
    # ``canonical_length - binary_length`` is non-zero for every non-zero length,
    # so it did not actually restrict this check to the uneven case.
    if canonical_length % 2 and \
            (byte_ord(match[-1]) ^ byte_ord(sha1[len(match) - 1])) & 0xf0:
        return False
    # END handle uneven canonical length
    return True
def sha_to_index(self, sha):
        """
        Look up ``sha`` by bisecting the index range selected through the
        fanout table entry of its first byte.

        :return: index usable with the ``offset`` or ``entry`` method, or None
            if the sha was not found in this pack index
        :param sha: 20 byte sha to lookup"""
        bucket = byte_ord(sha[0])
        sha_at = self.sha

        # bisection bounds: the fanout entry before ``bucket`` (or 0 for the very
        # first bucket) on the left, the entry of ``bucket`` itself on the right
        left = self._fanout_table[bucket - 1] if bucket != 0 else 0
        right = self._fanout_table[bucket]

        # narrow the range until the sha is hit or the range is empty
        while left < right:
            probe = (left + right) // 2
            probe_sha = sha_at(probe)
            if sha == probe_sha:
                return probe
            if sha < probe_sha:
                right = probe
            else:
                left = probe + 1
        # END bisect
        return None
def partial_sha_to_index(self, partial_bin_sha, canonical_length):
        """
        Look up the index of the sha matching a partial binary sha.

        The partial sha is padded with NULL_BYTE to 20 bytes and searched by
        bisection within the fanout range selected by its first byte.

        NOTE(review): this excerpt ends inside the bisection loop; the code that
        evaluates the match, returns the index and raises AmbiguousObjectName is
        not visible here - confirm against the complete source.

        :return: index as in `sha_to_index` or None if the sha was not found in this
            index file
        :param partial_bin_sha: at least two bytes of a partial binary sha, as bytes
        :param canonical_length: length of the original hexadecimal representation of the
            given partial binary sha
        :raise ValueError: if partial_bin_sha is shorter than 2 bytes
        :raise AmbiguousObjectName:"""
        if len(partial_bin_sha) < 2:
            raise ValueError("Require at least 2 bytes of partial sha")

        assert isinstance(partial_bin_sha, bytes), "partial_bin_sha must be bytes"
        first_byte = byte_ord(partial_bin_sha[0])

        get_sha = self.sha
        lo = 0                  # lower index, the left bound of the bisection
        if first_byte != 0:
            lo = self._fanout_table[first_byte - 1]
        hi = self._fanout_table[first_byte]     # the upper, right bound of the bisection

        # fill the partial to full 20 bytes
        filled_sha = partial_bin_sha + NULL_BYTE * (20 - len(partial_bin_sha))

        # find lowest
        while lo < hi:
            mid = (lo + hi) // 2
            mid_sha = get_sha(mid)
            # the padded sha sorts before the probed sha: continue in the left half
            if filled_sha < mid_sha:
                hi = mid
commits = list()
		nc = 5000
		st = time()
		for i in xrange(nc):
			cm = Commit(	rwrepo, Commit.NULL_BIN_SHA, hc.tree, 
							hc.author, hc.authored_date, hc.author_tz_offset, 
							hc.committer, hc.committed_date, hc.committer_tz_offset, 
							str(i), parents=hc.parents, encoding=hc.encoding)
			
			stream = StringIO()
			cm._serialize(stream)
			slen = stream.tell()
			stream.seek(0)
			
			cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha
		# END commit creation
		elapsed = time() - st
		
		print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed)
lfd.commit()
			lfd.rollback()
			
			# test reading
			lfd = LockedFD(my_file)
			rfd = lfd.open(write=False)
			assert os.read(rfd, len(orig_data)) == orig_data
			
			assert os.path.isfile(lockfilepath)
			# deletion rolls back
			del(lfd)
			assert not os.path.isfile(lockfilepath)
			
			
			# write data - concurrently
			lfd = LockedFD(my_file)
			olfd = LockedFD(my_file)
			assert not os.path.isfile(lockfilepath)
			wfdstream = lfd.open(write=True, stream=True)		# this time as stream
			assert os.path.isfile(lockfilepath)
			# another one fails
			self.failUnlessRaises(IOError, olfd.open)
			
			wfdstream.write(new_data)
			lfd.commit()
			assert not os.path.isfile(lockfilepath)
			self._cmp_contents(my_file, new_data)
			
			# could test automatic _end_writing on destruction
		finally:
			os.remove(my_file)
		# END final cleanup
assert os.path.isfile(lockfilepath)
			# another one fails
			self.failUnlessRaises(IOError, olfd.open)
			
			wfdstream.write(new_data)
			lfd.commit()
			assert not os.path.isfile(lockfilepath)
			self._cmp_contents(my_file, new_data)
			
			# could test automatic _end_writing on destruction
		finally:
			os.remove(my_file)
		# END final cleanup
		
		# try non-existing file for reading
		lfd = LockedFD(tempfile.mktemp())
		try:
			lfd.open(write=False)
		except OSError:
			assert not os.path.exists(lfd._lockfilepath())
		else:
			self.fail("expected OSError")
		# END handle exceptions
lfd.rollback()
			
			# test reading
			lfd = LockedFD(my_file)
			rfd = lfd.open(write=False)
			assert os.read(rfd, len(orig_data)) == orig_data
			
			assert os.path.isfile(lockfilepath)
			# deletion rolls back
			del(lfd)
			assert not os.path.isfile(lockfilepath)
			
			
			# write data - concurrently
			lfd = LockedFD(my_file)
			olfd = LockedFD(my_file)
			assert not os.path.isfile(lockfilepath)
			wfdstream = lfd.open(write=True, stream=True)		# this time as stream
			assert os.path.isfile(lockfilepath)
			# another one fails
			self.failUnlessRaises(IOError, olfd.open)
			
			wfdstream.write(new_data)
			lfd.commit()
			assert not os.path.isfile(lockfilepath)
			self._cmp_contents(my_file, new_data)
			
			# could test automatic _end_writing on destruction
		finally:
			os.remove(my_file)
		# END final cleanup
assert my_istream is istream
				assert db.has_object(sha) != dry_run
				assert len(sha) == 20	
				
				# verify data - the slow way, we want to run code
				if not dry_run:
					info = db.info(sha)
					assert str_blob_type == info.type
					assert info.size == len(data)
					
					ostream = db.stream(sha)
					assert ostream.read() == data
					assert ostream.type == str_blob_type
					assert ostream.size == len(data)
				else:
					self.failUnlessRaises(BadObject, db.info, sha)
					self.failUnlessRaises(BadObject, db.stream, sha)
					
					# DIRECT STREAM COPY
					# our data has been written in object format to the StringIO
					# we passed as output stream. No physical database representation
					# was created.
					# Test direct stream copy of object streams, the result must be 
					# identical to what we fed in
					ostream.seek(0)
					istream.stream = ostream
					assert istream.binsha is not None
					prev_sha = istream.binsha
					
					db.set_ostream(ZippedStoreShaWriter())
					db.store(istream)
					assert istream.binsha == prev_sha
def test_reading(self):
		"""Check read access and partial sha resolution on the fixture object database."""
		odb = GitDB(fixture_path('../../.git/objects'))

		# packs and loose objects are present, an alternates database is optional
		num_dbs = len(odb.databases())
		assert num_dbs > 1 and num_dbs < 4

		# a known object has to be accessible as info and as stream
		known_binsha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
		assert isinstance(odb.info(known_binsha), OInfo)
		assert isinstance(odb.stream(known_binsha), OStream)
		assert odb.size() > 200
		all_shas = list(odb.sha_iter())
		assert len(all_shas) == odb.size()

		# Partial sha resolution is compound functionality without a test module
		# of its own, hence it is covered here.
		# this prefix is uneven and quite short
		short_match = odb.partial_to_complete_sha_hex('155b6')
		assert short_match == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")

		# alternate between even (8) and uneven (7) hex prefix lengths
		for idx, binsha in enumerate(all_shas):
			prefix_len = 8 - (idx % 2)
			hex_prefix = bin_to_hex(binsha)[:prefix_len]
			assert odb.partial_to_complete_sha_hex(hex_prefix) == binsha
ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
		
		for randomize in range(2):
			desc = (randomize and 'random ') or ''
			print >> sys.stderr, "Creating %s data ..." % desc
			st = time()
			size, stream = make_memory_file(self.large_data_size_bytes, randomize)
			elapsed = time() - st
			print >> sys.stderr, "Done (in %f s)" % elapsed
			
			# writing - due to the compression it will seem faster than it is 
			st = time()
			binsha = ldb.store(IStream('blob', size, stream)).binsha
			elapsed_add = time() - st
			assert ldb.has_object(binsha)
			db_file = ldb.readable_db_object_path(bin_to_hex(binsha))
			fsize_kib = os.path.getsize(db_file) / 1000
			
			
			size_kib = size / 1000
			print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
			
			# reading all at once
			st = time()
			ostream = ldb.stream(binsha)
			shadata = ostream.read()
			elapsed_readall = time() - st
			
			stream.seek(0)
			assert shadata == stream.getvalue()
			print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)

Is Your System Free of Underlying Vulnerabilities?
Find Out Now