-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils_tsg.py
More file actions
122 lines (96 loc) · 3.3 KB
/
utils_tsg.py
File metadata and controls
122 lines (96 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
## Necessary Packages
import numpy as np
import tensorflow as tf
def train_test_divide (data_x, data_x_hat, data_t, data_t_hat, train_rate = 0.8):
  """Split original and synthetic datasets into train/test partitions.

  Both datasets are shuffled and split independently with the same ratio.

  Args:
    - data_x: original data
    - data_x_hat: generated (synthetic) data
    - data_t: original time information
    - data_t_hat: generated time information
    - train_rate: fraction of samples assigned to the training split

  Returns:
    - train_x, train_x_hat, test_x, test_x_hat,
      train_t, train_t_hat, test_t, test_t_hat
  """
  def _split(features, times):
    # Shuffle indices once, then cut at the train/test boundary.
    count = len(features)
    order = np.random.permutation(count)
    cut = int(count * train_rate)
    tr, te = order[:cut], order[cut:]
    return ([features[j] for j in tr], [features[j] for j in te],
            [times[j] for j in tr], [times[j] for j in te])

  # Original data split, then synthetic data split.
  train_x, test_x, train_t, test_t = _split(data_x, data_t)
  train_x_hat, test_x_hat, train_t_hat, test_t_hat = _split(data_x_hat, data_t_hat)

  return train_x, train_x_hat, test_x, test_x_hat, train_t, train_t_hat, test_t, test_t_hat
def extract_time (data):
  """Return per-sequence lengths and the maximum sequence length.

  Args:
    - data: original data (iterable of [seq_len, dim] arrays)

  Returns:
    - time: list with the length of each sequence
    - max_seq_len: maximum sequence length (0 for empty data)
  """
  # Length of each sequence is the number of rows in its first column.
  time = [len(seq[:, 0]) for seq in data]
  max_seq_len = max(time, default=0)
  return time, max_seq_len
def rnn_cell(module_name, hidden_dim):
  """Construct a basic RNN cell of the requested flavor.

  Args:
    - module_name: one of 'gru', 'lstm', or 'lstmLN'
    - hidden_dim: number of hidden units in the cell

  Returns:
    - rnn_cell: the constructed TensorFlow RNN cell
  """
  assert module_name in ['gru','lstm','lstmLN']
  # Pick the cell constructor; all three share the same call signature.
  if module_name == 'gru':
    make_cell = tf.nn.rnn_cell.GRUCell
  elif module_name == 'lstm':
    make_cell = tf.contrib.rnn.BasicLSTMCell
  else:  # 'lstmLN' — LSTM with layer normalization
    make_cell = tf.contrib.rnn.LayerNormBasicLSTMCell
  return make_cell(num_units=hidden_dim, activation=tf.nn.tanh)
def random_generator (batch_size, z_dim, T_mb, max_seq_len):
  """Random vector generation.

  Each generated sample is a [max_seq_len, z_dim] array: rows up to the
  sequence length T_mb[i] hold Uniform[0, 1) noise and the remaining rows
  are zero padding, so all vectors in the batch share one uniform shape.

  Args:
    - batch_size: number of random vectors to generate
    - z_dim: dimension of each random vector
    - T_mb: sequence length for each random vector
    - max_seq_len: maximum sequence length (padded size)

  Returns:
    - Z_mb: list of generated random vectors, each [max_seq_len, z_dim]
  """
  Z_mb = list()
  for i in range(batch_size):
    temp = np.zeros([max_seq_len, z_dim])
    temp_Z = np.random.uniform(0., 1, [T_mb[i], z_dim])
    temp[:T_mb[i],:] = temp_Z
    # Bug fix: append the zero-padded array, not the unpadded noise.
    # The original appended temp_Z, discarding the padding prepared above
    # and yielding ragged shapes whenever T_mb[i] < max_seq_len.
    Z_mb.append(temp)
  return Z_mb
def batch_generator(data, time, batch_size):
  """Sample a random mini-batch of time-series data.

  Args:
    - data: time-series data
    - time: time information
    - batch_size: number of samples in each batch

  Returns:
    - X_mb: time-series data in each batch
    - T_mb: time information in each batch
  """
  # Draw batch_size indices uniformly without replacement.
  chosen = np.random.permutation(len(data))[:batch_size]
  X_mb = [data[j] for j in chosen]
  T_mb = [time[j] for j in chosen]
  return X_mb, T_mb