Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'networkx' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of building, traversing, and visualizing graphs with confidence and precision.
def create_graph_1(is_directed=False, return_nx=False):
    """Build a small movie/user rating graph used as a fixture.

    Nodes 0-3 carry ``label="movie"``, nodes 4-5 carry ``label="user"`` and
    five user -> movie edges carry ``label="rating"``.

    Args:
        is_directed: Build the graph on ``nx.DiGraph`` instead of ``nx.Graph``.
        return_nx: Return a networkx multigraph copy instead of wrapping the
            graph in ``StellarGraph`` / ``StellarDiGraph``.

    Returns:
        ``nx.MultiDiGraph`` / ``nx.MultiGraph`` when ``return_nx`` is true,
        otherwise ``StellarDiGraph`` / ``StellarGraph``; the directed variant
        is chosen when ``is_directed`` is true.
    """
    if is_directed:
        g = nx.DiGraph()
    else:
        g = nx.Graph()

    movie_ids = [0, 1, 2, 3]
    user_ids = [4, 5]
    ratings = [(4, 0), (4, 1), (5, 1), (4, 2), (5, 3)]

    g.add_nodes_from(movie_ids, label="movie")
    g.add_nodes_from(user_ids, label="user")
    g.add_edges_from(ratings, label="rating")

    if return_nx:
        if is_directed:
            return nx.MultiDiGraph(g)
        return nx.MultiGraph(g)

    if is_directed:
        return StellarDiGraph(g)
    return StellarGraph(g)
def fetch_balance_in(self, target: str, fee: bool=False) -> float:
"""
Return the value of current portfolio all in target asset. The transfer rate is computed from the most
profitable way possible. If there is no possible way to transfer an asset to the target one, an exception will
be raised.
Args:
target: Name of the target asset
fee: If exchange fee is considered when computing the portfolio value. Defaults to False.
Returns:
Portfolio value
"""
# Directed graph of asset conversions: every symbol contributes one edge per direction.
di_graph = nx.DiGraph()
# Parsed as `(1.0 - self._fee_rate / 100.0) if fee else 1.0`; with fee=False the multiplier is exactly 1.0.
# Presumably self._fee_rate is a percentage (it is divided by 100) - confirm against where it is set.
multiplier = 1.0 - self._fee_rate / 100.0 if fee else 1.0
for symbol in self._symbols:
quote_name = self._quotes.get_ticker(symbol).quote_name
base_name = self._quotes.get_ticker(symbol).base_name
# Edge weights are logarithms of prices, so adding weights along a path
# corresponds to multiplying the underlying conversion factors.
# quote -> base: -log(multiplier * price at self._sell_price)
di_graph.add_edge(quote_name, base_name,
weight=-math.log(multiplier * self.__get_price(symbol, self._sell_price)))
# base -> quote: log(price at self._buy_price / multiplier)
di_graph.add_edge(base_name, quote_name,
weight=math.log(self.__get_price(symbol, self._buy_price) / multiplier))
balance = 0
for asset in self._total_balance:
# Assets whose balance is falsy (e.g. 0) are skipped.
if self._total_balance[asset]:
# Holdings already in the target asset are added without conversion.
if asset == target:
balance += self._total_balance[asset]
# NOTE(review): the excerpt ends here - the branch converting other assets through
# di_graph and the final return are not visible in this view.
def test_networkx_multidigraph_edge_attr(self):
# Builds a MultiDiGraph with three parallel 1 -> 2 edges (the second one given
# extra data) plus a single 1 -> 3 edge, converts it with util.from_networkx
# and prints the result as JSON.
print('\n---------- Multi-Digraph Edge Att Test Start -----------\n')
g = nx.MultiDiGraph()
g.add_node(1)
g.add_node(2)
g.add_node(3)
g.add_edge(1, 2)
# NOTE(review): attr_dict was a dedicated add_edge parameter in networkx 1.x; under
# networkx 2.x keyword arguments are stored as edge attributes, so this would create
# an attribute literally named 'attr_dict' - confirm which version this test targets.
g.add_edge(1, 2, attr_dict={'foo': 'bar'})
g.add_edge(1, 2)
g.add_edge(1, 3)
# keys=True adds the multigraph edge key to each tuple, data=True the attribute dict.
edges = g.edges(data=True, keys=True)
for edge in edges:
print(edge)
cyjs = util.from_networkx(g)
print(json.dumps(cyjs, indent=4))
# First edge of the converted document; the checks made on it are not visible in this excerpt.
edge = cyjs['elements']['edges'][0]
def has_path(G, source, target):
    """Return *True* if *G* has a path from *source* to *target*.

    Parameters
    ----------
    G : NetworkX graph
    source : node
        Starting node for path
    target : node
        Ending node for path

    Returns
    -------
    bool
        *False* when ``nx.shortest_path`` raises ``nx.NetworkXNoPath``,
        *True* otherwise.

    Notes
    -----
    Only ``nx.NetworkXNoPath`` is handled here; any other exception raised by
    ``nx.shortest_path`` propagates to the caller.
    """
    try:
        # Only the success/failure of the search matters, so the path itself
        # is not kept.
        nx.shortest_path(G, source, target)
    except nx.NetworkXNoPath:
        return False
    return True
