13
13
from tqsdk .channel import TqChan
14
14
from tqsdk .diff import _get_obj
15
15
from tqsdk .rangeset import _rangeset_difference , _rangeset_intersection
16
- from tqsdk .tafunc import get_dividend_df
17
- from tqsdk .utils import _generate_uuid
16
+ from tqsdk .utils import _generate_uuid , _get_dividend_factor
18
17
19
18
CACHE_DIR = os .path .join (os .path .expanduser ('~' ), ".tqsdk/data_series_1" )
20
19
@@ -144,7 +143,8 @@ async def _run(self):
144
143
# 复权, 如果存在 STOCK / FUND 并且 adj_type is not None, 这里需要提前准备下载时间段内的复权因子
145
144
quote = self ._api .get_quote (symbol )
146
145
if self ._adj_type and quote .ins_class in ["STOCK" , "FUND" ]:
147
- factor_df = await self ._update_dividend_factor (symbol ) # 复权需要根据日线计算除权因子,todo: 如果本地下载过日线,不需要再从服务器获取日线数据
146
+ factor_df = await _get_dividend_factor (self ._api , quote , self ._start_dt_nano , self ._end_dt_nano ,
147
+ chart_id_prefix = "PYSDK_data_factor" ) # 复权需要根据日线计算除权因子,todo: 如果本地下载过日线,不需要再从服务器获取日线数据
148
148
if self ._adj_type == "F" :
149
149
# 倒序循环 factor_df, 对于小于当前 factor_df[datetime] 的行 乘以 factor_df[factor]
150
150
for i in range (factor_df .shape [0 ] - 1 , - 1 , - 1 ):
@@ -212,13 +212,12 @@ async def _download_data(self, start_dt, end_dt, data_chan):
212
212
async for _ in update_chan :
213
213
if not (chart_info .items () <= _get_obj (chart , ["state" ]).items ()):
214
214
continue # 当前请求还没收齐回应, 不应继续处理
215
+ if chart .get ("ready" , False ) is False :
216
+ continue # 数据还没全部收到
217
+ if serial .get ("last_id" , - 1 ) == - 1 :
218
+ return # 合约没有任何数据
215
219
left_id = chart .get ("left_id" , - 1 )
216
220
right_id = chart .get ("right_id" , - 1 )
217
- if (left_id == - 1 and right_id == - 1 ) or self ._api ._data .get ("mdhis_more_data" , True ):
218
- continue # 定位信息还没收到, 或数据序列还没收到
219
- # 检查合约的数据是否收到
220
- if serial .get ("last_id" , - 1 ) == - 1 :
221
- continue
222
221
if current_id is None :
223
222
current_id = max (left_id , 0 )
224
223
while current_id <= right_id :
@@ -245,51 +244,6 @@ async def _download_data(self, start_dt, end_dt, data_chan):
245
244
"view_width" : 2000 ,
246
245
})
247
246
248
- async def _update_dividend_factor (self , symbol ):
249
- quote = self ._api .get_quote (symbol )
250
- df = get_dividend_df (quote .stock_dividend_ratio , quote .cash_dividend_ratio )
251
- between = df ["datetime" ].between (self ._start_dt_nano , self ._end_dt_nano ) # 只需要开始时间~结束时间之间的复权因子
252
- df ["pre_close" ] = float ('nan' )
253
- for i in df [between ].index :
254
- chart_info = {
255
- "aid" : "set_chart" ,
256
- "chart_id" : _generate_uuid ("PYSDK_data_factor" ),
257
- "ins_list" : symbol ,
258
- "duration" : 86400 * 1000000000 ,
259
- "view_width" : 2 ,
260
- "focus_datetime" : int (df .iloc [i ].datetime ),
261
- "focus_position" : 1
262
- }
263
- await self ._api ._send_chan .send (chart_info )
264
- chart = _get_obj (self ._api ._data , ["charts" , chart_info ["chart_id" ]])
265
- serial = _get_obj (self ._api ._data , ["klines" , symbol , str (86400000000000 )])
266
- try :
267
- async with self ._api .register_update_notify () as update_chan :
268
- async for _ in update_chan :
269
- if not (chart_info .items () <= _get_obj (chart , ["state" ]).items ()):
270
- continue # 当前请求还没收齐回应, 不应继续处理
271
- left_id = chart .get ("left_id" , - 1 )
272
- right_id = chart .get ("right_id" , - 1 )
273
- if (left_id == - 1 and right_id == - 1 ) or self ._api ._data .get ("mdhis_more_data" ,
274
- True ) or serial .get ("last_id" ,
275
- - 1 ) == - 1 :
276
- continue # 定位信息还没收到, 或数据序列还没收到, 合约的数据是否收到
277
- last_item = serial ["data" ].get (str (left_id ), {})
278
- # 复权时间点的昨收盘
279
- df .loc [i , 'pre_close' ] = last_item ['close' ] if last_item .get ('close' ) else float ('nan' )
280
- break
281
- finally :
282
- await self ._api ._send_chan .send ({
283
- "aid" : "set_chart" ,
284
- "chart_id" : chart_info ["chart_id" ],
285
- "ins_list" : "" ,
286
- "duration" : 86400000000000 ,
287
- "view_width" : 2
288
- })
289
- df ["factor" ] = (df ["pre_close" ] - df ["cash_dividend" ]) / df ["pre_close" ] / (1 + df ["stock_dividend" ])
290
- df ["factor" ].fillna (1 , inplace = True )
291
- return df
292
-
293
247
def _merge_rangeset (self ):
294
248
symbol = self ._symbol_list [0 ]
295
249
rangeset = DataSeries ._get_rangeset_id (symbol , self ._dur_nano )
0 commit comments