# -*- python -*-
# -*- coding utf-8 -*-
#
# This file is part of GDSCTools software
#
# Copyright (c) 2015 - Wellcome Trust Sanger Institute
# All rights reserved
#
# File author(s): Thomas Cokelaer <cokelaer@gmail.comWE HERE>
#
# Distributed under the BSD 3-Clause License.
# See accompanying file LICENSE.txt distributed with this software
#
# website: http://github.com/CancerRxGene/gdsctools
#
##############################################################################
"""Code related to the ANOVA analysis to find associations between drug IC50s
and genomic features"""
import collections
import pandas as pd
from easydev import Progress
from gdsctools.stats import MultipleTesting
from gdsctools import readers
from gdsctools.settings import ANOVASettings
from gdsctools.anova_results import ANOVAResults
from gdsctools.errors import GDSCToolsDuplicatedDrugError
import colorlog as logger
__all__ = ['BaseModels']
[docs]class BaseModels(object):
"""A Base class for ANOVA / ElaticNet models
"""
def __init__(self, ic50, genomic_features=None,
drug_decode=None, verbose=True,
set_media_factor=False):
""".. rubric:: Constructor
:param DataFrame IC50: a dataframe with the IC50. Rows should be
the COSMIC identifiers and columns should be the Drug names
(or identifiers)
:param features: another dataframe with rows as in the IC50 matrix
and columns as features. The first 3 columns must be named
specifically to hold tissues, MSI (see format).
:param drug_decode: a 3 column CSV file with drug's name and targets
see :mod:`readers` for more information.
:param verbose: verbosity in "WARNING", "ERROR", "DEBUG", "INFO"
The attribute :attr:`settings` contains specific settings related
to the analysis or visulation.
"""
self.verbose = verbose
self._init_called = False
# We first need to read the IC50 using a dedicated reader
try:
# Simple one without duplicated
self.ic50 = readers.IC50(ic50)
except GDSCToolsDuplicatedDrugError:
print("duplicated error")
try:
from gdsctools.gdsc import IC50Cluster
self.ic50 = IC50Cluster(ic50)
except Exception as err:
raise(err)
# Reads features if provided, otherwise use a default data set
if genomic_features is None:
# Reads default version provided with the package
self.features = readers.GenomicFeatures()
else:
self.features = readers.GenomicFeatures(genomic_features)
if self.features.found_media is False and \
set_media_factor is True:
if self.verbose:
print('Populating MEDIA Factor in the Genomic Feature matrix')
self.features.fill_media_factor()
#: a CSV with 3 columns used in the report
self.read_drug_decode(drug_decode)
# create the multiple testing factory used in anova_all()
self.multiple_testing = MultipleTesting()
# We prune the genomic features by settings the cosmic ids of
# the features to be those of the cosmic ids of the IC50. See
# readers module. This affectation, prune the features dataframe
# automatically. This fails if a cosmic identifier is not
# found in the features' cosmic ids, so let us catch the error
# before hand to give a
unknowns = set(self.ic50.cosmicIds).difference(
set(self.features.cosmicIds))
if len(unknowns) > 0 and self.verbose:
print("WARNING: " +
"%s cosmic identifiers in your IC50 " % len(unknowns) +
"could not be found in the genomic feature matrix. " +
"They will be dropped. Consider using a user-defined " +
"genomic features matrix")
self.ic50.drop_cosmic(list(unknowns))
self.features.cosmicIds = self.ic50.cosmicIds
#self.cosmicIds = self.ic50.cosmicIds
#: an instance of :class:`~gdsctools.settings.ANOVASettings`
self.settings = ANOVASettings()
# alias to all column names to store results
# cast to list (Python3).
self.column_names = list(ANOVAResults().mapping.keys())
# skip assoc_id for now
self._odof_dict = dict([(name, None)
for name in self.column_names])
# a cache to store ANOVA results for each drug
self.individual_anova = {}
# must be called if ic50 or features are changed.
self.init()
def _autoset_msi_factor(self):
if self.features.found_msi:
# if the number of pos. (or neg.) factors is not large enough then
# the MSI factor is not used
msi_name = self.features.colnames.msi
self.msi_factor = self.features.df[msi_name]
total = len(self.msi_factor)
positives = self.msi_factor.sum()
negatives = total - positives
# we must have at least 2 positives or 2 negative
# This is therefore a < comparison here below. See in
# _get_one_drug_one_feature_data that we use >= which
# is consistent.
if positives < self.settings.MSI_factor_threshold:
self.settings.include_MSI_factor = False
if negatives < self.settings.MSI_factor_threshold:
self.settings.include_MSI_factor = False
else:
self.settings.include_MSI_factor = False
self.settings.analysis_type = 'feature_only'
def _autoset_tissue_factor(self):
# select tissue based on the features
tissue_name = self.features.colnames.tissue
self.tissue_factor = self.features.df[tissue_name]
if len(self.tissue_factor.unique()) == 1:
# there is only one tissue
tissue = self.tissue_factor.unique()[0]
self.settings.analysis_type = tissue
self.settings.directory = tissue
else:
# this is a PANCAN analysis
self.settings.analysis_type = 'PANCAN'
def _autoset_media_factor(self):
if self.settings.analysis_type != 'PANCAN':
self.settings.include_media_factor = False
if self.features.found_media is True:
# Not authorised. See
# http://gdsctools.readthedocs.io/en/master/anova_parttwo.html#regression-analysis
print("WARNING")
print("You have only one Tissue %s " % self.features.tissues[0])
print("When using MEDIA FACTOR, you must use MSI and a PANCAN analysis")
print("We DO NOT include the MEDIA Factor in the analysis hereafter\n")
elif self.features.found_media is True:
self.settings.include_media_factor = True
colname = self.features.colnames.media
self.media_factor = self.features.df[colname]
else:
self.settings.include_media_factor = False
[docs] def set_cancer_type(self, ctype=None):
"""Select only a set of tissues.
Input IC50 may be PANCAN (several cancer tissues).
This function can be used to select a subset of tissues.
This function changes the :attr:`ic50` dataframe and possibly
the feature as well if some are not relevant anymore (sum of the
column is zero for instance).
"""
if ctype is None:
return
if ctype == 'PANCAN':
# Nothing to do, keep everything
return
if isinstance(ctype, str):
ctype = [str(ctype)]
for this in ctype:
assert this in self.features.tissues, "%s not found" % ctype
# keep only features that correspond to the tissue
self.features.keep_tissue_in(ctype)
self.ic50.df = self.ic50.df.loc[self.features.df.index]
self.init()
[docs] def read_settings(self, settings):
"""Read settings and update cancer type if set"""
self.settings.from_json(settings)
self.set_cancer_type(self.settings.analysis_type)
[docs] def init(self):
# Some preprocessing to speed up data access in ANOVA
ic50_parse = self.ic50.df.copy().unstack().dropna()
# for each drug, we store the IC50s (Y) and corresponding indices
# of cosmic identifiers + since v0.13 the real indices
# Create a dictionary version of the data
# to be accessed per drug where NA have already been
# removed. Each drug is a dictionary with 2 keys:
# Y for the data and indices for the cosmicID where
# there is an IC50 measured.
self.ic50_dict = dict([
(d, {'indices': ic50_parse.loc[d].index,
'Y': ic50_parse.loc[d].values}) for d in self.ic50.drugIds])
cosmicIds = list(self.ic50.df.index)
for key in self.ic50_dict.keys():
indices = [cosmicIds.index(this) for this in
self.ic50_dict[key]['indices']]
self.ic50_dict[key]['real_indices'] = indices
# save the tissues
self._autoset_tissue_factor()
# and MSI (Microsatellite instability) status of the samples.
self._autoset_msi_factor()
# and (growth) media factor
self._autoset_media_factor()
# dictionaries to speed up code.
self.msi_dict = {}
self.tissue_dict = {}
self.media_dict = {}
# fill the dictionaries for each drug once for all
for drug_name in self.ic50.drugIds:
# NOTE: indices are actually cosmid ids (not indices from 0 to N)
indices = self.ic50_dict[drug_name]['indices']
# MSI, media and tissue are not large data files and can be stored
# enterily
if self.features.found_msi:
self.msi_dict[drug_name] = self.msi_factor.loc[indices]
if self.settings.include_media_factor:
self.media_dict[drug_name] = self.media_factor.loc[indices]
self.tissue_dict[drug_name] = self.tissue_factor.loc[indices]
# some preprocessing for the OLS computation.
# We create the dummies for the tissue factor once for all
# Note that to agree with R convention, we have to resort the column
# to agree with R convention that is a<B==b<c instead of
# where A<B<C<a<b<c (in Python)
self._tissue_dummies = pd.get_dummies(self.tissue_factor)
columns = self._tissue_dummies.columns
columns = sorted(columns, key=lambda s: s.lower())
columns = ['C(tissue)[T.' + x + ']' for x in columns]
self._tissue_dummies.columns = columns
if self.settings.include_media_factor:
self._media_dummies = pd.get_dummies(self.media_factor)
columns = self._media_dummies.columns
columns = ['C(media)[T.' + x + ']' for x in columns]
self._media_dummies.columns = columns
for col in columns:
self._tissue_dummies[col] = self._media_dummies[col]
N = len(self._tissue_dummies)
self._tissue_dummies['C(msi)[T.1]'] = [1]*N
self._tissue_dummies['feature'] = [1] * N
self._tissue_dummies.insert(0, 'Intercept', [1] * N)
# drop first feature in the tissues that seems to be used as a
# reference in the regression
#tissues = [x for x in self._tissue_dummies.columns if 'tissue' in x]
#self._tissue_dummies.drop(tissues[0], axis=1, inplace=True)
"""if self.settings.include_media_factor:
# Drop first category in the media factor ?! like for tissues.
# What is the rationale ?
media = [x for x in self._tissue_dummies.columns if 'media' in x]
self._tissue_dummies.drop(media[0], axis=1, inplace=True)
"""
# reset the buffer.
self.individual_anova = {}
if self.verbose and self._init_called is False:
for this in ['tissue', 'media', 'msi', 'feature']:
if this in self._get_analysis_mode():
logger.debug(this.upper() + " FACTOR : included")
else:
logger.debug(this.upper() + " FACTOR : NOT included")
self._init_called = True
def _get_cosmics(self):
return self.ic50.cosmicIds
def _set_cosmics(self, cosmics):
self.ic50.cosmicIds = cosmics
self.features.cosmicIds = cosmics
self.init()
self.individual_anova = {}
cosmicIds = property(_get_cosmics, _set_cosmics,
doc="get/set the cosmic identifiers in the IC50 and feature matrices")
def _get_drug_names(self):
return self.ic50.drugIds
def _set_drug_names(self, drugs):
self.ic50.drugIds = drugs
self.init()
# not need to init this again ? self.individual_anova = {}
drugIds = property(_get_drug_names, _set_drug_names,
doc="Get/Set drug identifers")
def _get_feature_names(self):
shift = self.features.shift
return self.features.features[shift:]
def _set_features_names(self, features):
self.features.features = features
self.init()
self.individual_anova = {}
feature_names = property(_get_feature_names, _set_features_names,
doc="Get/Set feature names")
def _get_analysis_mode(self):
modes = []
if self.settings.analysis_type == 'PANCAN':
modes.append('tissue')
if self.settings.include_MSI_factor is True:
modes.append('msi')
if self.settings.include_media_factor is True:
modes.append('media')
modes.append('feature')
return modes
[docs] def diagnostics(self, details=False):
"""Return dataframe with information about the analysis
"""
n_drugs = len(self.ic50.drugIds)
n_features = len(self.features.features) - self.features.shift
n_combos = n_drugs * n_features
feasible = 0
pb = Progress(n_drugs, 1)
counter = 0
feasibles = collections.defaultdict(int)
for drug in self.ic50.drugIds:
for feature in self.features.features[self.features.shift:]:
dd = self._get_one_drug_one_feature_data(drug, feature,
diagnostic_only=True)
if dd.status is True:
feasible += 1
feasibles[drug] +=1
counter += 1
pb.animate(counter)
results = {
'n_drug': n_drugs,
'n_combos': n_combos,
'feasible_tests': feasible,
'percentage_feasible_tests': float(feasible)/n_combos*100}
if details is True:
results["feasible_tests_per_drug"] = feasibles
return results
[docs] def read_drug_decode(self, filename=None):
"""Read file with the DRUG information
.. seealso:: :class:`gdsctools.readers.DrugDecode`
"""
# Read the DRUG decoder file into a DrugDecode/Reader instance
self.drug_decode = readers.DrugDecode(filename)
def __str__(self):
txt = self.ic50.__str__()
txt += "\n" + self.features.__str__()
return txt
def __repr__(self):
txt = self.__str__()
return txt