Source code for marsi.utils

# Copyright 2016 Chr. Hansen A/S and The Novo Nordisk Foundation Center for Biosustainability, DTU.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import gzip
import logging
import os
import pickle
import re
import shutil
import time

import numpy as np
from IProgress import ProgressBar, Percentage
from cameo import fba
from cameo.flux_analysis.analysis import n_carbon
from cobra.core.reaction import Reaction

from marsi import config

__all__ = ['data_dir', 'log_dir', 'pickle_large', 'unpickle_large', 'frange', 'src_dir', 'internal_data_dir']

data_dir = os.path.join(config.prj_dir, "data")
models_dir = os.path.join(config.prj_dir, "models")
log_dir = os.path.join(config.prj_dir, "log")
src_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)))

internal_data_dir = os.path.join(src_dir, 'io', 'files')

INCHI_KEY_TYPE = np.dtype("a27")

BIOMASS_RE = re.compile("biomass", re.IGNORECASE)

MAX_BYTES = 2 ** 31 - 1

logger = logging.getLogger(__name__)


[docs]def pickle_large(obj, file_path, progress=False): with open(file_path, 'wb') as model_handler: bytes_out = pickle.dumps(obj) output_size = len(bytes_out) if progress: pbar = ProgressBar(maxval=output_size, widgets=["Writing ", Percentage()]) for idx in pbar(range(0, output_size, MAX_BYTES)): model_handler.write(bytes_out[idx:idx + MAX_BYTES]) else: for idx in range(0, output_size, MAX_BYTES): model_handler.write(bytes_out[idx:idx + MAX_BYTES])
[docs]def unpickle_large(file_path, progress=False): input_size = os.path.getsize(file_path) logger.debug("Input size: %f bytes" % input_size) with open(file_path, 'rb') as file_handler: bytes_in = bytearray(0) if progress: pbar = ProgressBar(maxval=input_size, widgets=["Loading ", Percentage()]) for _ in pbar(range(0, input_size, MAX_BYTES)): bytes_in += file_handler.read(MAX_BYTES) else: for _ in range(0, input_size, MAX_BYTES): bytes_in += file_handler.read(MAX_BYTES) return pickle.loads(bytes_in)
[docs]def frange(start, stop=None, steps=10): """ Float range generator. Generates *steps* equally separated between *start* and *stop*. If *stop* is None, the values are between 0 and *start* Parameters ---------- start : float The initial value. stop : float The final value. steps : int Number of values to generate. Returns ------- generator A generator that yields float. """ if stop is None: stop = start start = 0 # Python 2 division of int returns int start = float(start) stop = float(stop) step_size = (stop - start) / float(steps) logger.debug("Step size %f" % step_size) for i in range(steps): logger.debug("Iteration %i: %f" % (i + 1, i * step_size)) yield start + i * step_size
def unique(l): """ Removes repeated values from a list. Parameters ---------- l: list Returns ------- list The same list with only unique values. """ s = set() n = 0 for x in l: if x not in s: s.add(x) l[n] = x n += 1 del l[n:] def timing(debug=False): # pragma: no cover def function_wrapper(func): if debug: def debug_wrap_func(*args, **kwargs): start = time.time() ret = func(*args, **kwargs) stop = time.time() if config.log.level >= config.Level.DEBUG: print('%s function took %0.3f ms' % (func.__name__, (stop - start) * 1000.0)) return ret return debug_wrap_func else: def wrap_func(*args, **kwargs): start = time.time() ret = func(*args, **kwargs) stop = time.time() print('%s function took %0.3f ms' % (func.__name__, (stop - start) * 1000.0)) return ret return wrap_func return function_wrapper def default_carbon_sources(model): solution = fba(model) carbon_sources = [] for ex in model.exchanges: assert isinstance(ex, Reaction) if ex.lower_bound < 0 and solution[ex.id] < 0 < n_carbon(ex): logger.debug("Found carbon source: %s") carbon_sources.append(ex) return carbon_sources def gunzip(file): assert file[-3:] == ".gz" in_name = file out_name = file[0:-3] with gzip.open(in_name, 'rb') as f_in, open(out_name, 'wb') as f_out: shutil.copyfileobj(f_in, f_out) def search_metabolites(model, species_id, ignore_external=True): if ignore_external: return model.metabolites.query(lambda mid: mid[:-2] == species_id and mid[-2:] != "_e", attribute='id') else: return model.metabolites.query(lambda mid: mid[:-2] == species_id, attribute='id')