# DolphinDB 白皮书 流数据

## 内容
第 1 章. 概述. . iii
1.1 流处理示例. .4
1.2 流处理架构 5
第 2 章. 发布与订阅 .8
2.1 流数据表. .8
2.2 发布与订阅 9
2.3 功能特性. 10
第 3 章. 流计算引擎. 12
3.1 引擎分类. 12
3.2 引擎详解 .13
3.3 功能特性 .27
3.4 流式算子. .28
3.5 流水线处理 28
第 4 章. 流批一体. .31
4.1 历史数据回放 .31
4.2 流批一体实现方案 .34
第 5 章. 流数据运维. .37
5.1 权限管理. 37
5.2 流数据监控. 39
5.3 流数据高可用 41
第 6 章. 流计算 API 及插件 .44
6.1 流计算 API. 44
6.2 流计算插件 45
第 7 章. 场景应用 .46
7.1 金融. 46
7.2 物联网 51
第 8 章. 结语. 55
## DolphinDB 流数据白皮书
流数据是一种持续实时生成且动态变化的时间序列数据,涵盖了金融交易、物联网(IoT)传感器采集、物流运营、零售订单等各类持续生成动态数据的场景。DolphinDB 将时间序列数据库与流数据处理框架无缝集成,不仅可以处理海量的历史数据,还广泛支持了实时数据的流式计算,包括流数据发布与订阅、数据预处理、实时内存计算、复杂指标的窗口计算、异常检测、多数据源实时关联等。借助 DolphinDB,企业可以高效地过滤、关联、分析和可视化大规模实时流数据,从中预测趋势、监测异常,辅助实时决策。
本白皮书旨在介绍 DolphinDB 的流数据处理框架、功能特性及应用场景,主要包括以下几个方面:
- 流数据的概念和特点
- 流数据处理框架及实现
- 流数据操作与功能特性
- 流数据应用场景与案例
## 第 1 章. 概述
### 1.1 流处理示例
首先通过简单的 DolphinDB 脚本介绍流数据的订阅与处理过程。
---
// 创建并共享流数据表 pubTable 与 outputTable
share(streamTable(1:0, `time`id`price`qty, [TIME, SYMBOL, DOUBLE, LONG]),
"pubTable")
share(streamTable(1:0, `time`id`price`qty`amount, [TIME, SYMBOL, DOUBLE, LONG,
DOUBLE]), "outputTable")
go
// 定义流处理方法
def streamProcessFunc(msg)\{
t = select time, id, price, qty, price*qty as amount from msg
outputTable.append!(t)
\}
// 提交流订阅
subscribeTable(tableName="pubTable", actionName="demo1", offset=-1,
handler=streamProcessFunc, msgAsTable=true)
// 注入数据到发布表
insert into pubTable values(09:51:49.581, "GOOG", 2.6, 100)
---
上例演示了 DolphinDB 基本的流订阅流程:
- 首先通过 streamTable 函数创建流数据表 pubTable 作为流数据发布表,outputTable 作为订阅数据的接收表。执行 share 语句实现流数据表会话间共享。
·定义函数 streamProcessFunc 用于计算每条消息价格和数量的乘积得出交易总量,并输出到流数据表 outputTable。
-执行 subscribeTable 函数提交订阅,并指定 streamProcessFunc 为数据处理方法。系统会分配后台线程处理订阅到的数据。
- 向发布表 pubTable 插入数据。数据被推送至消费队列,订阅端后台线程处理数据并将结果推送至输出表 outputTable。
图 1-1 流数据订阅示例

### 1.2 流处理架构
#### 1.2.1 流数据处理
流数据是基于事件持续生成的时间序列数据。与静态有界的历史数据不同,流数据具有以下特点:
- 动态:数据流持续动态生成,流的结束没有明确定义,数据的大小与结构也没有固定限制。
- 有序:每条流数据记录都具有时间戳或者序列号,标识了数据在流中的位置与顺序。
·大规模:流数据通常以高速率生成,数据规模大,对处理引擎的并行处理性能和可扩展性有更高要求。
・强时效:流数据的强时效性要求极低延迟的读取和处理能力,以最大化数据价值,驱动实时业务决策。
流数据处理是指在实时数据流上进行实时计算和分析的过程。与批处理不同,流处理无需等待所有数据全部到位,即可按照时间顺序对数据进行增量处理。这种实时处理方式能够高效利用存储与计算资源,适用于需要快速响应和及时决策的应用场景。
| 批处理 | 流处理 |
| 数据范围 | 对数据集中的所有或大部分数据进行查询或处理 | 对时间窗口内的数据或对最近的数据记录进行查询或处理 |
| 数据大小 | 大批量数据 | 单条记录或包含几条记录的小批量数据 |
| 性能 | 几分钟至几小时的延迟 | 亚毫秒级延迟 |
#### 1.2.2 流处理架构
为实现不同应用场景的业务逻辑,DolphinDB 采用了模块化与可扩展的流处理框架,将核心的流计算组件 (如流数据表、流计算引擎等)设计为可重用的模块。模块可以像积木一样通过发布-订阅的方式灵活组合, 形成功能强大的流水线。此外,DolphinDB 还提供了 1500 多个内置函数,可用作流计算引擎的算子,极大简化了流计算应用的开发过程。同时,用户也可以根据业务需求使用自定义函数实现复杂指标计算,即使是复杂
1 - 概述
的金融算法也可以通过 DolphinDB 的脚本语言灵活实现。此外,JIT 编译等技术的应用也进一步提高了流计算的性能。
## 图 1-2 流数据处理框架

上图展示了 DolphinDB 流数据处理框架。实时数据注入到流数据发布表后可同时供多方订阅消费:
·流数据计算引擎:流数据计算引擎可以订阅流数据进行实时分析。计算结果可以通过可视化平台如 Grafana 进行实时展示,也可以再次发布,供其他数据节点或应用进行二次订阅和事件处理。
・API 客户端:通过 DolphinDB API,第三方客户端如 Python 应用程序可以方便地订阅流数据,进行业务操作。这种灵活的 API 订阅方式使得 DolphinDB 的流数据可以被集成到各种应用场景中,满足多样化的实时数据需求。
·消息中间件:将流数据实时灌注到消息中间件(如 Apache Kafka)的消息队列中,供下游模块订阅程序消费。
- 数据仓库:数据仓库可以订阅并保存流数据,作为分析系统与报表系统的数据源。可以将实时数据用于后续的离线分析和报表生成,实现对历史数据和实时数据的统一分析。
#### 1.2.3 发布/订阅 (Pub/Sub) 模型
DolphinDB 采用了经典的发布/订阅 (Pub/Sub) 通信模型,通过消息队列实现流数据的发布与订阅。该模型将流数据发布端与订阅端解耦,各模块支持独立开发管理,可以增强系统的可拓展性并提升发送者的响应效率。
图 1-3 发布/订阅模型

发布数据:发布端在每个节点上维护一个发布队列。当新的流数据注入到该节点的流数据发布表
时,DolphinDB 会将这些数据推送到相应的消息发布队列,再由发布线程将数据发布到各个订阅端的消费队列。
订阅数据:每个订阅线程对应一个消费队列。订阅成功提交后,每当有新数据写入流数据发布表时,DolphinDB 会主动通知所有订阅方,消费线程从消费队列中获取数据进行增量处理。
## 第 2 章. 发布与订阅
### 2.1 流数据表
#### 2.1.1 创建与删除流数据表
DolphinDB 以流数据表作为实时数据流的载体进行流数据的存储、发布与订阅。流数据表是一种特殊的内存表,包括常规流数据表、键值流数据表与高可用流数据表。流数据表可以视为简化的消息中间件,或是消息中间件中的主题(topic),用户可以向其发布数据,也可以从中订阅数据。
用户可以执行 streamTable 函数创建流数据表。与普通内存表不同,流数据表中的记录支持同时读写, 可以增加记录,但不可修改或删除。使用 share 语句将流数据表共享到所有会话后,表中数据即可被订阅。DolphinDB 支持多个会话中的多个订阅端订阅同一个流数据表。实时数据写入该表后,会向所有订阅该表的订阅端推送数据。
---
colName=["Name","Age"]
colType=["string","int"]
t = streamTable(1:0, colName, colType)
share t as st1
---
通过 streamTable 函数创建的流数据表可以包含重复记录。在订阅多个数据源时,如要避免数据的重复写入,可以使用 keyedStreamTable 函数创建包含主键的键值流数据表。表中主键不包含重复值,如果新写入的数据与已有记录的主键重复,不会更新已有记录。用户也可以使用 haStreamTable 函数创建高可用流数据表,高可用流数据表在日志中持久化了表结构信息,节点重启后不需要重新建表。同时,高可用流数据表也可以通过设置主键过滤重复记录。关于流数据高可用的详细解决方案,请参阅5.3 流数据高可用小节。
用户可以在取消所有订阅后使用 dropStreamTable 删除流数据表。例如,删除上述语句创建的共享流数据表 st1:
---
dropStreamTable(`st1)
---
#### 2.1.2 持久化流数据表
流数据表本质是特殊的内存表,系统在默认情况下会把流数据表的所有数据都保存在内存中。由于流数据持续增长的特性,内存可能会被逐渐耗尽,DolphinDB 因此支持了流数据表持久化功能,用户可将内存中的流数据以异步或同步的方式保存到磁盘中。
要持久化流数据表,首先需要在发布节点设置持久化路径参数 persistenceDir,创建流数据表后执行 enableTableShareAndPersistence 命令共享并持久化流数据表。用户也可以执行 share 语句共享流数据表后再通过 enableTablePersistence 命令持久化流数据表。持久化函数的参数设定可能会影响流数据系统的性能,其中:
·asynWrite 指定是否以异步模式持久化数据到磁盘。采用异步持久化的方式可以提高系统的吞吐量,但是节点重启可能会导致最后几条数据丢失。
- compress 指定是否将持久化的数据压缩后保存到磁盘。数据压缩可以减少磁盘写入量和空间占用。
- cacheSize 限制了流数据表在内存中保留的最大记录数。设置合理的内存占用量可以有效保证数据安全。
- flushMode 表示是否开启同步刷盘。内存中的流数据首先写入操作系统缓存,若开启同步刷盘,一批数据落盘后才会开始下一批数据的写入。
开启流数据表持久化有以下优点:
- 当前最新的一批记录保留在内存中,较早的记录保存在磁盘中,能够有效避免内存不足问题。
- 如发生节点异常重启,用户调用持久化命令会将持久化的流数据重新载入表中。
·流订阅可以从磁盘上保存记录的偏移量重新开始。
下表对比了常规流数据表、共享流数据表与持久化流数据表。
| 流数据表 | 共享流数据表 | 持久化流数据表 |
| 生命周期 | 当前会话 | 当前节点 | 重启后仍可加载 |
| 其他会话可见 | 否 | 是 | 是 |
| 可被订阅 | 否 | 是 | 是 |
| 存储方式 | 仅内存 | 仅内存 | 内存和磁盘 |
| 重启后可恢复 | 否 | 否 | 是 |
### 2.2 发布与订阅
#### 2.2.1 发布
DolphinDB 流计算框架采用了发布-订阅的模式。流数据发布首先要定义共享流数据表,实时写入该表的数据将会发布到所有订阅端。流表可部署于 DolphinDB 的数据节点或计算节点上,其所在的节点称为发布节点, 订阅方所在的节点称为订阅节点。例如,通过以下脚本定义并共享流数据表 pubTable:
share streamTable(1:0, `timestamp` temperature, [TIMESTAMP, DOUBLE]) as pubTable
DolphinDB 支持多种方式写入共享流数据表。在研究阶段,用户可以通过 append!或 insert into 语句直接向流数据表写入数据,也可以使用 DolphinDB 内置的 replay 函数将库内历史数据以指定速率回放,模拟流的形式注入流数据表。在实际生产中,可以通过插件将外部数据源产生的记录实时接入流数据表,或通过 DolphinDB API 的写入接口将外部数据写入流数据表。具体使用示例参见 用户手册-实时流数据接入。
#### 2.2.2 订阅
流数据的订阅方和发布方可以处于同一个节点、同一集群中的不同节点、或不同集群中的节点。外部的客户端应用也可以向 DolphinDB 订阅数据。SubExecutor 是 DolphinDB 集群中的消息处理线程,DolphinDB 的一个数据节点或计算节点对应一个进程,每个节点上会有一到多个消息处理线程。若订阅方位于 DolphinDB 节点上,则对应的消费会被分配到后台线程上运行。若订阅方处于外部客户端,则消费运行在外部程序中。
2 - 发布与订阅
## 图 2-1 DolphinDB 订阅模式

在 DolphinDB 中,用户可以通过 subscribeTable 函数订阅流数据表。只需指定表或引擎名称与数据处理方法即可创建订阅关系,函数也提供了丰富的可选参数用于灵活配置订阅。流处理示例小节演示了基本的订阅流程,通过 subscribeTable 提交订阅后,系统会分配一个后台消费线程接收并处理发布表新增的数据,并把处理后的数据推送至输出表。使用 subscribeTable 函数提交订阅脚本编写简单,在内存中直接处理数据, 降低了数据传输和转换的开销。
DolphinDB API 提供了类似 subscribeTable 的流订阅接口,用户可以在第三方客户端(例如 Python 应用程序)直接订阅 DolphinDB 的流数据表。这种订阅模式可以方便地与其他语言集成,将 DolphinDB 中的流数据应用到各种业务场景中,满足多样化的实时数据需求。
用户也可以通过插件订阅外部的消息中间件(如 Apache Kafka,ZeroMQ 等)的数据。例如可以使用 DolphinDB kafka 插件的 subscribe 接口订阅 Kafka 主题。通过插件对接外部消息中间件实现了数据生产与消费者间的解耦的订阅关系,灵活适应各种数据类型与协议。
### 2.3 功能特性
DolphinDB 的流数据订阅实现了以下功能:
·发布数据过滤:支持指定发布表的过滤列与过滤规则,只将符合特定条件的数据发送给订阅者。发布数据的过滤能够有效减少传输的数据量,提高传输的效率。
·自定义起始偏移量:用户在发起订阅时可以通过参数指定任意起始偏移位置,订阅偏移量与流数据表的行数一一对应,订阅可以从指定偏移量的对应位置开始消费。
·本地及远程订阅:DolphinDB 的发布/订阅模型支持本地和远程订阅,订阅者可以在同一台服务器上或不同服务器之间订阅数据,更好地满足分布式数据处理的需求。远程订阅采用了 TCP/IP 通信协议,保证数据传输的可靠性和实时性。
·多方订阅:发布订阅模型允许多个订阅者同时订阅同一个数据流,实现多方同时消费数据的功能。这为数据的共享和多样化的数据处理需求提供了便利。
·断线重连:在流数据传输过程中,如果订阅者因网络故障或其他原因与服务端断开连接,订阅可以自动重连确保数据传输的稳定性。开启断线重连后,订阅端会记录流数据发布的偏移量,连接恢复时订阅端会从偏移量开始重新订阅。
## 第 3 章. 流计算引擎
使用 DolphinDB 处理历史数据时,可以结合 SQL 语句与内置函数进行全量或增量的查询和计算。但实时数据场景要求高效即时处理,全量查询和计算无法满足这种需求。因此,DolphinDB 研发了适合流式处理的计算引擎,在系统内部采用增量计算,优化了实时计算的性能。流计算引擎是 DolphinDB 实时数据处理解决方案的核心组件,可以视作封装的独立计算黑盒,通过向其写入数据触发计算,并将计算结果输出到目标表。DolphinDB 系统内置了十余种流数据计算引擎,本节会详细介绍引擎分类,主要引擎的计算规则与应用场景,并辅以典型示例进行说明。
### 3.1 引擎分类
DolphinDB 流计算引擎提供了灵活的计算方式和丰富的计算功能,适用于多样化的实时数据处理需求。根据参与计算的表数量和类型,流计算引擎可以分为单表计算和多表连接两种类型:
图 3-1 流计算引擎分类

单表计算引擎应用于单个流数据表:
- 分组内时序计算:对数据进行分组,在组内进行逐条计算、窗口聚合或异常检测。
- 跨分组截面计算:对数据进行分组,选取每组的最新数据进行截面计算。
多表连接类似于 SQL 表连接 (JOIN) 操作,用于实时关联两张表。连接引擎的左表都是流数据表,根据右表的类型不同,多表连接可以分为双流关联和维表关联:
·双流关联(右表是流数据表):根据数据的时序关系进行等值或模糊匹配。
- 维表关联(右表可以是流数据表或静态维度表):流数据表实时关联右表快照。
### 3.2 引擎详解
#### 3.2.1 单表计算引擎
## (1)响应式状态引擎(createReactiveStateEngine)
## 计算规则:
响应式状态引擎能够即时响应注入数据,每当有新的数据到达引擎即触发一次计算,并输出一条计算结果。DolphinDB 针对生产业务中常用的状态算子(滑动窗口函数、累积函数、序列相关函数和 topN 相关函数等)进行了优化,采用增量算法大幅提升了这些算子在引擎中的计算效率。在 DolphinDB 提供的内置函数中,响应式状态引擎目前仅支持系统优化过的状态函数;如需实现更加复杂的计算逻辑,用户也可以通过 @state 声明自定义函数封装复杂的状态算子。
## 应用场景:
响应式状态引擎能够快速响应新数据,适用于对单条数据即时响应并实时更新输出计算结果的高频计算场景, 例如:
・金融:实时计算高频因子,如基于快照数据计算股票的短时涨幅等。
•物联网:检测传感器状态的变化,如实时监控温度、湿度、压力等数据指标的波动。
## 使用案例:实时计算 5 分钟涨速
本例通过回放一天的快照行情数据模拟实时数据流,响应式状态引擎实时响应每条输入数据,并基于当前数据计算并输出过去 5 分钟涨速。
---
// 定义流数据发布表与结果输出表
schemaTable = loadTable("dfs://SH_TSDB_snapshot_ArrayVector",
"snapshot").schema().colDefs
share(streamTable(1:0, schemaTable.name, schemaTable.typeString),
`snapshotStreamTable)
share(streamTable(1:0, ["SecurityID", "DateTime", "factor_5min"], [SYMBOL,
TIMESTAMP, DOUBLE]), `changeResultTable)
go
// 定义自定义状态函数
@state
def calculateChange(DateTime, LastPx, lag)\{
PREVLASTPX = tmove(DateTime, LastPx, lag)
return (LastPx - PREVLASTPX) \\ PREVLASTPX
\}
// 定义响应式状态引擎
metrics=<[DateTime, calculateChange(DateTime, LastPx, lag=5m)]>
rse = createReactiveStateEngine(name="calChange", metrics=metrics,
dummyTable=snapshotStreamTable, outputTable=changeResultTable,
keyColumn=`SecurityID)
## // 订阅上游输入表
subscribeTable(tableName="snapshotStreamTable", actionName="snapshotFilter",
offset=-1, handler=rse, msgAsTable=true, hash=0)
// 后台提交回放任务
data = replayDS(sqlObj=