qff.price.fetch 源代码

# coding :utf-8
#
# The MIT License (MIT)
#
# Copyright (c) 2016-2019 XuHaiJiang/QFF
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
通过通达信接口获取股票实时价格数据
"""
import json
import pandas as pd
from datetime import date
from pytdx.hq import TdxHq_API
from retrying import retry
from qff.tools.date import is_trade_day, get_trade_gap, get_real_trade_date
from qff.tools.logs import log
from qff.tools.tdx import get_best_ip, select_market_code, select_index_code


__all__ = ["fetch_price", "fetch_ticks", "fetch_current_ticks", "fetch_today_transaction",
           "fetch_today_min_curve", "fetch_stock_xdxr", "fetch_stock_block"]


def _select_freq(freq):
    if freq in ['day', 'd', 'D', 'DAY', 'Day']:
        freq, c = 9, 1
    elif str(freq) in ['1', '1m', '1min', 'one']:
        freq, c = 8, 240
    elif str(freq) in ['5', '5m', '5min', 'five']:
        freq, c = 0, 48
    elif str(freq) in ['15', '15m', '15min', 'fifteen']:
        freq, c = 1, 16
    elif str(freq) in ['30', '30m', '30min', 'half']:
        freq, c = 2, 8
    elif str(freq) in ['60', '60m', '60min', '1h']:
        freq, c = 3, 4
    elif freq in ['w', 'W', 'Week', 'week']:
        freq, c = 5, 1/5
    elif freq in ['month', 'M', 'mon', 'Month']:
        freq, c = 6, 1/20
    elif freq in ['Q', 'Quarter', 'q']:
        freq, c = 10, 1/60
    elif freq in ['y', 'Y', 'year', 'Year']:
        freq, c = 11, 1/250
    else:
        freq, c = 0, 0

    return freq, c


def _calc_today_min_len():
    # 计算当天1分钟曲线的数据记录长度
    _now = pd.Timestamp.now()
    s_now = _now.strftime('%Y-%m-%d')
    if not is_trade_day(s_now):
        data_len = 240
    else:
        open_time = pd.to_datetime('{0} 09:31:00'.format(s_now))
        # 计算当前时间和开盘时间的分钟数
        interval = int((_now.ceil('1min') - open_time).total_seconds()/60)
        if interval <= 0:
            data_len = 240
        elif 0 < interval <= 120:
            data_len = interval
        elif 120 < interval <= 210:
            data_len = 120
        elif 210 < interval <= 330:
            data_len = interval - 120
        else:
            data_len = 240
    return data_len


# @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100)
def fetch_price(code, count=None, freq='day', market='stock', start=None):
    """
    从tdx服务器上获取曲线数据,仅可查询一支股票,按天或者分钟,返回数据格式为 DataFrame

    :param code: 一支股票代码或者一个指数代码
    :param count: 返回的结果集的行数, 即表示获取至当前时刻之前几个frequency的数据,-1表示所有数据。
        与 start 二选一,不可同时使用,如果同时存在,start参数无效
    :param freq: 单位时间长度, 天或者分钟, 现在支持,day/week/month/quarter/year/1m/5m/15m/30m/60m ,默认值是day
    :param market: 市场类型,目前支持“stock/index/etf", 默认“stock".
    :param start: 开始日期,不带分钟信息。与 count 二选一,不可同时使用. 字符串或者 datetime.date 对象,如果 count
        和 start 参数都没有, 则取count=1,即获取最近一条数据。

    :type code: str
    :type count: int
    :type freq: str
    :type market: str
    :type start: str

    :return:
        返回[pandas.DataFrame]对象, 行索引是date(分钟级别数据为datetime), 列索引是行情字段名字.
        如果只获取了一天,而当天停牌,返回None,注意:所有数据都未复权
    """
    freq, c = _select_freq(freq)
    if count is None:
        if start is None:
            count = 1
        else:
            count = get_trade_gap(start, str(date.today()))
            count = int(count * c)
            if count > 40800:
                count = 40800
    elif count <= 0:
        count = 40800

    ip, port = get_best_ip()
    api = TdxHq_API()
    try:
        with api.connect(ip, port):
            ret = []
            _start = 0
            while count > 0:
                _len = 800 if count > 800 else count
                if market in ['stock', 'etf']:
                    df = api.get_security_bars(freq, select_market_code(code), code, _start, _len)
                elif market == 'index':
                    df = api.get_index_bars(freq, select_index_code(code), code, _start, _len)
                if df is not None and len(df) > 0:
                    df = api.to_df(df)
                    ret.append(df)
                    _start += _len
                    count -= _len
                else:
                    break

            if len(ret) > 0:
                data = pd.concat(ret, axis=0, sort=False) if len(ret) > 1 else ret[0]

                if freq in [0, 1, 2, 3, 8]:  # 分钟数据
                    data = data.drop(['year', 'month', 'day', 'hour', 'minute'], axis=1, inplace=False)
                    data = data.assign(datetime=data['datetime'] + ':00')
                    data.set_index('datetime', inplace=True)

                else:
                    data = data.assign(date=data['datetime'].apply(lambda x: str(x[0:10])))
                    data = data.drop(['year', 'month', 'day', 'hour', 'minute', 'datetime'], axis=1, inplace=False)
                    data.set_index('date', inplace=True)
                data.sort_index(inplace=True)
                data.insert(0, 'code', code)
                if start is not None:
                    data = data.loc[start:]
                return data
            else:
                # 这里的问题是: 如果只取了一天的股票,而当天停牌, 那么就直接返回None了
                return None

    except Exception as err:
        log.error(f'fetch_price exception:{err}')
        return None


[文档]def fetch_today_min_curve(code, market='stock'): """ 获取当天的分钟曲线,返回当前时间前的当日1分钟曲线数据,用于模拟交易环境 :param code: 一支股票代码或者一个指数代码 :param market: 市场类型,目前支持“stock/index/etf", 默认“stock". :type code: str :type market: str :return: 返回[pandas.DataFrame]对象, 行索引是date(分钟级别数据为datetime), 列索引是行情字段名字. """ count = _calc_today_min_len() return fetch_price(code, count, freq='1m', market=market)
def fetch_current_ticks(code, market='stock'): """ 获取单个股票或指数当前时刻的ticks数据 :param code: 一支股票代码或者一个指数代码 :param market: 市场类型,目前支持“stock"和”index", 默认“stock". :type code: str :type market: str :return: 返回[Dict]对象,各字段含义如下: ================== ==================== 字段名 含义 ================== ==================== price 当前价格 last_close 昨日收盘价 open 当日开盘价 high 截至到当前时刻的日内最高价 low 截至到当前时刻的日内最低价 vol 截至到当前时刻的日内总手数 cur_vol 当前tick成交笔数 amount 截至到当前时刻的日内总成交额 s_vol 内盘 b_vol 外盘 bid1~bid5 买一到买五价格 ask1~ask5 卖一到卖五价格 bid_vol1~bid_vol5 买一到买五挂单手数 ask_vol1~ask_vol5 卖一到卖五挂单手数 ================== ==================== """ ip, port = get_best_ip() api = TdxHq_API() with api.connect(ip, port): data = api.get_security_quotes([(select_market_code(code, market), code)])[0] data = json.loads(json.dumps(data)) return data
[文档]@retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def fetch_ticks(code, market='stock'): """ 获取股票或指数列表当前时刻的ticks数据 :param code: 一支股票代码或者一个指数代码列表 :param market: 市场类型,目前支持“stock"和”index", 默认“stock". :type code: list :type market: str :return: 返回[pandas.DataFrame]对象,当前股票列表对应的tick数据。tick字段描述请见 :ref:`class_tick` """ stocks = [(select_market_code(code, market), code) for code in code] ip, port = get_best_ip() api = TdxHq_API() with api.connect(ip, port): data = pd.concat( [api.to_df(api.get_security_quotes(stocks[i: i+80])) for i in range(len(stocks) + 1)] ) return data
[文档]def fetch_today_transaction(code): """ 获取当日实时分笔成交信息,包含集合竞价 :param code: 一支股票代码 :type code: str :return: 返回对应股票的分笔成交信息, :: price vol num buyorsell datetime 2022-12-30 09:25 13.04 3119 153 2 2022-12-30 09:30 13.04 2965 89 0 2022-12-30 09:30 13.05 10320 182 0 其中: buyorsell 1--sell 0--buy 2--盘前' """ ip, port = get_best_ip() api = TdxHq_API() try: with api.connect(ip, port): # data = pd.DataFrame() data = pd.concat([api.to_df(api.get_transaction_data( select_market_code(str(code)), code, (2 - i) * 2000, 2000)) for i in range(3)], axis=0, sort=False) data = data.dropna() day = get_real_trade_date(date.today()) data = data.assign(datetime=data['time'].apply(lambda x: day + ' ' + str(x))) data = data.drop(['time'], axis=1) data.set_index('datetime', inplace=True) if 'value' in data.columns: data = data.drop(['value'], axis=1) return data except Exception as err: log.error(f'fetch_today_transaction exception:{err}') return None
def fetch_stock_info(code): """ 获取当日股票基本信息 :param code: 股票代码 :return: dict """ ip, port = get_best_ip() api = TdxHq_API() market_code = select_market_code(code) with api.connect(ip, port): return api.get_finance_info(market_code, code) def fetch_stock_list(market='stock'): """ 获取当日股票/指数/ETF列表 :param market: 市场类型stock/index/etf :return: dataframe """ ip, port = get_best_ip() api = TdxHq_API() with api.connect(ip, port): # 读取深圳市场股票代码 sz = pd.concat([api.to_df(api.get_security_list(0, i * 1000)) for i in range(int(api.get_security_count(0) / 1000) + 1)], axis=0, sort=False) sz = sz[['code', 'name']].dropna() if market == 'stock': sz = sz[sz['code'].str[:2].isin(['00', '30', '02'])] elif market == 'index': sz = sz[sz['code'].str[:2].isin(['39'])] elif market == 'etf': sz = sz[sz['code'].str[:2].isin(['15'])] else: log.error("fetch_stock_list: 参数market错误!") return None # 读取上海市场股票代码 sh = pd.concat([api.to_df(api.get_security_list(1, i*1000)) for i in range(int(api.get_security_count(1) / 1000) + 1)], axis=0, sort=False) sh = sh[['code', 'name']].dropna() if market == 'stock': sh = sh[sh['code'].str.startswith('6')] elif market == 'index': sh = sh[sh['code'].str[:3].isin(['000', '880'])] elif market == 'etf': sh = sh[sh['code'].str[:2].isin(['51'])] else: log.error("fetch_stock_list: 参数market错误!") return None # 读取北京市场股票代码 # bj = pd.concat([api.to_df(api.get_security_list(2, i*1000)) # for i in range(int(api.get_security_count(2) / 1000) + 1)], axis=0, sort=False) # if bj is not None and len(bj) > 0: # bj = bj[['code', 'name']].dropna() # if market == 'stock': # bj = bj[bj['code'].str[:2].isin(['43', '83', '87', '82', '88'])] # elif market == 'index': # bj = bj[bj['code'].str[:2].isin(['89'])] # else: # log.error("fetch_stock_list: 参数market错误!") # return None # data = pd.concat([sz, sh, bj], sort=False).set_index('code') # else: # data = pd.concat([sz, sh], sort=False).set_index('code') data = pd.concat([sz, sh], sort=False).set_index('code') return data.sort_index().assign(name=data['name'].apply(lambda x: str(x)[0:6].strip().strip(b'\x00'.decode()))) @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def fetch_stock_xdxr(code): """ 获取除权除息数据 :param code: :return: """ market_code = select_market_code(code) ip, port = get_best_ip() api = TdxHq_API() # with api.connect(ip, port): api.connect(ip, port) category = { '1': '除权除息', '2': '送配股上市', '3': '非流通股上市', '4': '未知股本变动', '5': '股本变化', '6': '增发新股', '7': '股份回购', '8': '增发新股上市', '9': '转配股上市', '10': '可转债上市', '11': '扩缩股', '12': '非流通股缩股', '13': '送认购权证', '14': '送认沽权证'} data = api.to_df(api.get_xdxr_info(market_code, code)) if len(data) >= 1 and 'year' in data.columns: data = data \ .assign(date=pd.to_datetime(data[['year', 'month', 'day']])) \ .drop(['year', 'month', 'day'], axis=1) \ .assign(category_meaning=data['category'].apply( lambda x: category[str(x)])) \ .assign(code=str(code)) \ .rename(index=str, columns={'panhouliutong': 'liquidity_after', 'panqianliutong': 'liquidity_before', 'houzongguben': 'shares_after', 'qianzongguben': 'shares_before'}) \ data = data.assign(date=data['date'].apply(lambda x: str(x)[0:10]))\ .set_index('date', drop=False, inplace=False).sort_index() else: data = None api.close() return data def fetch_stock_block(): """ 获取股票板块数据,包括概念板块、风格板块、指数板块 """ ip, port = get_best_ip() api = TdxHq_API() with api.connect(ip, port): data = pd.concat([api.to_df( api.get_and_parse_block_info("block_gn.dat")).assign(type='gn'), api.to_df(api.get_and_parse_block_info( "block.dat")).assign(type='yb'), api.to_df(api.get_and_parse_block_info( "block_zs.dat")).assign(type='zs'), api.to_df(api.get_and_parse_block_info( "block_fg.dat")).assign(type='fg')], sort=False) if len(data) > 10: return data.assign(source='tdx').drop(['block_type', 'code_index'], axis=1).set_index('code', drop=False, inplace=False).drop_duplicates() else: log.error("fetch_stock_block: 错误!") return None if __name__ == '__main__': fetch_stock_list("stock")