def edge(n1, n2, w):
    """Return an ``(n1, n2, {'weight': w})`` edge tuple, rejecting repeated weights.

    Every accepted weight is recorded in the enclosing ``seen_w`` collection;
    a weight that is already recorded raises ``Exception``.
    """
    if w not in seen_w:
        seen_w.add(w)
        return (n1, n2, {'weight': w})
    raise Exception("Duplicate edge weights in graph. For a unique MST, all edge weights have to be uniqe as well.")
edge_list = list()
with open(file, 'r') as f:
edge_list = list( edge(ed.split()[0], ed.split()[1], int(ed.split()[2]))
for ed in
(e.strip() for e in f.readlines() if e.strip() != "")
if len(ed.split()) == 3 )
G = nx.Graph()
G.add_edges_from(edge_list)
self.graph = G
return G
# Interval tree recording, for each [f, t) range returned by idx.addsequence, which sample it belongs to.
tree=IntervalTree()
for fasta in fastas:
# The sample is identified by the file name without its directory.
sample=os.path.basename(fasta)
idx.addsample(sample)
for i,t in enumerate(fasta_reader(fasta)):
name,seq=t
# Note: `t` (the record tuple from the loop header) is rebound here to the second value returned by addsequence.
f,t=idx.addsequence(seq)
tree[f:t]=sample
# i==1 means a second record was read from the same file; this is reported as an error and the process exits.
if i==1:
logging.error("Can't handle multi-fasta input. Use single fasta file per sequence.")
sys.exit(1)
idx.construct()
# Graph-level metadata: the sample list plus lookup tables in both directions between sample and numeric id.
G=nx.DiGraph()
G.graph['paths']=idx.samples
G.graph['path2id']=dict()
G.graph['id2path']=dict()
for sid,sample in enumerate(G.graph['paths']):
G.graph['path2id'][sample]=sid
G.graph['id2path'][sid]=sample
k=len(idx.samples)
T=idx.T
# The tuples below hold one entry per sequence and are derived from idx.nsep and idx.n.
istart=tuple([-1]+[sep for sep in idx.nsep]) #no matches possible at these loci
iend=tuple([sep for sep in idx.nsep]+[idx.n-1]) #loci of sentinels, also no matches possible
startcoords=tuple([0]+[sep+1 for sep in idx.nsep])
# The tuple of start loci itself is used as the node key, with attribute l=0.
G.add_node(istart,l=0)
# NOTE(review): this excerpt starts inside loops whose headers (over c, gene, g1, g2) are not visible here.
# Entries with a falsy column value for g2 are skipped.
if not column[g2]: continue
consistency_graph.add_node( g1 )
consistency_graph.add_node( g2 )
# Differing column values mark the pair as inconsistent; equal values link the pair with an edge.
if column[g1] != column[g2]:
if options.loglevel >= 6:
options.stdlog.write("# column %i: inconsistency: %s - %i <---> %s - %i\n" % (c, identifiers[g1],column[g1],identifiers[g2],column[g2]))
# Each recorded entry is the full identifier followed by its parts split on options.separator.
ic.add( (identifiers[g1],) + tuple(identifiers[g1].split(options.separator)) )
ic.add( (identifiers[g2],) + tuple(identifiers[g2].split(options.separator)) )
is_inconsistent = True
else:
consistency_graph.add_edge( g1, g2 )
# NOTE(review): len() is taken on this result below. Recent networkx releases document
# connected_components as a generator, on which len() fails - confirm the networkx
# version in use, or wrap the call in list().
components = networkx.connected_components( consistency_graph )
if options.loglevel >= 6:
if is_inconsistent:
options.stdlog.write("# column %i: inconsistency for gene %s - %s\n" % (c, str(gene), str(components)))
component_sizes.append( len(components) )
# count maximum transcripts per gene
# Columns without any recorded inconsistency are skipped.
if not ic: continue
max_counts = max( component_sizes )
inconsistent_columns.append( (c, max_counts, ic) )
if options.loglevel >= 1:
options.stdlog.write("# found %i inconsistent columns.\n" % len(inconsistent_columns) )
# Draw nodes as a scatter plot on the current axes, above the edges.
ax = plt.gca()
options = dict(marker='o', s=200, c=[0.85, 0.85, 1], facecolor='0.5', lw=0)
xy = numpy.asarray([pos[node] for node in g])
node_collection = ax.scatter(xy[:, 0], xy[:, 1], **options)
node_collection.set_zorder(2)

