# adapted from the DeepSTARR Colab notebook: https://colab.research.google.com/drive/1Xgak40TuxWWLh5P5ARf0-4Xo0BcRn0Gd
import argparse
import json
import os
import time
import traceback
from collections import Counter
from itertools import product

# Select the visible GPU before TensorFlow/Keras are imported; setting
# CUDA_VISIBLE_DEVICES after TensorFlow has initialized its device context
# has no effect.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import mean_squared_error

import keras.layers as kl
from keras.callbacks import EarlyStopping, History
from keras.models import Model
from keras.optimizers import Adam
from keras_nlp.layers import SinePositionEncoding, TransformerEncoder

startTime = time.time()
def parse_arguments():
parser = argparse.ArgumentParser(description='DeepSTARR')
parser.add_argument('--config', type=str, default='config/config-conv-117.json', help='Configuration file path (default: config/config-conv-117.json)')
parser.add_argument('--indir', type=str, default='./DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt', help='Input data directory (default: ./DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt)')
parser.add_argument('--out_dir', type=str, default='output', help='Output directory (default: output)')
parser.add_argument('--label', type=str, default='baseline', help='Output label (default: baseline)')
return parser.parse_args()
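# Example invocation (script filename assumed; paths are the defaults above):
#   python deepstarr_train.py --config config/config-conv-117.json \
#       --indir ./DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt \
#       --out_dir output --label baseline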
def LoadConfig(config):
with open(config, 'r') as file:
params = json.load(file)
return params
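# Illustrative sketch of the expected config JSON (keys inferred from the
# params lookups below; the shipped config/config-conv-117.json may differ,
# and the numbers here are only placeholders):
# {
#     "encode": "one-hot",
#     "convolution_layers": {"n_layers": 4,
#                            "filters": [256, 60, 60, 120],
#                            "kernel_sizes": [7, 3, 5, 3]},
#     "pad": "same",
#     "transformer_layers": {"n_layers": 0, "attn_key_dim": [], "attn_heads": []},
#     "n_dense_layer": 2,
#     "dense_neurons1": 256,
#     "dense_neurons2": 256,
#     "dropout_conv": "no",
#     "dropout_prob": 0.4,
#     "lr": 0.002,
#     "batch_size": 128,
#     "epochs": 100,
#     "early_stop": 10
# }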
def one_hot_encode(seq):
    nucleotide_dict = {'A': [1, 0, 0, 0],
                       'C': [0, 1, 0, 0],
                       'G': [0, 0, 1, 0],
                       'T': [0, 0, 0, 1],
                       'N': [0, 0, 0, 0]}  # ambiguous bases map to all zeros
    return np.array([nucleotide_dict[nuc] for nuc in seq.upper()])
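# Illustrative check (not part of the original script):
#   one_hot_encode("ACGT") -> array([[1, 0, 0, 0],
#                                    [0, 1, 0, 0],
#                                    [0, 0, 1, 0],
#                                    [0, 0, 0, 1]])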
def kmer_encode(sequence, k=3):
sequence = sequence.upper()
kmers = [sequence[i:i+k] for i in range(len(sequence) - k + 1)]
kmer_counts = Counter(kmers)
    all_kmers = [''.join(p) for p in product('ACGT', repeat=k)]
    return {kmer: kmer_counts.get(kmer, 0) / len(kmers) for kmer in all_kmers}
def kmer_features(seq, k=3):
    # single-row feature matrix of k-mer frequencies, shape (1, 4^k)
    all_kmers = [''.join(p) for p in product('ACGT', repeat=k)]
    kmer_freqs = kmer_encode(seq, k)
    return np.array([[kmer_freqs[kmer] for kmer in all_kmers]])
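# Illustrative check (not part of the original script): for k = 3 there are
# 4**3 = 64 possible k-mers, so kmer_features(seq) returns shape (1, 64),
# matching the kl.Input(shape=(1, 64)) used for 'k-mer' encoding in DeepSTARR().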
def prepare_input(data_set, params):
if params['encode'] == 'one-hot':
seq_matrix = np.array(data_set['Sequence'].apply(one_hot_encode).tolist()) # (number of sequences, length of sequences, nucleotides)
elif params['encode'] == 'k-mer':
seq_matrix = np.array(data_set['Sequence'].apply(kmer_features, k=3).tolist()) # (number of sequences, 1, 4^k)
    else:
        raise ValueError(f"unknown encoding method: {params['encode']}")
Y_dev = data_set.Dev_log2_enrichment
Y_hk = data_set.Hk_log2_enrichment
Y = [Y_dev, Y_hk]
return seq_matrix, Y
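# The input table (--indir) is expected to be tab-separated with at least the
# columns used above and in main(): 'Sequence', 'Dev_log2_enrichment',
# 'Hk_log2_enrichment', and 'set' (with values "Train", "Val", "Test").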
def DeepSTARR(params):
    if params['encode'] == 'one-hot':
        input = kl.Input(shape=(249, 4))
    elif params['encode'] == 'k-mer':
        input = kl.Input(shape=(1, 64))
    else:
        raise ValueError(f"unknown encoding method: {params['encode']}")
    x = input
    for i in range(params['convolution_layers']['n_layers']):
        # chain each block on the running tensor x so the convolutions stack
        x = kl.Conv1D(params['convolution_layers']['filters'][i],
                      kernel_size=params['convolution_layers']['kernel_sizes'][i],
                      padding=params['pad'],
                      name='Conv1D_' + str(i+1))(x)
        x = kl.BatchNormalization()(x)
        x = kl.Activation('relu')(x)
        if params['encode'] == 'one-hot':
            x = kl.MaxPooling1D(2)(x)
        if params['dropout_conv'] == 'yes':
            x = kl.Dropout(params['dropout_prob'])(x)
    # optional attention layers
    for i in range(params['transformer_layers']['n_layers']):
        if i == 0:
            x = x + SinePositionEncoding()(x)  # add positional information once, before the first encoder
        x = TransformerEncoder(intermediate_dim=params['transformer_layers']['attn_key_dim'][i],
                               num_heads=params['transformer_layers']['attn_heads'][i],
                               dropout=params['dropout_prob'])(x)
    # After the convolutional (and optional attention) layers, the output is
    # flattened into a one-dimensional vector so it can feed the dense layers.
x = kl.Flatten()(x)
# Fully connected layers
# Each fully connected layer is followed by batch normalization, ReLU activation, and dropout
for i in range(params['n_dense_layer']):
        x = kl.Dense(params['dense_neurons' + str(i+1)],
                     name='Dense_' + str(i+1))(x)
x = kl.BatchNormalization()(x)
x = kl.Activation('relu')(x)
x = kl.Dropout(params['dropout_prob'])(x)
# Main model bottleneck
bottleneck = x
    # Output heads, one per task (developmental and housekeeping enhancer
    # activity): each is a single-neuron dense layer with linear activation.
    tasks = ['Dev', 'Hk']
    outputs = []
    for task in tasks:
        outputs.append(kl.Dense(1, activation='linear', name='Dense_' + task)(bottleneck))
# Build Keras model object
model = Model([input], outputs)
model.compile(Adam(learning_rate=params['lr']), # Adam optimizer
loss=['mse', 'mse'], # loss is Mean Squared Error (MSE)
loss_weights=[1, 1]) # in case we want to change the weights of each output. For now keep them with same weights
return model, params
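# Illustrative check (shapes follow from the code above; batch size assumed):
# with 'one-hot' encoding the model maps a batch of shape (N, 249, 4) to two
# outputs of shape (N, 1), in task order [Dev, Hk]:
#   model, _ = DeepSTARR(params)
#   dev_pred, hk_pred = model.predict(np.zeros((8, 249, 4)))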
def train(selected_model, X_train, Y_train, X_valid, Y_valid, params):
    my_history = selected_model.fit(X_train, Y_train,
validation_data=(X_valid, Y_valid),
batch_size=params['batch_size'],
epochs=params['epochs'],
callbacks=[EarlyStopping(patience=params['early_stop'], monitor="val_loss", restore_best_weights=True), History()])
return selected_model, my_history
def summary_statistics(X, Y, set_name, task, main_model, main_params, out_dir):
    pred = main_model.predict(X, batch_size=main_params['batch_size'])
    i = 0 if task == "Dev" else 1  # output head index: Dev first, Hk second
    pred_task = pred[i].squeeze()
    pcc = stats.pearsonr(Y, pred_task)[0]
    print(set_name + ' MSE ' + task + ' = ' + "{0:0.2f}".format(mean_squared_error(Y, pred_task)))
    print(set_name + ' PCC ' + task + ' = ' + "{0:0.2f}".format(pcc))
    print(set_name + ' SCC ' + task + ' = ' + "{0:0.2f}".format(stats.spearmanr(Y, pred_task)[0]))
    return "{0:0.2f}".format(pcc)
def main(config, indir, out_dir, label):
data = pd.read_table(indir)
params = LoadConfig(config)
X_train, Y_train = prepare_input(data[data['set'] == "Train"], params)
X_valid, Y_valid = prepare_input(data[data['set'] == "Val"], params)
X_test, Y_test = prepare_input(data[data['set'] == "Test"], params)
    main_model, main_params = DeepSTARR(params)  # build the model once
    main_model.summary()
    main_model, my_history = train(main_model, X_train, Y_train, X_valid, Y_valid, main_params)
    endTime = time.time()
    seconds = endTime - startTime
    # startTime is recorded at import, so this covers data loading as well as training
    print("Total elapsed time:", round(seconds / 60, 2), "minutes")
dev_results = summary_statistics(X_test, Y_test[0], "test", "Dev", main_model, main_params, out_dir)
hk_results = summary_statistics(X_test, Y_test[1], "test", "Hk", main_model, main_params, out_dir)
result = {
"AutoDNA": {
"means": {
"PCC(Dev)": dev_results,
"PCC(Hk)": hk_results
}
}
}
with open(f"{out_dir}/final_info.json", "w") as file:
json.dump(result, file, indent=4)
main_model.save(out_dir + '/' + label + '.h5')
if __name__ == "__main__":
    # parse arguments outside the try block so the except handler can use args.out_dir
    args = parse_arguments()
    try:
        main(args.config, args.indir, args.out_dir, args.label)
    except Exception:
        print("Original error in subprocess:", flush=True)
        os.makedirs(args.out_dir, exist_ok=True)
        with open(os.path.join(args.out_dir, "traceback.log"), "w") as log_file:
            traceback.print_exc(file=log_file)
        raise