7
7
from tornado .web import RequestHandler
8
8
from tornado .escape import json_decode
9
9
10
- import cesium .time_series
11
- import cesium .featurize
12
- import cesium .predict
13
- import cesium .featureset
10
+ from cesium import featurize , time_series
14
11
from cesium .features import CADENCE_FEATS , GENERAL_FEATS , LOMB_SCARGLE_FEATS
15
12
16
- import xarray as xr
17
13
import joblib
18
14
from os .path import join as pjoin
19
15
import uuid
20
16
import datetime
21
17
import os
22
18
import tempfile
19
+ import numpy as np
20
+ import pandas as pd
23
21
24
22
25
23
class PredictionHandler (BaseHandler ):
@@ -82,27 +80,39 @@ def post(self):
82
80
if (model .finished is None ) or (fset .finished is None ):
83
81
return self .error ('Computation of model or feature set still in progress' )
84
82
85
- prediction_path = pjoin (cfg ['paths' ]['predictions_folder' ],
86
- '{}_prediction.nc ' .format (uuid .uuid4 ()))
87
- prediction_file = File .create (uri = prediction_path )
83
+ pred_path = pjoin (cfg ['paths' ]['predictions_folder' ],
84
+ '{}_prediction.npz ' .format (uuid .uuid4 ()))
85
+ prediction_file = File .create (uri = pred_path )
88
86
prediction = Prediction .create (file = prediction_file , dataset = dataset ,
89
87
project = dataset .project , model = model )
90
88
91
89
executor = yield self ._get_executor ()
92
90
93
- all_time_series = executor .map (cesium . time_series .from_netcdf ,
94
- dataset . uris )
95
- all_features = executor .map (cesium . featurize .featurize_single_ts ,
91
+ all_time_series = executor .map (time_series .load , dataset . uris )
92
+ all_labels = executor . map ( lambda ts : ts . label , all_time_series )
93
+ all_features = executor .map (featurize .featurize_single_ts ,
96
94
all_time_series ,
97
95
features_to_use = fset .features_list ,
98
96
custom_script_path = fset .custom_features_script )
99
- fset_data = executor .submit (cesium . featurize .assemble_featureset ,
97
+ fset_data = executor .submit (featurize .assemble_featureset ,
100
98
all_features , all_time_series )
101
- fset_data = executor .submit (cesium .featureset .Featureset .impute , fset_data )
102
- model_data = executor .submit (joblib .load , model .file .uri )
103
- predset = executor .submit (cesium .predict .model_predictions ,
104
- fset_data , model_data )
105
- future = executor .submit (xr .Dataset .to_netcdf , predset , prediction_path )
99
+ imputed_fset = executor .submit (featurize .impute_featureset ,
100
+ fset_data , inplace = False )
101
+ model_or_gridcv = executor .submit (joblib .load , model .file .uri )
102
+ model_data = executor .submit (lambda model : model .best_estimator_
103
+ if hasattr (model , 'best_estimator_' ) else model ,
104
+ model_or_gridcv )
105
+ preds = executor .submit (lambda fset , model : model .predict (fset ),
106
+ imputed_fset , model_data )
107
+ pred_probs = executor .submit (lambda fset , model : model .predict_proba (fset )
108
+ if hasattr (model , 'predict_proba' ) else [],
109
+ imputed_fset , model_data )
110
+ all_classes = executor .submit (lambda model : model .classes_
111
+ if hasattr (model , 'classes_' ) else [],
112
+ model_data )
113
+ future = executor .submit (featurize .save_featureset , imputed_fset ,
114
+ pred_path , labels = all_labels , preds = preds ,
115
+ pred_probs = pred_probs , all_classes = all_classes )
106
116
107
117
prediction .task_id = future .key
108
118
prediction .save ()
@@ -114,14 +124,18 @@ def post(self):
114
124
115
125
def get (self , prediction_id = None , action = None ):
116
126
if action == 'download' :
117
- prediction = cesium .featureset .from_netcdf (self ._get_prediction (prediction_id ).file .uri )
118
- with tempfile .NamedTemporaryFile () as tf :
119
- util .prediction_to_csv (prediction , tf .name )
120
- with open (tf .name ) as f :
121
- self .set_header ("Content-Type" , 'text/csv; charset="utf-8"' )
122
- self .set_header ("Content-Disposition" ,
123
- "attachment; filename=cesium_prediction_results.csv" )
124
- self .write (f .read ())
127
+ pred_path = self ._get_prediction (prediction_id ).file .uri
128
+ fset , data = featurize .load_featureset (pred_path )
129
+ result = pd .DataFrame ({'ts_name' : fset .index ,
130
+ 'label' : data ['labels' ],
131
+ 'prediction' : data ['preds' ]},
132
+ columns = ['ts_name' , 'label' , 'prediction' ])
133
+ if data .get ('pred_probs' ):
134
+ result ['probability' ] = np .max (data ['pred_probs' ], axis = 1 )
135
+ self .set_header ("Content-Type" , 'text/csv; charset="utf-8"' )
136
+ self .set_header ("Content-Disposition" , "attachment; "
137
+ "filename=cesium_prediction_results.csv" )
138
+ self .write (result .to_csv (index = False ))
125
139
else :
126
140
if prediction_id is None :
127
141
predictions = [prediction
@@ -144,20 +158,22 @@ class PredictRawDataHandler(BaseHandler):
144
158
def post(self):
    """Featurize raw time series data from the request and return predictions.

    Request arguments (each JSON-decoded):
      - ``ts_data``: unpacked positionally into
        ``featurize.featurize_time_series``.
      - ``modelID``: id used to look up the stored ``Model``.
      - ``meta_features`` (optional, defaults to JSON ``null``).
      - ``impute_kwargs`` (optional, defaults to ``{}``): forwarded as
        keyword arguments to ``featurize.impute_featureset``.

    Returns the result of ``self.success`` applied to the output of
    ``Prediction.format_pred_data``.
    """
    ts_data = json_decode(self.get_argument('ts_data'))
    model_id = json_decode(self.get_argument('modelID'))
    meta_feats = json_decode(self.get_argument('meta_features', 'null'))
    impute_kwargs = json_decode(self.get_argument('impute_kwargs', '{}'))

    model = Model.get(Model.id == model_id)
    model_data = joblib.load(model.file.uri)
    # The stored object may wrap the fitted estimator (presumably a grid
    # search result); predict with the wrapped estimator when present.
    if hasattr(model_data, 'best_estimator_'):
        model_data = model_data.best_estimator_
    features_to_use = model.featureset.features_list

    fset = featurize.featurize_time_series(*ts_data,
                                           features_to_use=features_to_use,
                                           meta_features=meta_feats)
    fset = featurize.impute_featureset(fset, **impute_kwargs)

    # Not every model exposes ``classes_``; fall back to an empty list, the
    # same default PredictionHandler.post uses, instead of raising
    # AttributeError.
    data = {'preds': model_data.predict(fset),
            'all_classes': getattr(model_data, 'classes_', [])}
    if hasattr(model_data, 'predict_proba'):
        data['pred_probs'] = model_data.predict_proba(fset)

    pred_info = Prediction.format_pred_data(fset, data)
    return self.success(pred_info)
0 commit comments