import numpy as np
import networkx as nx
from copy import deepcopy
from ...visualization import *
from .infomap import InfoMap
class NeighborSampler:
    """
    Class for sampling node attributes averaged over neighbors.

    Attributes:

        graph (spatial.Graph) - graph instance

        G (nx.Graph) - graph with node attribute

        attr (str) - attribute to be averaged over neighbors

        depth (int) - maximum number of edges connecting neighbors

        log (bool) - if True, log-transform values before averaging

    """

    def __init__(self, graph, attr, depth=1, log=True):
        """
        Instantiate sampler for averaging <attr> value over all neighbors within <depth> of each node.

        Args:

            graph (spatial.Graph) - graph instance

            attr (str) - attribute to be averaged over neighbors

            depth (int) - maximum number of edges connecting neighbors

            log (bool) - if True, log-transform values before averaging

        """
        self.graph = graph
        self.attr = attr
        self.depth = depth
        self.log = log

        # add node attribute to graph unless every node already carries it
        # (node labels are not assumed to include 0, so no node is looked up
        # by a hard-coded label)
        node_data = self.G.nodes(data=True)
        if not all(self.attr_used in data for _, data in node_data):
            self.add_attribute_to_graph()

        # sample over neighbors
        self.average_over_neighbors()

    @classmethod
    def multisample(cls, attr, *graphs, **kwargs):
        """
        Generate composite sample from one or more <graphs>.

        Args:

            attr (str) - attribute to be averaged over neighbors

            graphs (spatial.Graph) - one or more graph instances

            kwargs: keyword arguments for sampler

        Returns:

            sample (np.ndarray[float]) - 2D array of sampled values, first column contains cell measurements while the second column contains measurements averaged over the neighbors of each cell

            keys (list of str) - attribute keys for sampled data

        Raises:

            ValueError - if no graphs are provided

        """
        if not graphs:
            raise ValueError('At least one graph is required.')

        samplers = [cls(graph, attr, **kwargs) for graph in graphs]
        samples = [sampler.sample for sampler in samplers]

        # keys are taken from the last sampler, all of which share <attr>
        return np.vstack(samples), samplers[-1].keys

    @property
    def num_nodes(self):
        """ Number of nodes. """
        return self.graph.num_nodes

    @property
    def data(self):
        """ Graph data. """
        return self.graph.data

    @property
    def G(self):
        """ NetworkX graph instance. """
        return self.graph.G

    @property
    def node_values(self):
        """ Vector of attribute values for each node (log-transformed if <log> is True). """
        values = self.data[self.attr].values
        if self.log:
            values = np.log(values)
        return values

    @property
    def node_values_dict(self):
        """ Dictionary of attribute values (log-transformed if <log> is True), keyed by the index of <data>. """
        values = self.data[self.attr]
        if self.log:
            values = np.log(values)
        return dict(values)

    @property
    def neighbors(self):
        """ Dictionary of neighbor indices keyed by node indices. """
        return {node: self._bfs_neighbors(self.G, node, self.depth)
                for node in self.G.nodes}

    @property
    def size_attr(self):
        """ Neighborhood size attribute name. """
        return 'd{:d}_neighbors'.format(self.depth)

    @property
    def attr_used(self):
        """ Name of attribute used to access graph data. """
        if self.log:
            return 'log_' + self.attr
        else:
            return self.attr

    @property
    def averaged_attr(self):
        """ Name of averaged attribute. """
        return '{:s}_d{:d}'.format(self.attr, self.depth)

    @property
    def keys(self):
        """ List of attribute names. """
        return [self.attr, self.averaged_attr]

    @property
    def sample(self):
        """ Returns bivariate sample combining each node's attribute value with the average attribute value in its neighborhood. """
        return self.data[self.keys].values

    def add_attribute_to_graph(self):
        """ Add attribute to networkx graph object. """
        nx.set_node_attributes(self.G, self.node_values_dict, name=self.attr_used)

    @staticmethod
    def _bfs_neighbors(G, node, depth=1):
        """
        Find all nodes reachable from <node> by a breadth first search limited to <depth> edges.

        Args:

            G (nx.Graph) - graph instance

            node - index of the source node

            depth (int) - maximum number of edges connecting neighbors

        Returns:

            neighbors (list) - indices of nodes within <depth> edges of <node>, excluding <node> itself

        """
        return [target for _, target in nx.bfs_edges(G, node, depth_limit=depth)]

    @staticmethod
    def _neighbor_average(G, node_values, depth=1):
        """
        Average attribute value over all neighbors of each node in a graph.

        Args:

            G (nx.Graph) - graph with nodes containing an attribute

            node_values (dict) - node attribute values keyed by node index

            depth (int) - maximum number of edges connecting neighbors

        Returns:

            node_to_neighborhood (dict) - dictionary of neighborhood (<attribute>, size) tuples keyed by node index. Nodes without neighbors are assigned (nan, 0).

        """
        node_to_neighborhood = {}
        for src in G.nodes:
            nbs = NeighborSampler._bfs_neighbors(G, src, depth)

            # an isolated node has nothing to average; report NaN explicitly
            # rather than letting np.mean warn about an empty slice
            if nbs:
                mean = np.mean([node_values[n] for n in nbs])
            else:
                mean = np.nan

            node_to_neighborhood[src] = (mean, len(nbs))

        return node_to_neighborhood

    def average_over_neighbors(self):
        """
        Average attribute value over all neighbors adjacent to each node.

        The averaged value and the neighborhood size are stored in <data> under the <averaged_attr> and <size_attr> columns, respectively.
        """

        # average attribute over neighbors
        neighbor_dict = self._neighbor_average(self.G, self.node_values_dict, self.depth)

        # extract average and sample size for each node
        node_indices = np.array(list(neighbor_dict.keys()))
        means = np.array([mean for mean, _ in neighbor_dict.values()], dtype=float)
        sizes = np.array([size for _, size in neighbor_dict.values()], dtype=float)

        # invert the log-transform (yields the geometric mean of neighbors)
        if self.log:
            means = np.exp(means)

        # store outcome
        self.data.loc[node_indices, self.averaged_attr] = means
        self.data.loc[node_indices, self.size_attr] = sizes

    @default_figure
    def histogram_sample_sizes(self, ax=None):
        """
        Histogram sample sizes.

        Args:

            ax (matplotlib axes) - axis on which the histogram is drawn

        """
        sizes = self.data[self.size_attr].values

        # one unit-width bin per size; the extra edge keeps the largest size
        # from sharing the closed final bin with the second largest
        bins = np.arange(sizes.max() + 2)
        _ = ax.hist(sizes, bins=bins)

    @square_figure
    def plot_neigborhood(self, node,
                         color='r',
                         bg_color='k',
                         ax=None,
                         **kwargs):
        """
        Visualize neighborhood surrounding <node>.

        Args:

            node (int) - node positional index

            color (str or RGB tuple) - color for nodes within neighborhood

            bg_color (str or RGB tuple) - color for nodes outside neighborhood

            ax (matplotlib axes) - axis on which the nodes are drawn

            kwargs: keyword arguments for plt.scatter

        """
        node_id = self.graph.node_map(node)[0]
        neighbors_id = self.neighbors[node_id]
        neighbors = self.graph.position_map(neighbors_id)

        # build colors as a plain list; a numpy string array has a fixed
        # width and would silently truncate longer color names
        colors = [bg_color] * self.graph.nodes.size
        highlighted = np.append(np.atleast_1d(node), neighbors).astype(int)
        for position in highlighted:
            colors[position] = color

        ax.scatter(*self.data[self.graph.xykey].values.T, c=colors, **kwargs)

    @default_figure
    def plot_autocorrelation(self, ax=None, xmax=10, **kwargs):
        """
        Plot autocorrelation versus path length.

        Args:

            ax (matplotlib axes) - axis on which the correlation is drawn

            xmax (int) - maximum path length considered, if None all path lengths are included

            kwargs: keyword arguments for ax.plot

        """

        # evaluate distance between all nodes
        paths = dict(nx.all_pairs_shortest_path_length(self.G, cutoff=xmax))
        if xmax is None:
            max_depth = max(max(v.values()) for v in paths.values())
        else:
            max_depth = xmax

        # get node levels and evaluate global mean/variance
        levels = self.node_values
        mu, sigma2 = levels.mean(), levels.var()

        def eval_flux(edge_list):
            """ Evaluate mean fluctuation in edge_list. """

            # path lengths beyond the graph diameter have no node pairs
            if not edge_list:
                return np.nan, np.nan, 0

            idx = self.graph.position_map(np.array(edge_list))
            flux = ((levels[idx[:, 0]] - mu) * (levels[idx[:, 1]] - mu)) / sigma2
            return flux.mean(), flux.std(), flux.size

        # compile pairs of nodes separated by each path length
        edges = {i: [] for i in range(max_depth + 1)}
        for node, neighbors in paths.items():
            for neighbor, distance in neighbors.items():
                edges[distance].append((node, neighbor))

        # compile autocorrelation function
        means = np.array([eval_flux(e)[0] for e in edges.values()])

        # plot autocorrelation
        ax.plot(range(max_depth + 1), means, '.-k', **kwargs)
        ax.set_ylim(-0.1, 1)
        ax.set_xlim(0, max_depth + 1)
        ax.set_ylabel('Correlation')
        ax.set_xlabel('Path length')
