Source code for cobra.flux_analysis.phenotype_phase_plane

from numpy import (linspace, zeros, meshgrid, abs, empty, arange, int32,
                   unravel_index, dtype)
from multiprocessing import Pool

from ..solvers import solver_dict, get_solver_name

# attempt to import plotting libraries
try:
    from matplotlib import pyplot
    from mpl_toolkits.mplot3d import axes3d
except ImportError:
    pyplot = None
    axes3d = None
mlab = None  # mayavi may crash python
try:  # for prettier colors
    from palettable.colorbrewer import get_map
except ImportError:
    try:
        from brewer2mpl import get_map
    except ImportError:
        get_map = None


[docs]class phenotypePhasePlaneData: """class to hold results of a phenotype phase plane analysis""" def __init__(self, reaction1_name, reaction2_name, reaction1_range_max, reaction2_range_max, reaction1_npoints, reaction2_npoints): self.reaction1_name = reaction1_name self.reaction2_name = reaction2_name self.reaction1_range_max = reaction1_range_max self.reaction2_range_max = reaction2_range_max self.reaction1_npoints = reaction1_npoints self.reaction2_npoints = reaction2_npoints self.reaction1_fluxes = linspace(0, reaction1_range_max, reaction1_npoints) self.reaction2_fluxes = linspace(0, reaction2_range_max, reaction2_npoints) self.growth_rates = zeros((reaction1_npoints, reaction2_npoints)) self.shadow_prices1 = zeros((reaction1_npoints, reaction2_npoints)) self.shadow_prices2 = zeros((reaction1_npoints, reaction2_npoints)) self.segments = zeros(self.growth_rates.shape, dtype=int32) self.phases = []
[docs] def plot(self): """plot the phenotype phase plane in 3D using any available backend""" if pyplot is not None: self.plot_matplotlib() elif mlab is not None: self.plot_mayavi() else: raise ImportError("No suitable 3D plotting package found")
[docs] def plot_matplotlib(self, theme="Paired", scale_grid=False): """Use matplotlib to plot a phenotype phase plane in 3D. theme: color theme to use (requires palettable) returns: maptlotlib 3d subplot object""" if pyplot is None: raise ImportError("Error importing matplotlib 3D plotting") colors = empty(self.growth_rates.shape, dtype=dtype((str, 7))) n_segments = self.segments.max() # pick colors if get_map is None: color_list = ['#A6CEE3', '#1F78B4', '#B2DF8A', '#33A02C', '#FB9A99', '#E31A1C', '#FDBF6F', '#FF7F00', '#CAB2D6', '#6A3D9A', '#FFFF99', '#B15928'] else: color_list = get_map(theme, 'Qualitative', n_segments).hex_colors if n_segments > len(color_list): from warnings import warn warn("not enough colors to color all detected phases") if n_segments > 0 and n_segments <= len(color_list): for i in range(n_segments): colors[self.segments == (i + 1)] = color_list[i] else: colors[:, :] = 'b' if scale_grid: # grid wires should not have more than ~20 points xgrid_scale = int(self.reaction1_npoints / 20) ygrid_scale = int(self.reaction2_npoints / 20) else: xgrid_scale, ygrid_scale = (1, 1) figure = pyplot.figure() xgrid, ygrid = meshgrid(self.reaction1_fluxes, self.reaction2_fluxes) axes = figure.add_subplot(111, projection="3d") xgrid = xgrid.transpose() ygrid = ygrid.transpose() axes.plot_surface(xgrid, ygrid, self.growth_rates, rstride=1, cstride=1, facecolors=colors, linewidth=0, antialiased=False) axes.plot_wireframe(xgrid, ygrid, self.growth_rates, color="black", rstride=xgrid_scale, cstride=ygrid_scale) axes.set_xlabel(self.reaction1_name, size="x-large") axes.set_ylabel(self.reaction2_name, size="x-large") axes.set_zlabel("Growth rate", size="x-large") axes.view_init(elev=30, azim=-135) figure.tight_layout() return axes
[docs] def plot_mayavi(self): """Use mayavi to plot a phenotype phase plane in 3D. The resulting figure will be quick to interact with in real time, but might be difficult to save as a vector figure. returns: mlab figure object""" from mayavi import mlab figure = mlab.figure(bgcolor=(1, 1, 1), fgcolor=(0, 0, 0)) figure.name = "Phenotype Phase Plane" max = 10.0 xmax = self.reaction1_fluxes.max() ymax = self.reaction2_fluxes.max() zmax = self.growth_rates.max() xgrid, ygrid = meshgrid(self.reaction1_fluxes, self.reaction2_fluxes) xgrid = xgrid.transpose() ygrid = ygrid.transpose() xscale = max / xmax yscale = max / ymax zscale = max / zmax mlab.surf(xgrid * xscale, ygrid * yscale, self.growth_rates * zscale, representation="wireframe", color=(0, 0, 0), figure=figure) mlab.mesh(xgrid * xscale, ygrid * yscale, self.growth_rates * zscale, scalars=self.shadow_prices1 + self.shadow_prices2, resolution=1, representation="surface", opacity=0.75, figure=figure) # draw axes mlab.outline(extent=(0, max, 0, max, 0, max)) mlab.axes(opacity=0, ranges=[0, xmax, 0, ymax, 0, zmax]) mlab.xlabel(self.reaction1_name) mlab.ylabel(self.reaction2_name) mlab.zlabel("Growth rates") return figure
[docs] def segment(self, threshold=0.01): """attempt to segment the data and identify the various phases""" self.segments *= 0 # each entry in phases will consist of the following tuple # ((x, y), shadow_price1, shadow_price2) self.phases = [] # initialize the area to be all False covered_area = (self.growth_rates * 0 == 1) # as long as part of the area has not been covered segment_id = 0 while self.segments.min() == 0: segment_id += 1 # i and j are indices for a current point which has not been # assigned a segment yet i, j = unravel_index(self.segments.argmin(), self.segments.shape) # update the segment id for any point with a similar shadow price # to the current point d1 = abs(self.shadow_prices1 - self.shadow_prices1[i, j]) d2 = abs(self.shadow_prices2 - self.shadow_prices2[i, j]) self.segments[(d1 < threshold) * (d2 < threshold)] += segment_id # add the current point as one of the phases self.phases.append(( (self.reaction1_fluxes[i], self.reaction2_fluxes[j]), self.shadow_prices1[i, j], self.shadow_prices2[i, j]))
def _calculate_subset(arguments): """Calculate a subset of the phenotype phase plane data. Store each result tuple as: (i, j, growth_rate, shadow_price1, shadow_price2)""" model = arguments["model"] reaction1_fluxes = arguments["reaction1_fluxes"] reaction2_fluxes = arguments["reaction2_fluxes"] metabolite1_name = arguments["metabolite1_name"] metabolite2_name = arguments["metabolite2_name"] index1 = arguments["index1"] index2 = arguments["index2"] i_list = arguments["i_list"] j_list = arguments["j_list"] tolerance = arguments["tolerance"] solver = solver_dict[arguments["solver"]] results = [] reaction1 = model.reactions[index1] reaction2 = model.reactions[index2] problem = solver.create_problem(model) solver.solve_problem(problem) for a, flux1 in enumerate(reaction1_fluxes): i = i_list[a] # flux is actually negative for uptake. Also some solvers require # float instead of numpy.float64 flux1 = float(-1 * flux1) # change bounds on reaction 1 solver.change_variable_bounds(problem, index1, flux1 - tolerance, flux1 + tolerance) for b, flux2 in enumerate(reaction2_fluxes): j = j_list[b] flux2 = float(-1 * flux2) # same story as flux1 # change bounds on reaction 2 solver.change_variable_bounds(problem, index2, flux2 - tolerance, flux2 + tolerance) # solve the problem and save results solver.solve_problem(problem) solution = solver.format_solution(problem, model) if solution is not None and solution.status == "optimal": results.append((i, j, solution.f, solution.y_dict[metabolite1_name], solution.y_dict[metabolite2_name])) else: results.append((i, j, 0, 0, 0)) # reset reaction 2 bounds solver.change_variable_bounds(problem, index2, float(reaction2.lower_bound), float(reaction2.upper_bound)) # reset reaction 1 bounds solver.change_variable_bounds(problem, index1, float(reaction1.lower_bound), float(reaction1.upper_bound)) return results
[docs]def calculate_phenotype_phase_plane( model, reaction1_name, reaction2_name, reaction1_range_max=20, reaction2_range_max=20, reaction1_npoints=50, reaction2_npoints=50, solver=None, n_processes=1, tolerance=1e-6): """calculates the growth rates while varying the uptake rates for two reactions. returns: an object containing the growth rates for the uptake rates. To plot the result, call the plot function of the returned object. Example: data = calculate_phenotype_phase_plane(my_model, "EX_foo", "EX_bar") data.plot() """ if solver is None: solver = get_solver_name() data = phenotypePhasePlaneData( str(reaction1_name), str(reaction2_name), reaction1_range_max, reaction2_range_max, reaction1_npoints, reaction2_npoints) # find the objects for the reactions and metabolites index1 = model.reactions.index(data.reaction1_name) index2 = model.reactions.index(data.reaction2_name) metabolite1_name = list(model.reactions[index1]._metabolites)[0].id metabolite2_name = list(model.reactions[index2]._metabolites)[0].id if n_processes > reaction1_npoints: # limit the number of processes n_processes = reaction1_npoints range_add = reaction1_npoints // n_processes # prepare the list of arguments for each _calculate_subset call arguments_list = [] i = arange(reaction1_npoints) j = arange(reaction2_npoints) for n in range(n_processes): start = n * range_add if n != n_processes - 1: r1_range = data.reaction1_fluxes[start:start + range_add] i_list = i[start:start + range_add] else: r1_range = data.reaction1_fluxes[start:] i_list = i[start:] arguments_list.append({ "model": model, "index1": index1, "index2": index2, "metabolite1_name": metabolite1_name, "metabolite2_name": metabolite2_name, "reaction1_fluxes": r1_range, "reaction2_fluxes": data.reaction2_fluxes.copy(), "i_list": i_list, "j_list": j.copy(), "tolerance": tolerance, "solver": solver}) if n_processes > 1: p = Pool(n_processes) results = list(p.map(_calculate_subset, arguments_list)) else: results = [_calculate_subset(arguments_list[0])] for result_list in results: for result in result_list: i = result[0] j = result[1] data.growth_rates[i, j] = result[2] data.shadow_prices1[i, j] = result[3] data.shadow_prices2[i, j] = result[4] data.segment() return data