Module VITAE.inference
Expand source code
import warnings
#from typing import Optional
import numpy as np
import networkx as nx
class Inferer(object):
'''
The class for doing inference based on posterior estimations.
'''
def __init__(self, n_states: int):
'''
Parameters
----------
n_states : int
The number of vertices in the latent space.
'''
self.n_states = n_states
self.n_categories = int(n_states*(n_states+1)/2)
# self.A, self.B = np.nonzero(np.triu(np.ones(n_states)))
## indicator of the catagories
self.C = np.triu(np.ones(n_states))
self.C[self.C>0] = np.arange(self.n_categories)
self.C = self.C.astype(int)
def build_graphs(self, w_tilde, pc_x, method: str = 'mean', thres: float = 0.5, no_loop: bool = False,
cutoff = 0):
'''Build the backbone.
Parameters
----------
pc_x : np.array
\([N, K]\) The estimated \(p(c_i|Y_i,X_i)\).
method : string, optional
'mean', 'modified_mean', 'map', or 'modified_map'.
thres : float, optional
The threshold used for filtering edges \(e_{ij}\) that \((n_{i}+n_{j}+e_{ij})/N<thres\), only applied to mean method.
Retruns
----------
G : nx.Graph
The graph of edge scores.
'''
self.no_loop = no_loop
# self.w_tilde = w_tilde
graph = np.zeros((self.n_states,self.n_states))
if method=='mean':
for i in range(self.n_states-1):
for j in range(i+1,self.n_states):
idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres
if np.sum(idx)>0:
graph[i,j] = np.mean(pc_x[idx,self.C[i,j]]/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]], axis=-1))
elif method=='modified_mean':
for i in range(self.n_states-1):
for j in range(i+1,self.n_states):
idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres
if np.sum(idx)>0:
graph[i,j] = np.sum(pc_x[idx,self.C[i,j]])/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]])
elif method=='map':
c = np.argmax(pc_x, axis=-1)
for i in range(self.n_states-1):
for j in range(i+1,self.n_states):
if np.sum(c==self.C[i,j])>0:
graph[i,j] = np.sum(c==self.C[i,j])/np.sum((c==self.C[i,j])|(c==self.C[i,i])|(c==self.C[j,j]))
elif method=='modified_map':
c = np.argmax(pc_x, axis=-1)
for i in range(self.n_states-1):
for j in range(i+1,self.n_states):
graph[i,j] = np.sum(c==self.C[i,j])/(np.sum((w_tilde[:,i]>0.5)|(w_tilde[:,j]>0.5))+1e-16)
elif method=='raw_map':
c = np.argmax(pc_x, axis=-1)
for i in range(self.n_states-1):
for j in range(i+1,self.n_states):
if np.sum(c==self.C[i,j])>0:
graph[i,j] = np.sum(c==self.C[i,j])/np.sum(np.isin(c, np.diagonal(self.C)) == False)
elif method == "w_base":
for i in range(self.n_states):
for j in range(i+1,self.n_states):
two_vertice_max_w = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j),:]
num_two_vertice = two_vertice_max_w.shape[0]
if num_two_vertice > 0:
graph[i, j] = np.sum(
np.abs(two_vertice_max_w[:, i] - two_vertice_max_w[:, j]) < 0.1) / num_two_vertice
elif method == "modified_w_base":
top2_idx = np.argpartition(w_tilde, -2, axis=1)[:, -2:]
for i in range(self.n_states):
for j in range(i + 1, self.n_states):
two_vertice_max_w = np.all(top2_idx == [i, j], axis=1) | np.all(top2_idx == [j, i], axis=1)
two_vertice_max_w = w_tilde[two_vertice_max_w, :]
vertice_count = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j), :]
vertice_count = vertice_count.shape[0]
if vertice_count > 0:
edge_count = \
np.max((two_vertice_max_w[:, i], two_vertice_max_w[:, j]), axis=0) \
/ (two_vertice_max_w[:, i] + two_vertice_max_w[:, j])
edge_count = np.sum(edge_count < 0.55)
graph[i, j] = edge_count / vertice_count
else:
raise ValueError("Invalid method, must be one of 'mean', 'modified_mean', 'map', 'modified_map','raw_map','w_base', and 'modified_w_base'.")
graph[graph<=cutoff] = 0
G = nx.from_numpy_array(graph)
if self.no_loop and not nx.is_tree(G):
# prune if there are no loops
G = nx.maximum_spanning_tree(G)
return G
def modify_wtilde(self, w_tilde, edges):
'''Project \(\\tilde{w}\) to the estimated backbone.
Parameters
----------
w_tilde : np.array
\([N, k]\) The estimated \(\\tilde{w}\).
edges : np.array
\([|\\mathcal{E}(\\widehat{\\mathcal{B}})|, 2]\).
Retruns
----------
w : np.array
The projected \(\\tilde{w}\).
'''
w = np.zeros_like(w_tilde)
# projection on nodes
best_proj_err_node = np.sum(w_tilde**2, axis=-1) - 2*np.max(w_tilde, axis=-1) +1
best_proj_err_node_ind = np.argmax(w_tilde, axis=-1)
if len(edges)>0:
# projection on edges
idc = np.tile(np.arange(w.shape[0]), (2,1)).T
ide = edges[np.argmax(np.sum(w_tilde[:,edges], axis=-1)**2 -
4 * np.prod(w_tilde[:,edges], axis=-1) +
2 * np.sum(w_tilde[:,edges], axis=-1), axis=-1)]
w[idc, ide] = w_tilde[idc, ide] + (1-np.sum(w_tilde[idc, ide], axis=-1, keepdims=True))/2
best_proj_err_edge = np.sum(w_tilde**2, axis=-1) - np.sum(w_tilde[idc, ide]**2, axis=-1) + (1-np.sum(w_tilde[idc, ide], axis=-1))**2/2
idc = (best_proj_err_node<best_proj_err_edge)
w[idc,:] = np.eye(w_tilde.shape[-1])[best_proj_err_node_ind[idc]]
else:
idc = np.arange(w.shape[0])
w[idc, best_proj_err_node_ind] = 1
return w
def build_milestone_net(self, subgraph, init_node: int):
'''Build the milestone network.
Parameters
----------
subgraph : nx.Graph
The connected component of the backbone given the root vertex.
init_node : int
The root vertex.
Returns
----------
df_subgraph : pd.DataFrame
The milestone network.
'''
if len(subgraph)==1:
warnings.warn('Singular node.')
return []
elif nx.is_directed_acyclic_graph(subgraph):
milestone_net = []
for edge in list(subgraph.edges):
if edge[0]==init_node:
dist = 1
elif edge[1]==init_node:
paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0])
dist = - (np.max([len(p) for p in paths_1]) - 1)
else:
paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0])
paths_1 = nx.all_simple_paths(subgraph, source=init_node, target=edge[1])
dist = np.max([len(p) for p in paths_1]) - np.max([len(p) for p in paths_0])
milestone_net.append([edge[0], edge[1], dist])
else:
# Dijkstra's Algorithm to find the shortest path
unvisited = {node: {'parent':None,
'score':np.inf,
'distance':np.inf} for node in subgraph.nodes}
current = init_node
currentScore = 0
currentDistance = 0
unvisited[current]['score'] = currentScore
milestone_net = []
while True:
for neighbour in subgraph.neighbors(current):
if neighbour not in unvisited: continue
newScore = currentScore + subgraph[current][neighbour]['weight']
if unvisited[neighbour]['score'] > newScore:
unvisited[neighbour]['score'] = newScore
unvisited[neighbour]['parent'] = current
unvisited[neighbour]['distance'] = currentDistance+1
if len(unvisited)<len(subgraph):
milestone_net.append([unvisited[current]['parent'],
current,
unvisited[current]['distance']])
del unvisited[current]
if not unvisited: break
current, currentScore, currentDistance = \
sorted([(i[0],i[1]['score'],i[1]['distance']) for i in unvisited.items()],
key = lambda x: x[1])[0]
return np.array(milestone_net)
def comp_pseudotime(self, milestone_net, init_node: int, w):
'''Compute pseudotime.
Parameters
----------
milestone_net : pd.DataFrame
The milestone network.
init_node : int
The root vertex.
w : np.array
\([N, k]\) The projected \(\\tilde{w}\).
Returns
----------
pseudotime : np.array
\([N, k]\) The estimated pseudtotime.
'''
pseudotime = np.empty(w.shape[0])
pseudotime.fill(np.nan)
pseudotime[w[:,init_node]==1] = 0
if len(milestone_net)>0:
for i in range(len(milestone_net)):
_from, _to = milestone_net[i,:2]
_from, _to = int(_from), int(_to)
idc = ((w[:,_from]>0)&(w[:,_to]>0)) | (w[:,_to]==1)
pseudotime[idc] = w[idc,_to] + milestone_net[i,-1] - 1
return pseudotime
Classes
class Inferer (n_states: int)
-
The class for doing inference based on posterior estimations.
Parameters
n_states
:int
- The number of vertices in the latent space.
Expand source code
class Inferer(object): ''' The class for doing inference based on posterior estimations. ''' def __init__(self, n_states: int): ''' Parameters ---------- n_states : int The number of vertices in the latent space. ''' self.n_states = n_states self.n_categories = int(n_states*(n_states+1)/2) # self.A, self.B = np.nonzero(np.triu(np.ones(n_states))) ## indicator of the catagories self.C = np.triu(np.ones(n_states)) self.C[self.C>0] = np.arange(self.n_categories) self.C = self.C.astype(int) def build_graphs(self, w_tilde, pc_x, method: str = 'mean', thres: float = 0.5, no_loop: bool = False, cutoff = 0): '''Build the backbone. Parameters ---------- pc_x : np.array \([N, K]\) The estimated \(p(c_i|Y_i,X_i)\). method : string, optional 'mean', 'modified_mean', 'map', or 'modified_map'. thres : float, optional The threshold used for filtering edges \(e_{ij}\) that \((n_{i}+n_{j}+e_{ij})/N<thres\), only applied to mean method. Retruns ---------- G : nx.Graph The graph of edge scores. ''' self.no_loop = no_loop # self.w_tilde = w_tilde graph = np.zeros((self.n_states,self.n_states)) if method=='mean': for i in range(self.n_states-1): for j in range(i+1,self.n_states): idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres if np.sum(idx)>0: graph[i,j] = np.mean(pc_x[idx,self.C[i,j]]/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]], axis=-1)) elif method=='modified_mean': for i in range(self.n_states-1): for j in range(i+1,self.n_states): idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres if np.sum(idx)>0: graph[i,j] = np.sum(pc_x[idx,self.C[i,j]])/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]]) elif method=='map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): if np.sum(c==self.C[i,j])>0: graph[i,j] = np.sum(c==self.C[i,j])/np.sum((c==self.C[i,j])|(c==self.C[i,i])|(c==self.C[j,j])) elif method=='modified_map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): graph[i,j] = np.sum(c==self.C[i,j])/(np.sum((w_tilde[:,i]>0.5)|(w_tilde[:,j]>0.5))+1e-16) elif method=='raw_map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): if np.sum(c==self.C[i,j])>0: graph[i,j] = np.sum(c==self.C[i,j])/np.sum(np.isin(c, np.diagonal(self.C)) == False) elif method == "w_base": for i in range(self.n_states): for j in range(i+1,self.n_states): two_vertice_max_w = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j),:] num_two_vertice = two_vertice_max_w.shape[0] if num_two_vertice > 0: graph[i, j] = np.sum( np.abs(two_vertice_max_w[:, i] - two_vertice_max_w[:, j]) < 0.1) / num_two_vertice elif method == "modified_w_base": top2_idx = np.argpartition(w_tilde, -2, axis=1)[:, -2:] for i in range(self.n_states): for j in range(i + 1, self.n_states): two_vertice_max_w = np.all(top2_idx == [i, j], axis=1) | np.all(top2_idx == [j, i], axis=1) two_vertice_max_w = w_tilde[two_vertice_max_w, :] vertice_count = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j), :] vertice_count = vertice_count.shape[0] if vertice_count > 0: edge_count = \ np.max((two_vertice_max_w[:, i], two_vertice_max_w[:, j]), axis=0) \ / (two_vertice_max_w[:, i] + two_vertice_max_w[:, j]) edge_count = np.sum(edge_count < 0.55) graph[i, j] = edge_count / vertice_count else: raise ValueError("Invalid method, must be one of 'mean', 'modified_mean', 'map', 'modified_map','raw_map','w_base', and 'modified_w_base'.") graph[graph<=cutoff] = 0 G = nx.from_numpy_array(graph) if self.no_loop and not nx.is_tree(G): # prune if there are no loops G = nx.maximum_spanning_tree(G) return G def modify_wtilde(self, w_tilde, edges): '''Project \(\\tilde{w}\) to the estimated backbone. Parameters ---------- w_tilde : np.array \([N, k]\) The estimated \(\\tilde{w}\). edges : np.array \([|\\mathcal{E}(\\widehat{\\mathcal{B}})|, 2]\). Retruns ---------- w : np.array The projected \(\\tilde{w}\). ''' w = np.zeros_like(w_tilde) # projection on nodes best_proj_err_node = np.sum(w_tilde**2, axis=-1) - 2*np.max(w_tilde, axis=-1) +1 best_proj_err_node_ind = np.argmax(w_tilde, axis=-1) if len(edges)>0: # projection on edges idc = np.tile(np.arange(w.shape[0]), (2,1)).T ide = edges[np.argmax(np.sum(w_tilde[:,edges], axis=-1)**2 - 4 * np.prod(w_tilde[:,edges], axis=-1) + 2 * np.sum(w_tilde[:,edges], axis=-1), axis=-1)] w[idc, ide] = w_tilde[idc, ide] + (1-np.sum(w_tilde[idc, ide], axis=-1, keepdims=True))/2 best_proj_err_edge = np.sum(w_tilde**2, axis=-1) - np.sum(w_tilde[idc, ide]**2, axis=-1) + (1-np.sum(w_tilde[idc, ide], axis=-1))**2/2 idc = (best_proj_err_node<best_proj_err_edge) w[idc,:] = np.eye(w_tilde.shape[-1])[best_proj_err_node_ind[idc]] else: idc = np.arange(w.shape[0]) w[idc, best_proj_err_node_ind] = 1 return w def build_milestone_net(self, subgraph, init_node: int): '''Build the milestone network. Parameters ---------- subgraph : nx.Graph The connected component of the backbone given the root vertex. init_node : int The root vertex. Returns ---------- df_subgraph : pd.DataFrame The milestone network. ''' if len(subgraph)==1: warnings.warn('Singular node.') return [] elif nx.is_directed_acyclic_graph(subgraph): milestone_net = [] for edge in list(subgraph.edges): if edge[0]==init_node: dist = 1 elif edge[1]==init_node: paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0]) dist = - (np.max([len(p) for p in paths_1]) - 1) else: paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0]) paths_1 = nx.all_simple_paths(subgraph, source=init_node, target=edge[1]) dist = np.max([len(p) for p in paths_1]) - np.max([len(p) for p in paths_0]) milestone_net.append([edge[0], edge[1], dist]) else: # Dijkstra's Algorithm to find the shortest path unvisited = {node: {'parent':None, 'score':np.inf, 'distance':np.inf} for node in subgraph.nodes} current = init_node currentScore = 0 currentDistance = 0 unvisited[current]['score'] = currentScore milestone_net = [] while True: for neighbour in subgraph.neighbors(current): if neighbour not in unvisited: continue newScore = currentScore + subgraph[current][neighbour]['weight'] if unvisited[neighbour]['score'] > newScore: unvisited[neighbour]['score'] = newScore unvisited[neighbour]['parent'] = current unvisited[neighbour]['distance'] = currentDistance+1 if len(unvisited)<len(subgraph): milestone_net.append([unvisited[current]['parent'], current, unvisited[current]['distance']]) del unvisited[current] if not unvisited: break current, currentScore, currentDistance = \ sorted([(i[0],i[1]['score'],i[1]['distance']) for i in unvisited.items()], key = lambda x: x[1])[0] return np.array(milestone_net) def comp_pseudotime(self, milestone_net, init_node: int, w): '''Compute pseudotime. Parameters ---------- milestone_net : pd.DataFrame The milestone network. init_node : int The root vertex. w : np.array \([N, k]\) The projected \(\\tilde{w}\). Returns ---------- pseudotime : np.array \([N, k]\) The estimated pseudtotime. ''' pseudotime = np.empty(w.shape[0]) pseudotime.fill(np.nan) pseudotime[w[:,init_node]==1] = 0 if len(milestone_net)>0: for i in range(len(milestone_net)): _from, _to = milestone_net[i,:2] _from, _to = int(_from), int(_to) idc = ((w[:,_from]>0)&(w[:,_to]>0)) | (w[:,_to]==1) pseudotime[idc] = w[idc,_to] + milestone_net[i,-1] - 1 return pseudotime
Methods
def build_graphs(self, w_tilde, pc_x, method: str = 'mean', thres: float = 0.5, no_loop: bool = False, cutoff=0)
-
Build the backbone.
Parameters
pc_x
:np.array
- [N, K] The estimated p(c_i|Y_i,X_i).
method
:string
, optional- 'mean', 'modified_mean', 'map', or 'modified_map'.
thres
:float
, optional- The threshold used for filtering edges e_{ij} that (n_{i}+n_{j}+e_{ij})/N<thres, only applied to mean method.
Retruns
G : nx.Graph The graph of edge scores.
Expand source code
def build_graphs(self, w_tilde, pc_x, method: str = 'mean', thres: float = 0.5, no_loop: bool = False, cutoff = 0): '''Build the backbone. Parameters ---------- pc_x : np.array \([N, K]\) The estimated \(p(c_i|Y_i,X_i)\). method : string, optional 'mean', 'modified_mean', 'map', or 'modified_map'. thres : float, optional The threshold used for filtering edges \(e_{ij}\) that \((n_{i}+n_{j}+e_{ij})/N<thres\), only applied to mean method. Retruns ---------- G : nx.Graph The graph of edge scores. ''' self.no_loop = no_loop # self.w_tilde = w_tilde graph = np.zeros((self.n_states,self.n_states)) if method=='mean': for i in range(self.n_states-1): for j in range(i+1,self.n_states): idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres if np.sum(idx)>0: graph[i,j] = np.mean(pc_x[idx,self.C[i,j]]/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]], axis=-1)) elif method=='modified_mean': for i in range(self.n_states-1): for j in range(i+1,self.n_states): idx = np.sum(pc_x[:,self.C[[i,i,j],[i,j,j]]], axis=1)>=thres if np.sum(idx)>0: graph[i,j] = np.sum(pc_x[idx,self.C[i,j]])/np.sum(pc_x[idx][:,self.C[[i,i,j],[i,j,j]]]) elif method=='map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): if np.sum(c==self.C[i,j])>0: graph[i,j] = np.sum(c==self.C[i,j])/np.sum((c==self.C[i,j])|(c==self.C[i,i])|(c==self.C[j,j])) elif method=='modified_map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): graph[i,j] = np.sum(c==self.C[i,j])/(np.sum((w_tilde[:,i]>0.5)|(w_tilde[:,j]>0.5))+1e-16) elif method=='raw_map': c = np.argmax(pc_x, axis=-1) for i in range(self.n_states-1): for j in range(i+1,self.n_states): if np.sum(c==self.C[i,j])>0: graph[i,j] = np.sum(c==self.C[i,j])/np.sum(np.isin(c, np.diagonal(self.C)) == False) elif method == "w_base": for i in range(self.n_states): for j in range(i+1,self.n_states): two_vertice_max_w = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j),:] num_two_vertice = two_vertice_max_w.shape[0] if num_two_vertice > 0: graph[i, j] = np.sum( np.abs(two_vertice_max_w[:, i] - two_vertice_max_w[:, j]) < 0.1) / num_two_vertice elif method == "modified_w_base": top2_idx = np.argpartition(w_tilde, -2, axis=1)[:, -2:] for i in range(self.n_states): for j in range(i + 1, self.n_states): two_vertice_max_w = np.all(top2_idx == [i, j], axis=1) | np.all(top2_idx == [j, i], axis=1) two_vertice_max_w = w_tilde[two_vertice_max_w, :] vertice_count = w_tilde[(np.argmax(w_tilde, axis=1) == i) | (np.argmax(w_tilde, axis=1) == j), :] vertice_count = vertice_count.shape[0] if vertice_count > 0: edge_count = \ np.max((two_vertice_max_w[:, i], two_vertice_max_w[:, j]), axis=0) \ / (two_vertice_max_w[:, i] + two_vertice_max_w[:, j]) edge_count = np.sum(edge_count < 0.55) graph[i, j] = edge_count / vertice_count else: raise ValueError("Invalid method, must be one of 'mean', 'modified_mean', 'map', 'modified_map','raw_map','w_base', and 'modified_w_base'.") graph[graph<=cutoff] = 0 G = nx.from_numpy_array(graph) if self.no_loop and not nx.is_tree(G): # prune if there are no loops G = nx.maximum_spanning_tree(G) return G
def modify_wtilde(self, w_tilde, edges)
-
Project \tilde{w} to the estimated backbone.
Parameters
w_tilde
:np.array
- [N, k] The estimated \tilde{w}.
edges
:np.array
- [|\mathcal{E}(\widehat{\mathcal{B}})|, 2].
Retruns
w : np.array The projected \tilde{w}.
Expand source code
def modify_wtilde(self, w_tilde, edges): '''Project \(\\tilde{w}\) to the estimated backbone. Parameters ---------- w_tilde : np.array \([N, k]\) The estimated \(\\tilde{w}\). edges : np.array \([|\\mathcal{E}(\\widehat{\\mathcal{B}})|, 2]\). Retruns ---------- w : np.array The projected \(\\tilde{w}\). ''' w = np.zeros_like(w_tilde) # projection on nodes best_proj_err_node = np.sum(w_tilde**2, axis=-1) - 2*np.max(w_tilde, axis=-1) +1 best_proj_err_node_ind = np.argmax(w_tilde, axis=-1) if len(edges)>0: # projection on edges idc = np.tile(np.arange(w.shape[0]), (2,1)).T ide = edges[np.argmax(np.sum(w_tilde[:,edges], axis=-1)**2 - 4 * np.prod(w_tilde[:,edges], axis=-1) + 2 * np.sum(w_tilde[:,edges], axis=-1), axis=-1)] w[idc, ide] = w_tilde[idc, ide] + (1-np.sum(w_tilde[idc, ide], axis=-1, keepdims=True))/2 best_proj_err_edge = np.sum(w_tilde**2, axis=-1) - np.sum(w_tilde[idc, ide]**2, axis=-1) + (1-np.sum(w_tilde[idc, ide], axis=-1))**2/2 idc = (best_proj_err_node<best_proj_err_edge) w[idc,:] = np.eye(w_tilde.shape[-1])[best_proj_err_node_ind[idc]] else: idc = np.arange(w.shape[0]) w[idc, best_proj_err_node_ind] = 1 return w
def build_milestone_net(self, subgraph, init_node: int)
-
Build the milestone network.
Parameters
subgraph
:nx.Graph
- The connected component of the backbone given the root vertex.
init_node
:int
- The root vertex.
Returns
df_subgraph
:pd.DataFrame
- The milestone network.
Expand source code
def build_milestone_net(self, subgraph, init_node: int): '''Build the milestone network. Parameters ---------- subgraph : nx.Graph The connected component of the backbone given the root vertex. init_node : int The root vertex. Returns ---------- df_subgraph : pd.DataFrame The milestone network. ''' if len(subgraph)==1: warnings.warn('Singular node.') return [] elif nx.is_directed_acyclic_graph(subgraph): milestone_net = [] for edge in list(subgraph.edges): if edge[0]==init_node: dist = 1 elif edge[1]==init_node: paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0]) dist = - (np.max([len(p) for p in paths_1]) - 1) else: paths_0 = nx.all_simple_paths(subgraph, source=init_node, target=edge[0]) paths_1 = nx.all_simple_paths(subgraph, source=init_node, target=edge[1]) dist = np.max([len(p) for p in paths_1]) - np.max([len(p) for p in paths_0]) milestone_net.append([edge[0], edge[1], dist]) else: # Dijkstra's Algorithm to find the shortest path unvisited = {node: {'parent':None, 'score':np.inf, 'distance':np.inf} for node in subgraph.nodes} current = init_node currentScore = 0 currentDistance = 0 unvisited[current]['score'] = currentScore milestone_net = [] while True: for neighbour in subgraph.neighbors(current): if neighbour not in unvisited: continue newScore = currentScore + subgraph[current][neighbour]['weight'] if unvisited[neighbour]['score'] > newScore: unvisited[neighbour]['score'] = newScore unvisited[neighbour]['parent'] = current unvisited[neighbour]['distance'] = currentDistance+1 if len(unvisited)<len(subgraph): milestone_net.append([unvisited[current]['parent'], current, unvisited[current]['distance']]) del unvisited[current] if not unvisited: break current, currentScore, currentDistance = \ sorted([(i[0],i[1]['score'],i[1]['distance']) for i in unvisited.items()], key = lambda x: x[1])[0] return np.array(milestone_net)
def comp_pseudotime(self, milestone_net, init_node: int, w)
-
Compute pseudotime.
Parameters
milestone_net
:pd.DataFrame
- The milestone network.
init_node
:int
- The root vertex.
w
:np.array
- [N, k] The projected \tilde{w}.
Returns
pseudotime
:np.array
- [N, k] The estimated pseudtotime.
Expand source code
def comp_pseudotime(self, milestone_net, init_node: int, w): '''Compute pseudotime. Parameters ---------- milestone_net : pd.DataFrame The milestone network. init_node : int The root vertex. w : np.array \([N, k]\) The projected \(\\tilde{w}\). Returns ---------- pseudotime : np.array \([N, k]\) The estimated pseudtotime. ''' pseudotime = np.empty(w.shape[0]) pseudotime.fill(np.nan) pseudotime[w[:,init_node]==1] = 0 if len(milestone_net)>0: for i in range(len(milestone_net)): _from, _to = milestone_net[i,:2] _from, _to = int(_from), int(_to) idc = ((w[:,_from]>0)&(w[:,_to]>0)) | (w[:,_to]==1) pseudotime[idc] = w[idc,_to] + milestone_net[i,-1] - 1 return pseudotime