class RadialSampler(NeighborSampler):
    """
    Class for sampling node attributes averaged within a predetermined radius of each node.

    Attributes:

        graph (spatial.Graph) - graph instance

        G (nx.Graph) - graph with node attribute

        attr (str) - attribute to be averaged over neighbors

        depth (float) - sampling radius, expressed in multiples of <length_scale>

        log (bool) - if True, log-transform values before averaging

        length_scale (float) - characteristic length scale of the graph

        radius (float) - radius of sampling region surrounding each measurement

        neighbor_mask (np.ndarray[bool]) - boolean mask of nodes within <radius> of each node, evaluated once upon instantiation

    """

    def __init__(self, graph, attr, depth=1., log=True):
        """
        Instantiate sampler for averaging <attr> value over all nodes within a predetermined radius of each node. The radius is defined by <depth> multiples of the characteristic length over which correlations in the attribute value decay.

        Args:

            graph (spatial.Graph) - graph instance

            attr (str) - attribute to be averaged over neighbors

            depth (float) - sampling radius, expressed in multiples of the characteristic length scale

            log (bool) - if True, log-transform values before averaging

        """

        # store attributes
        self.graph = graph
        self.attr = attr
        self.depth = depth
        self.log = log

        # determine characteristic length scale and set sampling radius
        length_scale = graph.get_correlations(attr, log).characteristic_length

        # if failed (e.g. no correlation), use 1.5x median edge length
        if length_scale is None:
            length_scale = 1.5 * graph.median_edge_length
        self.length_scale = length_scale
        self.radius = depth * length_scale

        # evaluate the mask once so that later lookups reuse it
        self.neighbor_mask = self._neighbor_mask

        # sample over neighbors
        self.average_over_neighbors()

    @property
    def distance_matrix(self):
        """ Euclidean distance matrix between nodes (ordered by position in <self.data>). """
        return self.graph.distance_matrix

    @property
    def _neighbor_mask(self):
        """ Boolean adjacency mask (True for neighbors). Nodes are not their own neighbors. """
        neighbor_mask = (self.distance_matrix < self.radius)
        np.fill_diagonal(neighbor_mask, False)
        return neighbor_mask

    @property
    def neighbors(self):
        """
        Dictionary of neighbor positional indices keyed by the positional index of each node.
        """
        return {n: r.nonzero()[0] for n, r in enumerate(self.neighbor_mask)}

    @property
    def size_attr(self):
        """ Neighborhood size attribute name (the column stores the number of neighbors within the sampling radius). """
        return 'sampling_radius'

    @property
    def averaged_attr(self):
        """ Name of averaged attribute. """
        return '{:s}_radial'.format(self.attr)

    def average_over_neighbors(self):
        """
        Average attribute value over all nodes within the specified radius of each node.

        The averaged value and the number of neighbors are stored in <data> under the <averaged_attr> and <size_attr> columns, respectively. Nodes without any neighbors are assigned an average of NaN.
        """
        neighbor_mask = self.neighbor_mask

        # average within each neighborhood (row i holds candidates for node i)
        node_values = self.node_values.reshape(1, -1)
        matrix = np.repeat(node_values, self.num_nodes, axis=0)
        masked_values = np.ma.masked_array(matrix, mask=~neighbor_mask)

        # rows without neighbors are fully masked; fill them with NaN rather
        # than exposing the arbitrary value that backs a masked entry
        means = masked_values.mean(axis=1).filled(np.nan)

        # invert the log-transform (yields the geometric mean of neighbors)
        if self.log:
            means = np.exp(means)

        # store outcome
        self.data[self.averaged_attr] = means
        self.data[self.size_attr] = neighbor_mask.sum(axis=1)

    @square_figure
    def plot_neigborhood(self, node,
                         color='r',
                         bg_color='k',
                         radius=True,
                         ax=None,
                         **kwargs):
        """
        Visualize neighborhood surrounding <node>.

        Args:

            node (int) - node positional index

            color (str or RGB tuple) - color for nodes within neighborhood

            bg_color (str or RGB tuple) - color for nodes outside neighborhood

            radius (bool) - if True, plot sampling radius

            ax (matplotlib axes) - axis on which the nodes are drawn

            kwargs: keyword arguments for plt.scatter

        """
        xy = self.data[self.graph.xykey].values

        # draw selection boundary
        # NOTE(review): plt is not imported by name in this module; it is
        # presumably provided by the visualization star-import - confirm
        if radius:
            center = xy[node]
            circle = plt.Circle(center, self.radius, color=color, alpha=0.2)
            ax.add_artist(circle)

        # only the row for <node> is needed, so read it straight from the mask
        neighbors = self.neighbor_mask[node].nonzero()[0]

        # build colors as a plain list; a numpy string array has a fixed
        # width and would silently truncate longer color names
        colors = [bg_color] * self.graph.nodes.size
        highlighted = np.append(np.atleast_1d(node), neighbors).astype(int)
        for position in highlighted:
            colors[position] = color

        # scatter points
        ax.scatter(*xy.T, c=colors, **kwargs)

    @default_figure
    def plot_autocorrelation(self, ax=None, **kwargs):
        """
        Plot the spatial correlations of <attr> evaluated by the graph.

        Args:

            ax (matplotlib axes) - axis on which the correlation is drawn

            kwargs: keyword arguments for the visualize method of the correlations object

        """
        correlations = self.graph.get_correlations(self.attr, self.log)
        correlations.visualize(ax=ax, **kwargs)