|
|
|
|
|
import argparse |
|
import os |
|
import sys |
|
import time |
|
import traceback |
|
import sklearn |
|
import json |
|
import tensorflow as tf |
|
import keras |
|
import keras_nlp |
|
import keras.layers as kl |
|
from keras.layers import Conv1D, MaxPooling1D, AveragePooling1D |
|
from keras_nlp.layers import SinePositionEncoding, TransformerEncoder |
|
from keras.layers import BatchNormalization |
|
from keras.models import Sequential, Model, load_model |
|
from keras.optimizers import Adam |
|
from keras.callbacks import EarlyStopping, History, ModelCheckpoint |
|
import pandas as pd |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import seaborn as sns |
|
from scipy import stats |
|
from collections import Counter |
|
from itertools import product |
|
from sklearn.metrics import mean_squared_error |
|
|
|
startTime=time.time() |
|
import os |
|
os.environ["CUDA_VISIBLE_DEVICES"] = "0" |
|
|
|
def parse_arguments(): |
|
parser = argparse.ArgumentParser(description='DeepSTARR') |
|
parser.add_argument('--config', type=str, default='config/config-conv-117.json', help='Configuration file path (default: config/config-conv-117.json)') |
|
parser.add_argument('--indir', type=str, default='./DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt', help='Input data directory (default: ./DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt)') |
|
parser.add_argument('--out_dir', type=str, default='output', help='Output directory (default: output)') |
|
parser.add_argument('--label', type=str, default='baseline', help='Output label (default: baseline)') |
|
return parser.parse_args() |
|
|
|
def LoadConfig(config): |
|
with open(config, 'r') as file: |
|
params = json.load(file) |
|
return params |
|
|
|
def one_hot_encode(seq): |
|
nucleotide_dict = {'A': [1, 0, 0, 0], |
|
'C': [0, 1, 0, 0], |
|
'G': [0, 0, 1, 0], |
|
'T': [0, 0, 0, 1], |
|
'N': [0, 0, 0, 0]} |
|
return np.array([nucleotide_dict[nuc] for nuc in seq]) |
|
|
|
def kmer_encode(sequence, k=3): |
|
sequence = sequence.upper() |
|
kmers = [sequence[i:i+k] for i in range(len(sequence) - k + 1)] |
|
kmer_counts = Counter(kmers) |
|
return {kmer: kmer_counts.get(kmer, 0) / len(kmers) for kmer in [''.join(p) for p in product('ACGT', repeat=k)]} |
|
|
|
def kmer_features(seq, k=3): |
|
all_kmers = [''.join(p) for p in product('ACGT', repeat=k)] |
|
feature_matrix = [] |
|
kmer_freqs = kmer_encode(seq, k) |
|
feature_vector = [kmer_freqs[kmer] for kmer in all_kmers] |
|
feature_matrix.append(feature_vector) |
|
return np.array(feature_matrix) |
|
|
|
def prepare_input(data_set, params): |
|
if params['encode'] == 'one-hot': |
|
seq_matrix = np.array(data_set['Sequence'].apply(one_hot_encode).tolist()) |
|
elif params['encode'] == 'k-mer': |
|
seq_matrix = np.array(data_set['Sequence'].apply(kmer_features, k=3).tolist()) |
|
else: |
|
raise Exception ('wrong encoding method') |
|
|
|
Y_dev = data_set.Dev_log2_enrichment |
|
Y_hk = data_set.Hk_log2_enrichment |
|
Y = [Y_dev, Y_hk] |
|
|
|
return seq_matrix, Y |
|
|
|
def DeepSTARR(params): |
|
if params['encode'] == 'one-hot': |
|
input = kl.Input(shape=(249, 4)) |
|
elif params['encode'] == 'k-mer': |
|
input = kl.Input(shape=(1, 64)) |
|
|
|
for i in range(params['convolution_layers']['n_layers']): |
|
x = kl.Conv1D(params['convolution_layers']['filters'][i], |
|
kernel_size = params['convolution_layers']['kernel_sizes'][i], |
|
padding = params['pad'], |
|
name=str('Conv1D_'+str(i+1)))(input) |
|
x = kl.BatchNormalization()(x) |
|
x = kl.Activation('relu')(x) |
|
if params['encode'] == 'one-hot': |
|
x = kl.MaxPooling1D(2)(x) |
|
|
|
if params['dropout_conv'] == 'yes': x = kl.Dropout(params['dropout_prob'])(x) |
|
|
|
|
|
for i in range(params['transformer_layers']['n_layers']): |
|
if i == 0: |
|
x = x + keras_nlp.layers.SinePositionEncoding()(x) |
|
x = TransformerEncoder(intermediate_dim = params['transformer_layers']['attn_key_dim'][i], |
|
num_heads = params['transformer_layers']['attn_heads'][i], |
|
dropout = params['dropout_prob'])(x) |
|
|
|
|
|
|
|
x = kl.Flatten()(x) |
|
|
|
|
|
|
|
for i in range(params['n_dense_layer']): |
|
x = kl.Dense(params['dense_neurons'+str(i+1)], |
|
name=str('Dense_'+str(i+1)))(x) |
|
x = kl.BatchNormalization()(x) |
|
x = kl.Activation('relu')(x) |
|
x = kl.Dropout(params['dropout_prob'])(x) |
|
|
|
|
|
bottleneck = x |
|
|
|
|
|
|
|
tasks = ['Dev', 'Hk'] |
|
outputs = [] |
|
for task in tasks: |
|
outputs.append(kl.Dense(1, activation='linear', name=str('Dense_' + task))(bottleneck)) |
|
|
|
|
|
model = Model([input], outputs) |
|
model.compile(Adam(learning_rate=params['lr']), |
|
loss=['mse', 'mse'], |
|
loss_weights=[1, 1]) |
|
|
|
return model, params |
|
|
|
def train(selected_model, X_train, Y_train, X_valid, Y_valid, params): |
|
my_history=selected_model.fit(X_train, Y_train, |
|
validation_data=(X_valid, Y_valid), |
|
batch_size=params['batch_size'], |
|
epochs=params['epochs'], |
|
callbacks=[EarlyStopping(patience=params['early_stop'], monitor="val_loss", restore_best_weights=True), History()]) |
|
|
|
return selected_model, my_history |
|
|
|
def summary_statistics(X, Y, set, task, main_model, main_params, out_dir): |
|
pred = main_model.predict(X, batch_size=main_params['batch_size']) |
|
if task =="Dev": |
|
i=0 |
|
if task =="Hk": |
|
i=1 |
|
print(set + ' MSE ' + task + ' = ' + str("{0:0.2f}".format(mean_squared_error(Y, pred[i].squeeze())))) |
|
print(set + ' PCC ' + task + ' = ' + str("{0:0.2f}".format(stats.pearsonr(Y, pred[i].squeeze())[0]))) |
|
print(set + ' SCC ' + task + ' = ' + str("{0:0.2f}".format(stats.spearmanr(Y, pred[i].squeeze())[0]))) |
|
return str("{0:0.2f}".format(stats.pearsonr(Y, pred[i].squeeze())[0])) |
|
|
|
def main(config, indir, out_dir, label): |
|
data = pd.read_table(indir) |
|
params = LoadConfig(config) |
|
|
|
X_train, Y_train = prepare_input(data[data['set'] == "Train"], params) |
|
X_valid, Y_valid = prepare_input(data[data['set'] == "Val"], params) |
|
X_test, Y_test = prepare_input(data[data['set'] == "Test"], params) |
|
|
|
DeepSTARR(params)[0].summary() |
|
DeepSTARR(params)[1] |
|
main_model, main_params = DeepSTARR(params) |
|
main_model, my_history = train(main_model, X_train, Y_train, X_valid, Y_valid, main_params) |
|
|
|
endTime=time.time() |
|
seconds=endTime-startTime |
|
print("Total training time:",round(seconds/60,2),"minutes") |
|
|
|
dev_results = summary_statistics(X_test, Y_test[0], "test", "Dev", main_model, main_params, out_dir) |
|
hk_results = summary_statistics(X_test, Y_test[1], "test", "Hk", main_model, main_params, out_dir) |
|
|
|
result = { |
|
"AutoDNA": { |
|
"means": { |
|
"PCC(Dev)": dev_results, |
|
"PCC(Hk)": hk_results |
|
} |
|
} |
|
} |
|
|
|
with open(f"{out_dir}/final_info.json", "w") as file: |
|
json.dump(result, file, indent=4) |
|
|
|
main_model.save(out_dir + '/' + label + '.h5') |
|
|
|
if __name__ == "__main__": |
|
try: |
|
args = parse_arguments() |
|
main(args.config, args.indir, args.out_dir, args.label) |
|
except Exception as e: |
|
print("Original error in subprocess:", flush=True) |
|
if not os.path.exists(args.out_dir): |
|
os.makedirs(args.out_dir) |
|
traceback.print_exc(file=open(os.path.join(args.out_dir, "traceback.log"), "w")) |
|
raise |
|
|
|
|
|
|
|
|
|
|