# DolphinDB 白皮书 流数据 ![bo_d5nnoi3ef24c73bmnrtg_0_19_1683_1616_640_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_0_19_1683_1616_640_0.jpg) ## 内容 第 1 章. 概述. . iii 1.1 流处理示例. .4 1.2 流处理架构 5 第 2 章. 发布与订阅 .8 2.1 流数据表. .8 2.2 发布与订阅 9 2.3 功能特性. 10 第 3 章. 流计算引擎. 12 3.1 引擎分类. 12 3.2 引擎详解 .13 3.3 功能特性 .27 3.4 流式算子. .28 3.5 流水线处理 28 第 4 章. 流批一体. .31 4.1 历史数据回放 .31 4.2 流批一体实现方案 .34 第 5 章. 流数据运维. .37 5.1 权限管理. 37 5.2 流数据监控. 39 5.3 流数据高可用 41 第 6 章. 流计算 API 及插件 .44 6.1 流计算 API. 44 6.2 流计算插件 45 第 7 章. 场景应用 .46 7.1 金融. 46 7.2 物联网 51 第 8 章. 结语. 55 ## DolphinDB 流数据白皮书 流数据是一种持续实时生成且动态变化的时间序列数据,涵盖了金融交易、物联网(IoT)传感器采集、物流运营、零售订单等各类持续生成动态数据的场景。DolphinDB 将时间序列数据库与流数据处理框架无缝集成,不仅可以处理海量的历史数据,还广泛支持了实时数据的流式计算,包括流数据发布与订阅、数据预处理、实时内存计算、复杂指标的窗口计算、异常检测、多数据源实时关联等。借助 DolphinDB,企业可以高效地过滤、关联、分析和可视化大规模实时流数据,从中预测趋势、监测异常,辅助实时决策。 本白皮书旨在介绍 DolphinDB 的流数据处理框架、功能特性及应用场景,主要包括以下几个方面: - 流数据的概念和特点 - 流数据处理框架及实现 - 流数据操作与功能特性 - 流数据应用场景与案例 ## 第 1 章. 概述 ### 1.1 流处理示例 首先通过简单的 DolphinDB 脚本介绍流数据的订阅与处理过程。 --- // 创建并共享流数据表 pubTable 与 outputTable share(streamTable(1:0, `time`id`price`qty, [TIME, SYMBOL, DOUBLE, LONG]), "pubTable") share(streamTable(1:0, `time`id`price`qty`amount, [TIME, SYMBOL, DOUBLE, LONG, DOUBLE]), "outputTable") go // 定义流处理方法 def streamProcessFunc(msg)\{ t = select time, id, price, qty, price*qty as amount from msg outputTable.append!(t) \} // 提交流订阅 subscribeTable(tableName="pubTable", actionName="demo1", offset=-1, handler=streamProcessFunc, msgAsTable=true) // 注入数据到发布表 insert into pubTable values(09:51:49.581, "GOOG", 2.6, 100) --- 上例演示了 DolphinDB 基本的流订阅流程: - 首先通过 streamTable 函数创建流数据表 pubTable 作为流数据发布表,outputTable 作为订阅数据的接收表。执行 share 语句实现流数据表会话间共享。 ·定义函数 streamProcessFunc 用于计算每条消息价格和数量的乘积得出交易总量,并输出到流数据表 outputTable。 -执行 subscribeTable 函数提交订阅,并指定 streamProcessFunc 为数据处理方法。系统会分配后台线程处理订阅到的数据。 - 向发布表 pubTable 插入数据。数据被推送至消费队列,订阅端后台线程处理数据并将结果推送至输出表 outputTable。 图 1-1 流数据订阅示例 ![bo_d5nnoi3ef24c73bmnrtg_4_423_167_800_532_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_4_423_167_800_532_0.jpg) ### 1.2 流处理架构 #### 1.2.1 流数据处理 流数据是基于事件持续生成的时间序列数据。与静态有界的历史数据不同,流数据具有以下特点: - 动态:数据流持续动态生成,流的结束没有明确定义,数据的大小与结构也没有固定限制。 - 有序:每条流数据记录都具有时间戳或者序列号,标识了数据在流中的位置与顺序。 ·大规模:流数据通常以高速率生成,数据规模大,对处理引擎的并行处理性能和可扩展性有更高要求。 ・强时效:流数据的强时效性要求极低延迟的读取和处理能力,以最大化数据价值,驱动实时业务决策。 流数据处理是指在实时数据流上进行实时计算和分析的过程。与批处理不同,流处理无需等待所有数据全部到位,即可按照时间顺序对数据进行增量处理。这种实时处理方式能够高效利用存储与计算资源,适用于需要快速响应和及时决策的应用场景。
批处理流处理
数据范围对数据集中的所有或大部分数据进行查询或处理对时间窗口内的数据或对最近的数据记录进行查询或处理
数据大小大批量数据单条记录或包含几条记录的小批量数据
性能几分钟至几小时的延迟亚毫秒级延迟
#### 1.2.2 流处理架构 为实现不同应用场景的业务逻辑,DolphinDB 采用了模块化与可扩展的流处理框架,将核心的流计算组件 (如流数据表、流计算引擎等)设计为可重用的模块。模块可以像积木一样通过发布-订阅的方式灵活组合, 形成功能强大的流水线。此外,DolphinDB 还提供了 1500 多个内置函数,可用作流计算引擎的算子,极大简化了流计算应用的开发过程。同时,用户也可以根据业务需求使用自定义函数实现复杂指标计算,即使是复杂 1 - 概述 的金融算法也可以通过 DolphinDB 的脚本语言灵活实现。此外,JIT 编译等技术的应用也进一步提高了流计算的性能。 ## 图 1-2 流数据处理框架 ![bo_d5nnoi3ef24c73bmnrtg_5_132_319_1374_624_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_5_132_319_1374_624_0.jpg) 上图展示了 DolphinDB 流数据处理框架。实时数据注入到流数据发布表后可同时供多方订阅消费: ·流数据计算引擎:流数据计算引擎可以订阅流数据进行实时分析。计算结果可以通过可视化平台如 Grafana 进行实时展示,也可以再次发布,供其他数据节点或应用进行二次订阅和事件处理。 ・API 客户端:通过 DolphinDB API,第三方客户端如 Python 应用程序可以方便地订阅流数据,进行业务操作。这种灵活的 API 订阅方式使得 DolphinDB 的流数据可以被集成到各种应用场景中,满足多样化的实时数据需求。 ·消息中间件:将流数据实时灌注到消息中间件(如 Apache Kafka)的消息队列中,供下游模块订阅程序消费。 - 数据仓库:数据仓库可以订阅并保存流数据,作为分析系统与报表系统的数据源。可以将实时数据用于后续的离线分析和报表生成,实现对历史数据和实时数据的统一分析。 #### 1.2.3 发布/订阅 (Pub/Sub) 模型 DolphinDB 采用了经典的发布/订阅 (Pub/Sub) 通信模型,通过消息队列实现流数据的发布与订阅。该模型将流数据发布端与订阅端解耦,各模块支持独立开发管理,可以增强系统的可拓展性并提升发送者的响应效率。 图 1-3 发布/订阅模型 ![bo_d5nnoi3ef24c73bmnrtg_6_118_195_1419_555_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_6_118_195_1419_555_0.jpg) 发布数据:发布端在每个节点上维护一个发布队列。当新的流数据注入到该节点的流数据发布表 时,DolphinDB 会将这些数据推送到相应的消息发布队列,再由发布线程将数据发布到各个订阅端的消费队列。 订阅数据:每个订阅线程对应一个消费队列。订阅成功提交后,每当有新数据写入流数据发布表时,DolphinDB 会主动通知所有订阅方,消费线程从消费队列中获取数据进行增量处理。 ## 第 2 章. 发布与订阅 ### 2.1 流数据表 #### 2.1.1 创建与删除流数据表 DolphinDB 以流数据表作为实时数据流的载体进行流数据的存储、发布与订阅。流数据表是一种特殊的内存表,包括常规流数据表、键值流数据表与高可用流数据表。流数据表可以视为简化的消息中间件,或是消息中间件中的主题(topic),用户可以向其发布数据,也可以从中订阅数据。 用户可以执行 streamTable 函数创建流数据表。与普通内存表不同,流数据表中的记录支持同时读写, 可以增加记录,但不可修改或删除。使用 share 语句将流数据表共享到所有会话后,表中数据即可被订阅。DolphinDB 支持多个会话中的多个订阅端订阅同一个流数据表。实时数据写入该表后,会向所有订阅该表的订阅端推送数据。 --- colName=["Name","Age"] colType=["string","int"] t = streamTable(1:0, colName, colType) share t as st1 --- 通过 streamTable 函数创建的流数据表可以包含重复记录。在订阅多个数据源时,如要避免数据的重复写入,可以使用 keyedStreamTable 函数创建包含主键的键值流数据表。表中主键不包含重复值,如果新写入的数据与已有记录的主键重复,不会更新已有记录。用户也可以使用 haStreamTable 函数创建高可用流数据表,高可用流数据表在日志中持久化了表结构信息,节点重启后不需要重新建表。同时,高可用流数据表也可以通过设置主键过滤重复记录。关于流数据高可用的详细解决方案,请参阅5.3 流数据高可用小节。 用户可以在取消所有订阅后使用 dropStreamTable 删除流数据表。例如,删除上述语句创建的共享流数据表 st1: --- dropStreamTable(`st1) --- #### 2.1.2 持久化流数据表 流数据表本质是特殊的内存表,系统在默认情况下会把流数据表的所有数据都保存在内存中。由于流数据持续增长的特性,内存可能会被逐渐耗尽,DolphinDB 因此支持了流数据表持久化功能,用户可将内存中的流数据以异步或同步的方式保存到磁盘中。 要持久化流数据表,首先需要在发布节点设置持久化路径参数 persistenceDir,创建流数据表后执行 enableTableShareAndPersistence 命令共享并持久化流数据表。用户也可以执行 share 语句共享流数据表后再通过 enableTablePersistence 命令持久化流数据表。持久化函数的参数设定可能会影响流数据系统的性能,其中: ·asynWrite 指定是否以异步模式持久化数据到磁盘。采用异步持久化的方式可以提高系统的吞吐量,但是节点重启可能会导致最后几条数据丢失。 - compress 指定是否将持久化的数据压缩后保存到磁盘。数据压缩可以减少磁盘写入量和空间占用。 - cacheSize 限制了流数据表在内存中保留的最大记录数。设置合理的内存占用量可以有效保证数据安全。 - flushMode 表示是否开启同步刷盘。内存中的流数据首先写入操作系统缓存,若开启同步刷盘,一批数据落盘后才会开始下一批数据的写入。 开启流数据表持久化有以下优点: - 当前最新的一批记录保留在内存中,较早的记录保存在磁盘中,能够有效避免内存不足问题。 - 如发生节点异常重启,用户调用持久化命令会将持久化的流数据重新载入表中。 ·流订阅可以从磁盘上保存记录的偏移量重新开始。 下表对比了常规流数据表、共享流数据表与持久化流数据表。
流数据表共享流数据表持久化流数据表
生命周期当前会话当前节点重启后仍可加载
其他会话可见
可被订阅
存储方式仅内存仅内存内存和磁盘
重启后可恢复
### 2.2 发布与订阅 #### 2.2.1 发布 DolphinDB 流计算框架采用了发布-订阅的模式。流数据发布首先要定义共享流数据表,实时写入该表的数据将会发布到所有订阅端。流表可部署于 DolphinDB 的数据节点或计算节点上,其所在的节点称为发布节点, 订阅方所在的节点称为订阅节点。例如,通过以下脚本定义并共享流数据表 pubTable: share streamTable(1:0, `timestamp` temperature, [TIMESTAMP, DOUBLE]) as pubTable DolphinDB 支持多种方式写入共享流数据表。在研究阶段,用户可以通过 append!或 insert into 语句直接向流数据表写入数据,也可以使用 DolphinDB 内置的 replay 函数将库内历史数据以指定速率回放,模拟流的形式注入流数据表。在实际生产中,可以通过插件将外部数据源产生的记录实时接入流数据表,或通过 DolphinDB API 的写入接口将外部数据写入流数据表。具体使用示例参见 用户手册-实时流数据接入。 #### 2.2.2 订阅 流数据的订阅方和发布方可以处于同一个节点、同一集群中的不同节点、或不同集群中的节点。外部的客户端应用也可以向 DolphinDB 订阅数据。SubExecutor 是 DolphinDB 集群中的消息处理线程,DolphinDB 的一个数据节点或计算节点对应一个进程,每个节点上会有一到多个消息处理线程。若订阅方位于 DolphinDB 节点上,则对应的消费会被分配到后台线程上运行。若订阅方处于外部客户端,则消费运行在外部程序中。 2 - 发布与订阅 ## 图 2-1 DolphinDB 订阅模式 ![bo_d5nnoi3ef24c73bmnrtg_9_112_188_1427_636_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_9_112_188_1427_636_0.jpg) 在 DolphinDB 中,用户可以通过 subscribeTable 函数订阅流数据表。只需指定表或引擎名称与数据处理方法即可创建订阅关系,函数也提供了丰富的可选参数用于灵活配置订阅。流处理示例小节演示了基本的订阅流程,通过 subscribeTable 提交订阅后,系统会分配一个后台消费线程接收并处理发布表新增的数据,并把处理后的数据推送至输出表。使用 subscribeTable 函数提交订阅脚本编写简单,在内存中直接处理数据, 降低了数据传输和转换的开销。 DolphinDB API 提供了类似 subscribeTable 的流订阅接口,用户可以在第三方客户端(例如 Python 应用程序)直接订阅 DolphinDB 的流数据表。这种订阅模式可以方便地与其他语言集成,将 DolphinDB 中的流数据应用到各种业务场景中,满足多样化的实时数据需求。 用户也可以通过插件订阅外部的消息中间件(如 Apache Kafka,ZeroMQ 等)的数据。例如可以使用 DolphinDB kafka 插件的 subscribe 接口订阅 Kafka 主题。通过插件对接外部消息中间件实现了数据生产与消费者间的解耦的订阅关系,灵活适应各种数据类型与协议。 ### 2.3 功能特性 DolphinDB 的流数据订阅实现了以下功能: ·发布数据过滤:支持指定发布表的过滤列与过滤规则,只将符合特定条件的数据发送给订阅者。发布数据的过滤能够有效减少传输的数据量,提高传输的效率。 ·自定义起始偏移量:用户在发起订阅时可以通过参数指定任意起始偏移位置,订阅偏移量与流数据表的行数一一对应,订阅可以从指定偏移量的对应位置开始消费。 ·本地及远程订阅:DolphinDB 的发布/订阅模型支持本地和远程订阅,订阅者可以在同一台服务器上或不同服务器之间订阅数据,更好地满足分布式数据处理的需求。远程订阅采用了 TCP/IP 通信协议,保证数据传输的可靠性和实时性。 ·多方订阅:发布订阅模型允许多个订阅者同时订阅同一个数据流,实现多方同时消费数据的功能。这为数据的共享和多样化的数据处理需求提供了便利。 ·断线重连:在流数据传输过程中,如果订阅者因网络故障或其他原因与服务端断开连接,订阅可以自动重连确保数据传输的稳定性。开启断线重连后,订阅端会记录流数据发布的偏移量,连接恢复时订阅端会从偏移量开始重新订阅。 ## 第 3 章. 流计算引擎 使用 DolphinDB 处理历史数据时,可以结合 SQL 语句与内置函数进行全量或增量的查询和计算。但实时数据场景要求高效即时处理,全量查询和计算无法满足这种需求。因此,DolphinDB 研发了适合流式处理的计算引擎,在系统内部采用增量计算,优化了实时计算的性能。流计算引擎是 DolphinDB 实时数据处理解决方案的核心组件,可以视作封装的独立计算黑盒,通过向其写入数据触发计算,并将计算结果输出到目标表。DolphinDB 系统内置了十余种流数据计算引擎,本节会详细介绍引擎分类,主要引擎的计算规则与应用场景,并辅以典型示例进行说明。 ### 3.1 引擎分类 DolphinDB 流计算引擎提供了灵活的计算方式和丰富的计算功能,适用于多样化的实时数据处理需求。根据参与计算的表数量和类型,流计算引擎可以分为单表计算和多表连接两种类型: 图 3-1 流计算引擎分类 ![bo_d5nnoi3ef24c73bmnrtg_11_121_868_1427_610_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_11_121_868_1427_610_0.jpg) 单表计算引擎应用于单个流数据表: - 分组内时序计算:对数据进行分组,在组内进行逐条计算、窗口聚合或异常检测。 - 跨分组截面计算:对数据进行分组,选取每组的最新数据进行截面计算。 多表连接类似于 SQL 表连接 (JOIN) 操作,用于实时关联两张表。连接引擎的左表都是流数据表,根据右表的类型不同,多表连接可以分为双流关联和维表关联: ·双流关联(右表是流数据表):根据数据的时序关系进行等值或模糊匹配。 - 维表关联(右表可以是流数据表或静态维度表):流数据表实时关联右表快照。 ### 3.2 引擎详解 #### 3.2.1 单表计算引擎 ## (1)响应式状态引擎(createReactiveStateEngine) ## 计算规则: 响应式状态引擎能够即时响应注入数据,每当有新的数据到达引擎即触发一次计算,并输出一条计算结果。DolphinDB 针对生产业务中常用的状态算子(滑动窗口函数、累积函数、序列相关函数和 topN 相关函数等)进行了优化,采用增量算法大幅提升了这些算子在引擎中的计算效率。在 DolphinDB 提供的内置函数中,响应式状态引擎目前仅支持系统优化过的状态函数;如需实现更加复杂的计算逻辑,用户也可以通过 @state 声明自定义函数封装复杂的状态算子。 ## 应用场景: 响应式状态引擎能够快速响应新数据,适用于对单条数据即时响应并实时更新输出计算结果的高频计算场景, 例如: ・金融:实时计算高频因子,如基于快照数据计算股票的短时涨幅等。 •物联网:检测传感器状态的变化,如实时监控温度、湿度、压力等数据指标的波动。 ## 使用案例:实时计算 5 分钟涨速 本例通过回放一天的快照行情数据模拟实时数据流,响应式状态引擎实时响应每条输入数据,并基于当前数据计算并输出过去 5 分钟涨速。 --- // 定义流数据发布表与结果输出表 schemaTable = loadTable("dfs://SH_TSDB_snapshot_ArrayVector", "snapshot").schema().colDefs share(streamTable(1:0, schemaTable.name, schemaTable.typeString), `snapshotStreamTable) share(streamTable(1:0, ["SecurityID", "DateTime", "factor_5min"], [SYMBOL, TIMESTAMP, DOUBLE]), `changeResultTable) go // 定义自定义状态函数 @state def calculateChange(DateTime, LastPx, lag)\{ PREVLASTPX = tmove(DateTime, LastPx, lag) return (LastPx - PREVLASTPX) \\ PREVLASTPX \} // 定义响应式状态引擎 metrics=<[DateTime, calculateChange(DateTime, LastPx, lag=5m)]> rse = createReactiveStateEngine(name="calChange", metrics=metrics, dummyTable=snapshotStreamTable, outputTable=changeResultTable, keyColumn=`SecurityID) ## // 订阅上游输入表 subscribeTable(tableName="snapshotStreamTable", actionName="snapshotFilter", offset=-1, handler=rse, msgAsTable=true, hash=0) // 后台提交回放任务 data = replayDS(sqlObj=, dateColumn=`Date, timeColumn=`Time) replay(inputTables=tradeDS, outputTables=tradeStream, dateColumn=`Date, timeColumn=`Time, replayRate=10000, absoluteRate=true) --- ## N 对 N 多表回放 replay 也支持 N 张表的同时回放,即 N 对 N 回放,将多张输入表以元组的形式传入 replay,并为每一个输入表分别指定输出表,输出表和输入表一一对应且具有相同的表结构。 下例基于原始数据生成数据源 orderDS, tradeDS, snapshotDS,并分别回放至输出表 orderStream, tradeStream, snapshotStream: --- orderDS = replayDS(sqlObj=, dateColumn=`Date, timeColumn=`Time) snapshotDS = replayDS(sqlObj=, dateColumn=`Date, timeColumn=`Time) tradeDS = replayDS(sqlObj=, dateColumn=`Date, timeColumn=`Time) inputDict = dict(["order", "trade", "snapshot"], [orderDS, tradeDS, snapshotDS]) replay(inputTables=inputDict, outputTables=messageStream, dateColumn=`Date, timeColumn=`Time, replayRate=10000, absoluteRate=true) --- 若要对异构流数据表进行数据处理(如指标计算等)操作,需要将二进制格式的消息内容反序列化为原始结构的数据。DolphinDB 支持流数据分发引擎 streamFilter 反序列化异构流数据表并过滤分发数据;同时,各 API 在流数据订阅功能的基础上,也支持了订阅时指定 StreamDeserializer 对异构流数据表进行反序列化操作。 #### 4.1.2 回放速率 根据参数 replayRate 的不同设定,历史数据可以按照以下速率回放: - 指定每秒回放记录数:回放的速率基于记录数计算,系统按照每秒 replayRate 条记录进行回放。 - 指定时间跨度回放加速倍数:根据输入表数据的时间跨度加速 replayRate 倍回放,此时每秒回放的记录数是相同的。 - 精确速度回放:根据输入表数据到来的时间截以尽可能精确的速度加速 replayRate 倍回放。 - 全速回放:系统以最快的速率进行回放。 #### 4.1.3 异构回放应用案例 在实际数据分析应用中,通常需要多种不同类型的消息协作。以量化策略研发为例,在生产环境中实时数据的处理通常是由事件驱动的。而为了更好地在研发环境模拟实际交易中的实时数据流,可能需要将逐笔委托、逐笔成交、快照等行情数据同时回放进行关联分析。异构多表回放是准确模拟实盘环境的关键手段,它保证了不同类型数据的绝对时序,从而使策略开发测试中的行为完全复现实盘交易中的处理逻辑。 下例结合股票行情回放展示异构多表回放功能在实际场景中的应用。脚本将逐笔成交数据与快照数据回放至一个异构流数据表,并通过 streamFilter 反序列化、筛选并分发数据,使用 Asof Join 引擎实时关联并计算个股交易成本。 --- // 创建异构流数据表 messageStream colName = `timestamp`source`msg colType = [TIMESTAMP, SYMBOL, BLOB] messageTemp = streamTable(1:0, colName, colType) enableTableShareAndPersistence(table=messageTemp, tableName="messageStream", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) messageTemp = NULL // 创建计算结果输出表 prevailingQuotes colName = `TradeTime` SecurityID` Price `TradeQty` BidPX1` OfferPX1` TradeCost` Snapshot Time colType = [TIME, SYMBOL, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, TIME] prevailingQuotesTemp = streamTable(1:0, colName, colType) enableTableShareAndPersistence(table=prevailingQuotesTemp, tableName="prevailingQuotes", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) prevailingQuotesTemp = NULL ## // 创建连接引擎 def createSchemaTable(dbName, tableName)\{ schema = loadTable(dbName, tableName).schema().colDefs return table(1:0, schema.name, schema.typeString) \} tradeSchema = createSchemaTable("dfs://trade", "trade") snapshotSchema = createSchemaTable("dfs://snapshot", "snapshot") joinEngine=createAsofJoinEngine(name="tradeJoinSnapshot", leftTable=tradeSchema, rightTable=snapshotSchema, outputTable=prevailingQuotes, metrics=<[Price, TradeQty, BidPX1, OfferPX1, abs(Price-(BidPX1+OfferPX1)/2), snapshotSchema.Time]>, matchingColumn=`SecurityID, timeColumn=`Time, useSystemTime=false, delayedTime=1) // 创建流计算过滤与分发引擎 def filterAndParseStreamFunc(tradeSchema, snapshotSchema)\{ filter1 = dict(STRING, ANY) filter1["condition"] = "trade" filter1["handler"] = getLeftStream(getStreamEngine(`tradeJoinSnapshot)) --- 4 - 流批一体 --- filter2 = dict(STRING, ANY) filter2["condition"] = "snapshot" filter2["handler"] = getRightStream(getStreamEngine(`tradeJoinSnapshot)) schema = dict(["trade", "snapshot"], [tradeSchema, snapshotSchema]) engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2], msgSchema=schema) subscribeTable(tableName="messageStream", actionName="tradeJoinSnapshot", offset=-1, handler=engine, msgAsTable=true, reconnect=true) \} filterAndParseStreamFunc(tradeSchema, snapshotSchema) // 回放历史数据 def replayStockMarketData()\{ timeRS = cutPoints(09:15:00.000..15:00:00.000, 100) tradeDS = replayDS(sqlobj=, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS) inputDict = dict(["trade", "snapshot"], [tradeDS, snapshotDS]) submitJob("replay", "replay for factor calculation", replay, inputDict, messageStream, `Date, `Time, 100000, true, 2) \} replayStockMarketData() --- 上述脚本读取数据库中的结构不同的数据表进行全速的异构回放,回放通过 submit Job 函数提交后台作业来执行。对于回放产生的流数据表 messageStream,首先通过函数 streamFilter 反序列化,并根据过滤条件处理订阅数据。数据经过筛选处理后,符合条件的 trade 数据被注入到 Asof Join 引擎的左表,snapshot 数据注入到引擎右表,实现数据关联。完整应用脚本与示例数据可参考教程股票行情回放。 ### 4.2 流批一体实现方案 流批一体是指将研发环境中基于历史数据建模分析研发的因子表达式直接应用于生产环境的实时数据中,并保证流计算的结果和批量计算完全一致,二者使用同一套代码,称为 “流批一体”。DolphinDB 将历史数据和实时数据分析整合,在研发环境中开发的核心因子表达式可以直接应用于生产环境的实时数据中。实时行情订阅、行情数据收录、交易实时计算、盘后研究建模,全都用同一套代码完成,保证在历史回放和生产交易当中数据完全一致。相比传统的 Python 与 C++ 两套代码的方案,开发上线周期可缩短 90% 以上。在生产环境中,DolphinDB 提供了实时流计算框架。在流计算框架下,用户在投研阶段封装好的基于批量数据开发的因子函数,可以无缝投入交易和投资方面的生产程序中。同时,流计算框架在算法路径上进行了精细的优化,兼顾了高效开发和计算性能的优势。用户无需维护两套代码,节约了开发成本,还规避了两套体系可能带来的批计算与流计算结果不一致的问题。 图 4-2 流批一体实现方案 ![bo_d5nnoi3ef24c73bmnrtg_34_105_177_1430_427_0.jpg](images/bo_d5nnoi3ef24c73bmnrtg_34_105_177_1430_427_0.jpg) 流批一体在 DolphinDB 中有两种实现方法: ## (1)使用一套核心函数定义或表达式,代入不同的计算引擎实现历史数据或流数据的计算。 流计算引擎可以直接重用批处理(研发阶段)中基于历史数据编写的表达式或函数,避免在生产环境重写代码,降低了维护研发和生产两套代码的负担。DolphinDB 脚本语言的表达式实际上是对因子语义的描述,因子计算的具体实现则交由相应的计算引擎完成。DolphinDB 确保流式计算的结果与批量计算完全一致,因此代码只要在历史数据的批量计算中验证正确,即可保证流数据的实时计算正确,极大降低了实时计算的调试成本。实际应用中,投研批处理阶段定义的因子表达式无需修改,在生产阶段只需创建流式计算引擎指定该指标即可实现增量流计算。 例如,计算每天主买成交量占全部成交量的比例,可以自定义函数 buyTradeRatio: --- @state def buyTradeRatio(buyNo, sellNo, tradeQty)\{ return cumsum(iif(buyNo>sellNo, tradeQty, 0))\\cumsum(tradeQty) \} --- 在批处理模式下,可以使用 SQL 查询,发挥库内并行计算的优势,使用 csort 语句对组内数据按照时间顺序排序: --- factor = select TradeTime, SecurityID, `buyTradeRatio as factorname, buyTradeRatio(BuyNo, SellNo, TradeQty) as val from loadTable("dfs://tick_SH_L2_TSDB","tick_SH_L2_TSDB") where date(TradeTime)<2020.01.31 and time(TradeTime)>=09:30:00.000 context by SecurityID, date(TradeTime) csort TradeTime --- 在流处理模式下,可以通过响应式状态引擎指定该因子实现增量计算。在批计算中定义的因子函数 buyTradeRatio,只需增加 @state 标识声明其为状态函数即可在流式计算中复用。通过以下代码创建响应式状态引擎 demo,以 SecurityID 作为分组键,输入的消息格式同内存表 tickStream。 --- tickStream = table(1:0, `SecurityID` TradeTime` TradePrice `TradeQty` TradeAmount` BuyNo` SellNo, [SYMBOL, DATETIME, DOUBLE, INT, DOUBLE, LONG, LONG]) result = table(1:0, `SecurityID`TradeTime`Factor, [SYMBOL, DATETIME, DOUBLE]) factors = <[TradeTime, buyTradeRatio(BuyNo, SellNo, TradeQty)]> --- 4 - 流批一体 demoEngine = createReactiveStateEngine(name="demo", metrics=factors, dummyTable=tickStream, outputTable=result, keyColumn="SecurityID") ## (2)回放历史数据模拟实时数据流入,使用流数据计算引擎完成计算。 为确保研发和生产环境使用同一套代码,可以研发阶段需将历史数据严格按照事件发生的时间顺序进行回放, 以此模拟交易环境。使用这种方法计算历史数据的因子值,效率会略逊与基于 SQL 的批量计算。 下例通过用户自定义函数 sum_diff 与内置函数 ema (exponential moving average) 计算高频因子 factor1: --- def sum_diff(x, y)\{ return (x-y)/(x+y) \} factor1 = // 定义响应式状态引擎实现因子流式计算 share streamTable(1:0, `sym`date`time`price, [STRING, DATE, TIME, DOUBLE]) as tickStream result = table(1:0, `sym` factor1, [STRING, DOUBLE]) rse = createReactiveStateEngine(name="reactiveDemo", metrics = factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym") subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert\{rse\}) --- 回放历史数据模拟实时数据注入引擎触发计算: --- // 从 trades 表中加载一天的数据,回放到流数据表 tickStream 中 inputDS = replayDS(