morphling和mapred的区别

  其他常见问题
内容纲要

概要描述


本文旨在说明slipstream事件驱动和微批模式的异同
slipstream官网介绍链接:slipstream官网介绍

详细说明

Slipstream推荐事件驱动模型, 事件驱动模型是TDH4.8之后新增的流处理模型,5.1及之后默认开启事件驱动模型。ngmr.engine.mode参数决定了Slipstream选择何种引擎,ngmr.engine.mode=mapred为微批模型, ngmr.engine.mode=morphling为事件驱动模型。

也可以通过创建job的时候指定驱动模型:

CREATE STREAMJOB  as ("")
JOBPROPERTIES("streamsql.use.eventmode"="true"[,"key"="value"...]);

事件驱动模型更接近原生的流处理,一有数据进来就处理. 就好比流水线上的零件, 来一个处理一个
file

微批模型顾名思义还是基于"批"的思想. 将Input Stream按时间段分割成一个个小块处理. 就像是给流水线增加一个缓冲, 每隔一段时间去取一批零件加工
file

相比较微批模型, 事件驱动模型具有以下特点:

  1. 延迟更低: 微批是秒级的延迟,而事件驱动模型的延迟是毫秒级的.
  2. 功能更全:
    a. 支持会话窗口(Session Window),
    b. 复杂事件的处理(CEP)
    c. 更加完善的高可用性(HA)(9.3节)
    d. StreamJob的Auto-failover

Stream
事件驱动模型中,当数据源处有数据进来就立即处理,来一条处理一条.
file

微批模型下Input Stream被分割成一系列 小数据块(Batch) 来处理,Batch在获取数据时生成: 在一段单位时间内获取的数据都放进一个batch中,这段单位时间称为 Batch Interval,它的长度(Batch Duration)可以设置,如下图所示。

file

Derived Stream
事件驱动模型下, 每得到一条数据,就对其进行变形,得到Derived Stream:
file

而微批模型下,因为Input Stream是一系列batch,那么Stream的变形实际上是对batch计算得到新batch的
过程:
file

Window Stream
事件驱动模型下支持秒级的window stream
file

上图中窗口长度(Length)为4s,滑动间隔(Slide)为3s. 如此图中的第一个窗口为[t,t+4s),第二个窗口为[t+3s,t+7s),以此类推. 窗口的长度和滑动的间隔只要是1s的整数倍即可. 需要注意的是, 当t=0s,即Stream创建的初始时刻, 第一个窗口为[0,3s),比窗口长度会少1秒,这是因为Slide是执行窗口操作的间隔, Slide时间到了就会执行窗口计算.

微批模型下,窗口长度(Length)和滑动间隔(Slide)的长度都必须是batch duration的整数倍。
file
上图的窗口变形中,length是batch duration的2倍(SET streamsql.hdfs.batchflush.interval.ms=2;),slide是batch duration的1倍。如此图中的第一个窗口为[t,t+4s),第二个窗口为[t+2s,t+6s),以此类推. 与事件驱动模型相同的是,当t=0时,第一个窗口的长度等于Slide(2s).

从TDH 5.1开始, 在事件驱动模式下支持用户自定义输入源. 添加自定义输入源步骤如下:

  1. 继承 io.transwarp.slipstream.api.source.CustomDataSource 抽象类
    def init(id: Int, params: Properties): Unit
    /**
    * non-blocking
    * return null if get nothing
    */
    def poll(): Array[Byte]
    def close(): Unit

示例代码参考slipstream手册6.2.6.1.4章节

Stream DML 注意事项

事件驱动模式下, 只在 window stream 下支持聚合操作

微批模式: 支持流与流的join,支持流与表的join.
事件驱动模式下:
支持mapjoin和global lookup join.
流与流的Join要求join的两个流必须是window stream,且window的slide(滑动间隔)必须一致

从TDH5.1开始,在事件驱动模式下支持自定义输出,demo参考手册6.3.8.3章节

批量Flush到HDFS

支持批量将数据flush到HDFS上的TEXT表和ORC表,批量flush可以避免产生过多小文件。注意,目标表不能为事务表、分区表(5.1.2以前版本)或者分桶表。进行批量flush需要设置下面几个 job级别 的参数:
微批模式
SET streamsql.enable.hdfs.batchflush = true
–打开批量flush开关
SET streamsql.hdfs.batchflush.size =
–设置一次flush的消息个数,消息量达到该参数时flush一次
SET streamsql.hdfs.batchflush.interval.ms =
–设置每过多长时间(单位为毫秒)flush一次
只需满足 batchflush.size 和 batchflush.interval.ms 其中的一个条件即会触发一次flush。
事件驱动模式
SET morphling.result.auto.flush=true
–来一条数据刷一条到输出表, 可在测试时使用,不推荐在生产中使用.

批量Flush默认开启
• TDH5.1前
SET morphling.hdfs.flush.size =
–设置一次flush的消息个数,消息量达到该参数时flush一次, 默认100000
SET morphling.hdfs.flush.interval.ms =
–设置每过多长时间(单位为毫秒)flush一次,默认60000
• TDH5.1+
SET morphling.record.flush.size =
SET morphling.record.flush.interval.ms =
同微批模式, flush size 和 flush interval 满足其中之一就会触发一次Flush.

事件驱动模式下支持分区表函数(Partition Table Function,PTF)。 PTF将输入流视为一张分区表,根据定义的分区字段分成不同的partition,通过时间划分窗口,然后在这个窗口(有限数据集)上做分析。 与在Stream Window上做聚合计算不同的是,PTF功能更多,计算更灵活,详情参考手册6.9.1章节

微批模式下参数
file
file
file

事件驱动模式下参数

file
file
file

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。