# Draw edges
networkx.draw_networkx_edges(g, pos, arrows=False, edge_color='0.5')

# Draw edge labels taken from each edge's 'label' attribute.
edge_labels = {(u, v): attrs.get('label') for u, v, attrs in g.edges(data=True)}
networkx.draw_networkx_edge_labels(g, pos, edge_labels=edge_labels)

# Draw node labels; labels longer than 25 characters get a line break
# inserted at the middle word boundary.
node_labels = {node: attrs.get('label') for node, attrs in g.nodes(data=True)}
for key, label in node_labels.items():
    if len(label) <= 25:
        continue
    parts = label.split(' ')
    parts.insert(len(parts) // 2, '\n')
    node_labels[key] = ' '.join(parts)
networkx.draw_networkx_labels(g, pos, labels=node_labels)

# Hide both axes and show the figure.
for axis in (ax.get_xaxis(), ax.get_yaxis()):
    axis.set_visible(False)
plt.show()
# NOTE(review): this excerpt starts inside a loop over parsed input rows; `line`, `node_id`,
# `G` and the `colors` lookup are defined outside this view.
hamming_avg = float(line[6])
# Column 7 holds a comma-separated list of neighbours, each encoded as four
# '_'-separated integers: <neighbor_id>_<left>_<right>_<dist>.
for neighbor in line[7].rstrip().split(','):
# Empty entries (e.g. from a trailing comma) are skipped.
if len(neighbor) < 1:
continue
fields = neighbor.split('_')
neighbor_id = int(fields[0])
left = int(fields[1])
right = int(fields[2])
dist = int(fields[3])
# The edge colour is looked up by dist; the weight is (left+right)/5.
# NOTE(review): `/` on two ints floors under Python 2 and is true division under Python 3 - confirm the intended interpreter.
G.add_edge(node_id, neighbor_id, color=colors[dist],weight=(left+right)/5)
# 1) Plain rendering with networkx defaults (no explicit layout, colours or widths).
plt.figure(figsize=(30,30))
nx.draw(G)
plt.savefig(out_file+"_basic.pdf")

# Freeze the edge order once so that the colour and width lists line up with
# the edges that are actually drawn.
edges = list(G.edges())
colors = [G[u][v]['color'] for u,v in edges]
weights = [G[u][v]['weight'] for u,v in edges]

# 2) Spring layout. The edge list is passed as `edgelist`, the keyword the
# networkx drawing functions define; `edges` is not one of their keywords.
plt.figure(figsize=(30,30))
pos = nx.spring_layout(G)
nx.draw(G, pos, edgelist=edges, edge_color=colors, width=weights, alpha=0.1, node_size=25)
plt.savefig(out_file+"_spring.pdf")

# 3) Spectral layout with the same per-edge colours and widths.
plt.figure(figsize=(18,18))
pos = nx.spectral_layout(G)
nx.draw(G, pos, edgelist=edges, edge_color=colors, width=weights)
plt.savefig(out_file+"_spectral.pdf")
# #print(G.node[b]['Base'])
# seq = seq + G.node[b]['Base']
# #print(seq)
# path_alternatives.append(seq)
# #Loop through and check the strings match
# for path in path_alternatives:
# if(path == transcripts[key]): print("Found path for: ", key)
#Order base in graph
#The ordering is materialised inside the try: networkx 2.x returns a lazy
#generator from topological_sort, so a cycle would otherwise only raise
#NetworkXUnfeasible later, while iterating, outside this handler.
try:
    base_order = list(nx.topological_sort(G))
except nx.NetworkXUnfeasible:
    print("CYCLESSSSSS!!!!!!")
    sys.exit()

#Concatenate the 'Base' attribute of every node in topological order
#NOTE(review): G.node was removed in networkx 2.4 (G.nodes replaces it) -
#switch once compatibility with older releases is no longer needed.
seq = ''.join(G.node[index]['Base'] for index in base_order)
print(seq)

#Save sequence to file; the context manager closes it even if a write fails
with open('Super.fasta','w') as superf:
    superf.write('>Super\n')
    superf.write(seq)