Tardis
Tardis 提供面向加密货币市场的细粒度数据,包括逐笔(tick-by-tick)的订单薄快照与增量更新、成交(trades)、持仓量(open interest)、资金费率(funding rates)、期权链(options chains)以及主流交易所的强制平仓/清算(liquidations)数据。
NautilusTrader 提供与 Tardis API 及其数据格式的集成,便于无缝访问 Tardis 数据。该适配器的主要功能包括:
TardisCSVDataLoader:读取 Tardis 格式的 CSV 文件并转换为 Nautilus 数据,支持批量加载与内存友好的流式处理。TardisMachineClient:支持从 Tardis Machine WebSocket 服务的实时流与历史回放,将消息转换为 Nautilus 可用的数据结构。TardisHttpClient:向 Tardis HTTP API 请求 instrument 定义元数据,并解析为 Nautilus 的 instrument 定义。TardisDataClient:提供用于订阅来自 Tardis Machine WebSocket 的实时数据流的客户端。TardisInstrumentProvider:通过 Tardis 的 HTTP instrument 元数据 API 提供 instrument 定义。- 数据管道函数(Data pipeline functions):支持从 Tardis Machine 回放历史数据并将其写入 Nautilus 的 Parquet 格式,且可直接集成到 catalog 以简化数据管理(详见下文)。
使用该适配器需要提供 Tardis API key。另请参见 “环境变量(environment variables)” 小节。
概览
该适配器以 Rust 实现,并提供可选的 Python 绑定,以便在 Python 工作流中更方便地使用。适配器本身不依赖任何外部的 Tardis 客户端库。
无需对 tardis 做额外的安装步骤。适配器的核心组件以静态库方式编译,并在构建过程中自动链接。
Tardis 文档
Tardis 提供详尽的用户文档。建议在使用本集成指南时,同时参考 Tardis 官方文档以获取更深入的细节。
支持的格式
Tardis 提供“归一化(normalized)”的市场数据——在所有支持的交易所之间保持统一格式。这种归一化非常有价值,因为单个解析器即可处理任何 Tardis 支持的交易所 的数据,从而减少开发工作与复杂性。因此,NautilusTrader 暂不直接支持各交易所的原生市场数据格式,以避免为每个交易所实现各自解析器的不必要开销。
下列归一化的 Tardis 格式被 NautilusTrader 支持:
| Tardis 格式 | Nautilus 数据类型 |
|---|---|
| book_change | OrderBookDelta |
| booksnapshot* | OrderBookDepth10 |
| quote | QuoteTick |
| quote_10s | QuoteTick |
| trade | Trade |
| tradebar* | Bar |
| instrument | CurrencyPair, CryptoFuture, CryptoPerpetual, OptionContract |
| derivative_ticker | 尚未支持 |
| disconnect | 不适用 |
注:
- quote 是 book_snapshot_1_0ms 的别名。
- quote_10s 是 book_snapshot_1_10s 的别名。
- quote、quote_10s 以及一层快照(one-level snapshots)都会被解析为
QuoteTick。
另请参见 Tardis 的归一化市场数据 API。
Bar(K 线)
适配器会自动将 Tardis 的 trade bar 间隔与后缀 转换为 Nautilus 的 BarType,包括:
| Tardis 后缀 | Nautilus bar 聚合单位 |
|---|---|
| ms - 毫秒 | MILLISECOND |
| s - 秒 | SECOND |
| m - 分钟 | MINUTE |
| ticks - 以成交笔数为单位 | TICK |
| vol - 成交量大小 | VOLUME |
代码与符号规范(Symbology and normalization)
Tardis 集成通过一致的符号归一化,保证与 NautilusTrader 的加密货币交易所适配器无缝兼容。通常情况下,NautilusTrader 会采用 Tardis 提供的原生交易所命名规则。但对于某些交易所,原始符号会按照 Nautilus 的符号规范进行调整,具体如下:
通用规则
- 所有符号会转换为大写。
- 对于某些交易所,会在市场类型后缀前添加连字符(hyphen)(详见交易所特定的规范)。
- 原始交易所符号会保留在 Nautilus instrument 定义的
raw_symbol字段中。
交易所特定的规范
- Binance:Nautilus 会在所有永续合约符号后追加
-PERP后缀。 - Bybit:Nautilus 使用特定的产品类别后缀,包括
-SPOT、-LINEAR、-INVERSE、-OPTION。 - dYdX:Nautilus 会在所有永续合约符号后追加
-PERP后缀。 - Gate.io:Nautilus 会在所有永续合约符号后追加
-PERP后缀。
有关各交易所符号规范的详细说明,参见:
Venues(交易场所)
Tardis 中的某些交易所在逻辑上被划分为多个 venue。下表列出了 Nautilus 的 venue 与对应的 Tardis 交易所映射关系:
| Nautilus venue | Tardis 交易所(s) |
|---|---|
ASCENDEX | ascendex |
BINANCE | binance, binance-dex, binance-european-options, binance-futures, binance-jersey, binance-options |
BINANCE_DELIVERY | binance-delivery(币本位合约) |
BINANCE_US | binance-us |
BITFINEX | bitfinex, bitfinex-derivatives |
BITFLYER | bitflyer |
BITGET | bitget, bitget-futures |
BITMEX | bitmex |
BITNOMIAL | bitnomial |
BITSTAMP | bitstamp |
BLOCKCHAIN_COM | blockchain-com |
BYBIT | bybit, bybit-options, bybit-spot |
COINBASE | coinbase |
COINBASE_INTX | coinbase-international |
COINFLEX | coinflex(用于历史研究) |
CRYPTO_COM | crypto-com, crypto-com-derivatives |
CRYPTOFACILITIES | cryptofacilities |
DELTA | delta |
DERIBIT | deribit |
DYDX | dydx |
DYDX_V4 | dydx-v4 |
FTX | ftx, ftx-us(用于历史研究) |
GATE_IO | gate-io, gate-io-futures |
GEMINI | gemini |
HITBTC | hitbtc |
HUOBI | huobi, huobi-dm, huobi-dm-linear-swap, huobi-dm-options |
HUOBI_DELIVERY | huobi-dm-swap |
HYPERLIQUID | hyperliquid |
KRAKEN | kraken |
KUCOIN | kucoin, kucoin-futures |
MANGO | mango |
OKCOIN | okcoin |
OKEX | okex, okex-futures, okex-options, okex-spreads, okex-swap |
PHEMEX | phemex |
POLONIEX | poloniex |
SERUM | serum(用于历史研究) |
STAR_ATLAS | star-atlas |
UPBIT | upbit |
WOO_X | woo-x |
环境变量
下列环境变量用于配置 Tardis 与 NautilusTrader:
TM_API_KEY:Tardis Machine 的 API key。TARDIS_API_KEY:NautilusTrader Tardis 客户端使用的 API key。TARDIS_MACHINE_WS_URL(可选):NautilusTrader 中TardisMachineClient使用的 WebSocket URL。TARDIS_BASE_URL(可选):NautilusTrader 中TardisHttpClient使用的基础 URL。NAUTILUS_PATH(可选):包含catalog/子目录的父目录,用于将回放数据以 Nautilus catalog 格式写入。
运行 Tardis Machine 的历史回放(historical replays)
Tardis Machine Server 是一个可在本地运行的服务,带有内置数据缓存,通过 HTTP 与 WebSocket API 提供逐 tick 的历史数据和实时汇总数据。
你可以执行完整的 Tardis Machine WebSocket 历史回放,并将结果输出为 Nautilus Parquet 格式,支持 Python 或 Rust 两种方式。由于回放功能在 Rust 中实现,无论从 Python 还是 Rust 调用,性能相同,你可以按工作流偏好来选择。
端到端的 run_tardis_machine_replay 数据管道函数会根据指定的配置 执行以下步骤:
- 连接到 Tardis Machine 服务器。
- 向 Tardis 的 instruments metadata HTTP API 请求并解析所需的 instrument 定义。
- 从 Tardis Machine 服务器按指定时间范围流式请求所有目标 instruments 与数据类型。
- 对于每个 instrument、数据类型和日期(UTC),生成一个符合 catalog 的
.parquet文件。 - 断开与 Tardis Machine 的连接并结束进程。
文件命名约定
按天、按 instrument 写入文件,使用 ISO 8601 时间戳范围以明确表示数据的时间跨度:
- 格式:
{start_timestamp}_{end_timestamp}.parquet - 示例:
2023-10-01T00-00-00-000000000Z_2023-10-01T23-59-59-999999999Z.parquet - 目录结构:
data/{data_type}/{instrument_id}/{filename}
该格式与 Nautilus 数据 catalog 完全兼容,便于查询、合并与数据管理操作。
对于每月的第一天,您可以在无需 API key 的情况下请求数据;其他日期则需要 Tardis Machine 的 API key。
此流程针对直接输出到 Nautilus Parquet 数据 catalog 进行了优化。请确保将 NAUTILUS_PATH 环境变量设置为包含 catalog/ 子目录的父目录。Parquet 文件将按数据类型与 instrument 存放在 <NAUTILUS_PATH>/catalog/data/ 的相应子目录下。
如果配置文件中未指定 output_path 且 NAUTILUS_PATH 未设置,则系统将默认使用当前工作目录。
操作步骤
首先,确保 tardis-machine 的 Docker 容器正在运行。可以使用如下命令启动:
docker run -p 8000:8000 -p 8001:8001 -e "TM_API_KEY=YOUR_API_KEY" -d tardisdev/tardis-machine
该命令会以无持久缓存的方式启动 tardis-machine 服务,可能影响性能。如需更好的性能,请考虑为容器挂载持久卷。更多信息请参见 Tardis 的 Docker 文档。
配置
接下来,准备好一个 JSON 格式的配置文件。
配置 JSON 格式
| 字段 | 类型 | 描述 | 默认 |
|---|---|---|---|
tardis_ws_url | string (optional) | Tardis Machine 的 WebSocket URL。 | 若为 null 则使用环境变量 TARDIS_MACHINE_WS_URL。 |
normalize_symbols | bool (optional) | 是否应用 Nautilus 的符号归一化。 | 若为 null 则默认为 true。 |
output_path | string (optional) | 写入 Nautilus Parquet 数据的输出目录路径。 | 若为 null 则使用 NAUTILUS_PATH 环境变量,否则为当前工作目录。 |
options | JSON[] | 一个 ReplayNormalizedRequestOptions 对象数组。 |
示例配置文件 example_config.json 可在此找到:example_config.json:
{
"tardis_ws_url": "ws://localhost:8001",
"output_path": null,
"options": [
{
"exchange": "bitmex",
"symbols": ["xbtusd", "ethusd"],
"data_types": ["trade"],
"from": "2019-10-01",
"to": "2019-10-02"
}
]
}
在 Python 中执行回放
在 Python 中运行回放可参考下列脚本:
import asyncio
from nautilus_trader.core import nautilus_pyo3
async def run():
config_filepath = Path("YOUR_CONFIG_FILEPATH")
await nautilus_pyo3.run_tardis_machine_replay(str(config_filepath.resolve()))
if __name__ == "__main__":
asyncio.run(run())
在 Rust 中执行回放
在 Rust 中运行回放可参考下列示例:
use std::{env, path::PathBuf};
use nautilus_adapters::tardis::replay::run_tardis_machine_replay_from_config;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let config_filepath = PathBuf::from("YOUR_CONFIG_FILEPATH");
run_tardis_machine_replay_from_config(&config_filepath).await;
}
请确保启用 Rust 日志,例如在 shell 中设置:
export RUST_LOG=debug
可运行的示例二进制位于仓库:example_replay.rs。
也可以使用 cargo 运行:
cargo run --bin tardis-replay <path_to_your_config>
加载 Tardis CSV 数据
Tardis 格式的 CSV 数据可以通过 Python 或 Rust 加载。加载器会读取磁盘上的 CSV 文本并将其解析为 Nautilus 数据。由于加载器在 Rust 中实现,因此无论从 Python 还是 Rust 调用,性能表现一致,可按个人工作流偏好选择。
你也可以为 load_* 函数/方法可选地指定 limit 参数,以限制加载的最大行数。
混合多个 instrument 的 CSV 文件由于精度(precision)要求较高,存在较大挑战,不建议使用。请使用单一 instrument 的 CSV 文件(详见下文)。
在 Python 中加载 CSV
可使用 TardisCSVDataLoader 在 Python 中加载 Tardis 格式的 CSV。
加载时可选地指定 instrument ID,但必须显式提供 price precision 与 size precision。指定 instrument ID 能提升加载性能,而精度参数是必需的,因为文本数据本身无法推断出精度。
示例用法:
from nautilus_trader.adapters.tardis import TardisCSVDataLoader
from nautilus_trader.model import InstrumentId
instrument_id = InstrumentId.from_str("BTC-PERPETUAL.DERIBIT")
loader = TardisCSVDataLoader(
price_precision=1,
size_precision=0,
instrument_id=instrument_id,
)
filepath = Path("YOUR_CSV_DATA_PATH")
limit = None
deltas = loader.load_deltas(filepath, limit)
在 Rust 中加载 CSV
在 Rust 中可使用此处的加载函数:csv 模块示例。 加载时同样可以可选地指定 instrument ID,但必须提供 price precision 与 size precision。指定 instrument ID 能提升加载性能,而精度参数不可省略。
完整示例见仓库:example_csv.rs。
示例代码:
use std::path::Path;
use nautilus_adapters::tardis;
use nautilus_model::identifiers::InstrumentId;
#[tokio::main]
async fn main() {
// 你必须指定精度以及 CSV 文件路径
let price_precision = 1;
let size_precision = 0;
let filepath = Path::new("YOUR_CSV_DATA_PATH");
// 可选地指定 instrument ID 与/或 limit
let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
let limit = None;
// 根据工作流决定是否传播解析错误
let _deltas = tardis::csv::load_deltas(
filepath,
price_precision,
size_precision,
Some(instrument_id),
limit,
)
.unwrap();
}
流式处理 Tardis CSV 数据
为在内存受限的环境下高效处理大文件,Tardis 集成提供了流式处理能力:按配置的块(chunk)读取与处理数据,而非一次性将整个文件载入内存。这在处理数 GB 级别的 CSV 文件时尤为有用。
流式功能适用于所有支持的 Tardis 数据类型:
- 订单薄增量(Order book deltas,
stream_deltas)。 - 报价(Quote ticks,
stream_quotes)。 - 成交(Trade ticks,
stream_trades)。 - 订单薄深度快照(Order book depth snapshots,
stream_depth10)。
在 Python 中流式读取 CSV
TardisCSVDataLoader 提供流式方法,按迭代器返回数据块。每个方法接受 chunk_size 参数,用于控制每个块读取的记录数:
from nautilus_trader.adapters.tardis import TardisCSVDataLoader
from nautilus_trader.model import InstrumentId
instrument_id = InstrumentId.from_str("BTC-PERPETUAL.DERIBIT")
loader = TardisCSVDataLoader(
price_precision=1,
size_precision=0,
instrument_id=instrument_id,
)
filepath = Path("large_trades_file.csv")
chunk_size = 100_000 # 每块处理 100,000 条记录(默认值)
# 分块流式处理成交数据
for chunk in loader.stream_trades(filepath, chunk_size):
print(f"Processing chunk with {len(chunk)} trades")
# 仅该块数据在内存中
for trade in chunk:
# 在此处实现你的处理逻辑
pass
流式处理订单薄数据
订单薄数据的流式支持包括增量(deltas)和深度快照(depth snapshots):
# 流式处理订单薄增量
for chunk in loader.stream_deltas(filepath):
print(f"Processing {len(chunk)} deltas")
# 处理 delta 块
# 流式处理 depth10 快照(levels 可选:5 或 25)
for chunk in loader.stream_depth10(filepath, levels=5):
print(f"Processing {len(chunk)} depth snapshots")
# 处理 depth 块
流式处理报价数据
报价数据的流式处理方式类似:
# 流式处理报价 ticks
for chunk in loader.stream_quotes(filepath):
print(f"Processing {len(chunk)} quotes")
# 处理报价块
内存效率优势
流式处理方式带来显著的内存效率优势:
- 可控的内存占用:一次仅加载一个数据块。
- 可扩展的处理能力:可处理大于可用内存的文件。
- 支持配置块大小:根据系统内存与性能需求调整
chunk_size(默认 100,000)。
当在流式模式下使用精度推断(未显式提供精度)时,推断出的精度可能与对整个文件进行批量加载时得到的结果不同。因为精度推断基于块边界进行,不同块中可能包含不同的精度要求。为获得确定性行为,请在调用流式方法时显式提供 price_precision 与 size_precision。
在 Rust 中流式读取 CSV
底层的流式功能由 Rust 实现,也可以直接在 Rust 中使用:
use std::path::Path;
use nautilus_adapters::tardis::csv::{stream_trades, stream_deltas};
use nautilus_model::identifiers::InstrumentId;
#[tokio::main]
async fn main() {
let filepath = Path::new("large_trades_file.csv");
let chunk_size = 100_000;
let price_precision = Some(1);
let size_precision = Some(0);
let instrument_id = Some(InstrumentId::from("BTC-PERPETUAL.DERIBIT"));
// 分块流式处理成交数据
let stream = stream_trades(
filepath,
chunk_size,
price_precision,
size_precision,
instrument_id,
).unwrap();
for chunk_result in stream {
match chunk_result {
Ok(chunk) => {
println!("Processing chunk with {} trades", chunk.len());
// 处理该块
}
Err(e) => {
eprintln!("Error processing chunk: {}", e);
break;
}
}
}
}
请求 instrument 定义
你可以使用 TardisHttpClient 在 Python 与 Rust 中请求 instrument 定义。该客户端会调用 Tardis instruments metadata API 并将 instrument 元数据解析为 Nautilus 的 instrument 定义。
TardisHttpClient 的构造函数接受可选参数:api_key、base_url 与 timeout_secs(默认 60 秒)。
该客户端提供获取单个 instrument 或某个交易所上所有 instruments 的方法。请在引用 Tardis 支持的交易所 时使用 Tardis 的 lower-kebab 命名规范。
访问 instruments metadata API 需要 Tardis 的 API key。
在 Python 中请求 Instruments
在 Python 中获取 instrument 定义的示例:
import asyncio
from nautilus_trader.core import nautilus_pyo3
async def run():
http_client = nautilus_pyo3.TardisHttpClient()
instrument = await http_client.instrument("bitmex", "xbtusd")
print(f"Received: {instrument}")
instruments = await http_client.instruments("bitmex")
print(f"Received: {len(instruments)} instruments")
if __name__ == "__main__":
asyncio.run(run())
在 Rust 中请求 Instruments
在 Rust 中获取 instrument 定义的示例:完整示例见仓库中的 example_http.rs。
use nautilus_adapters::tardis::{enums::Exchange, http::client::TardisHttpClient};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let client = TardisHttpClient::new(None, None, None).unwrap();
// Nautilus instrument definitions
let resp = client.instruments(Exchange::Bitmex).await;
println!("Received: {resp:?}");
let resp = client.instrument(Exchange::Bitmex, "ETHUSDT").await;
println!("Received: {resp:?}");
}
Instrument provider
TardisInstrumentProvider 通过 HTTP instrument 元数据 API 请求并解析来自 Tardis 的 instrument 定义。由于存在多个 Tardis 支持的交易所,在加载所有 instruments 时,需要使用 InstrumentProviderConfig 过滤出目标 venues:
from nautilus_trader.config import InstrumentProviderConfig
# 查看支持的 venues: https://nautilustrader.io/docs/nightly/integrations/tardis#venues
venues = {"BINANCE", "BYBIT"}
filters = {"venues": frozenset(venues)}
instrument_provider_config = InstrumentProviderConfig(load_all=True, filters=filters)
你也可以像平时一样加载特定的 instrument 定义:
from nautilus_trader.config import InstrumentProviderConfig
instrument_ids = [
InstrumentId.from_str("BTCUSDT-PERP.BINANCE"), # 将使用 'binance-futures' 交易所
InstrumentId.from_str("BTCUSDT.BINANCE"), # 将使用 'binance' 交易所
]
instrument_provider_config = InstrumentProviderConfig(load_ids=instrument_ids)
订阅之前,所需的 instruments 必须已缓存可用。为简化流程,建议为计划订阅的 venues 预先加载所有 instruments。
实时数据客户端
TardisDataClient 允许将 Tardis Machine 集成到正在运行的 NautilusTrader 系统中。它支持订阅下列数据类型:
OrderBookDelta(来自 Tardis 的 L2 细粒度,包含所有变更或完整深度快照)OrderBookDepth10(来自 Tardis 的 L2 细粒度,提供最多 10 层快照)QuoteTickTradeTickBar(基于 Tardis 支持的 bar 聚合)
数据 WebSocket
主 TardisMachineClient 的数据 WebSocket 在初始连接阶段负责管理所有接收到的流订阅,直到 ws_connection_delay_secs 指定的延迟时间结束。对于在此之后新增的订阅,会创建新的 TardisMachineClient 实例。该策略在启动时能让主 WebSocket 在单个流中处理数百个订阅,从而提高性能。
当设定了 ws_connection_delay_secs 的初始订阅延迟时,对这些流的取消订阅不会真正从 Tardis Machine 的流中移除订阅(因为 Tardis 不支持选择性取消订阅)。不过组件仍会如期从消息总线(message bus)的发布中取消订阅。
在任何初始延迟之后创建的订阅,将按常规行为正常取消并从 Tardis Machine 流中移除。
如果你预计会频繁地订阅与取消订阅,建议将 ws_connection_delay_secs 设为 0。这样每次初始订阅都会创建一个新的客户端,便于在取消订阅时单独关闭。
限制与注意事项
已知的限制与注意事项包括:
- 不支持单次的历史数据请求(historical data requests),因为每次请求至少需要从 Tardis Machine 做一天的回放,且可能需要附加筛选,这在实践中既不高效也不实用。
如需更多功能或希望为 Tardis 适配器贡献代码,请参见我们的贡献指南。