概要描述
Slipstream 重要的参数说明
详细说明
Mapred And Morphling mode 共同的参数
1. 引擎级别:需要在启动前通过Manager界面配置重启生效
ngmr.engine.mode (mapred) 引擎执行的模式,默认是mapred,若要使用事件驱动模式的stream,则要切换到morphling
2. Application级别: 设置在appproperties中的参数, application的参数对于当前app下所有的streamjob都是有效的。
**若是要每个streamjob区分不同的参数,可以设置在jobproperties中。
streamsql.use.eventmode (false) 是否使用事件驱动,当前如果引擎是morphling,会默认开启该参数,注意事件驱动和非事件驱动无法同时在同一个app下运行。
3. Job 级别:设置在jobproperties中,job级别参数设置到app级别则对app下所有streamjob都有效
streamsql.use.eventtime (false) 是否使用事件时间
stream.id 流任务名,即 jobId
streamsql.max.running.sql (1000) 最大执行SQL数
streamsql.max.running.task (1000) 最大运行任务数
Mapred mode下的参数
引擎级别
stream.driver.checkpoint.dir checkpoint的目录,可配置为hdfs的目录
stream.driver.enable.autofailover (false) 是否开启自动failover
stream.driver.ha.node (/InceptorServer2) zk上的节点信息
App&Job 级别
stream.number.receivers (-1) receiver的个数, -1表示根据数据源来计算,例如几个partition就启动几个receiver
streamsql.enable.hdfs.batchflush (false) 是否打开 batch flush, 防止产生过多小文件
streamsql.hdfs.batchflush.size (100000) 打开batch flush 每批次的大小
streamsql.hdfs.batchflush.interval.ms (60000) batch flush 的时间间隔
streamsql.eventtime.zerotime (0) 事件时间的起点
streamsql.eventtime.maxlag.on.disorder (-1) 事件时间允许流数据乱序的最大的间隔时间,-1表示数据是有序的
streamsql.infinitewindow.numreducers (8) infinite window reduce task 的数量
与任务调度有关
stream.batch.duration.ms (2000) 批任务每批的大小,默认2s产生一批任务
spark.streaming.blockInterval (-1) 每个block的间隔,默认200ms,一批2秒的任务最多可包含10个block
spark.streaming.blockQueueSize (-1) block 暂存的queue 大小,默认10
spark.streaming.receiver.maxRate (-1) receiver数据接受的最大速率,默认是0表示不受限制,即从数据源往receiver 每秒push 最大条数的数据
该参数通过控制数据流量以防止突然数据量增大,系统处理不过来,导致内存溢出等问题。
HA相关的参数
application.checkpoint.dir application 开启checkpoint 的目录
application.enable.wal (false) 是否开启WAL
Morphling mode下的参数
1. 引擎级别
网络相关参数,slipstream shuffle通过netty进行网络通信
ngmr.io.mode (NIO), Netty IO 方式,可选EPOLL
ngmr.io.directbufs (true), Netty是否使用对外内存,默认是true
ngmr.memsegment.size (32768) 系统一个memory page 的大小,单位bytes, 用于设置NetworkBufferPool一个buffer的大小,设置Netty高低水位值
ngmr.network.num.buffers (10240) NetworkBufferPool 初始化大小,即有多少个buffer可用
ngmr.network.client.num.threads (0) Netty Client端EventLoop线程池大小,默认0表示当前CPU core数的2倍
ngmr.network.server.num.threads (0) Netty Server端 EventLoop线程池大小,设置范围建议为[ numOfCores + 1, 2 * numOfCores]
ngmr.network.client.timeout (120) 单位秒, client网络超时时间
ngmr.network.sndrcv.buffer (0) Netty 底层TCP使用的buffer大小,0表示采用Netty的默认值
Linux下读写的默认值配置在文件中。
第一个表示最小值,第二个表示默认值,第三个表示最大值。
使用HA时需要配置的额外参数
spark.morphling.recovery.mode (standalone / zookeeper ) 如果需要支持Exactly Once,则配置Zookeeper
spark.morphling.taskstate.backend (filesystem) task状态的存储系统,可以说会memory, filesystem, db. 当前只支持filesystem
spark.morphling.taskstate.checkpoint.directory ( hdfs://xxx/checkpoints) Morphling Task保存State的checkpoints的地址,通常是分布式文件系统,保证不同的executor都能访问
当使用Zookeeper的RecoverMode时,相关的配置如下:
spark.morphling.completed.checkpoints.storage.dir (/morphling/completedcheckpoints) Morphling Driver端保存任务状态checkpoints的地址
Zookeeper Client相关参数:
spark.morphling.zookeeper.quorum (localhost:2181); Zookeeper的quorum
spark.morphling.zookeeper.session.timeout (10000);超时时间,单位为ms
spark.morphling.zookeeper.connect.timeout (5000);超时时间,单位为ms
spark.morphling.zookeeper.max.retries (3); 最多重试连接次数
spark.morphling.zookeeper.retry.wait (5000); 每次重试等待时间,单位为ms
Zookeeper上Znode路径相关参数:
spark.morphling.zookeeper.root.path (/morphling); Morphling使用zookeeper的Root Path
spark.morphling.zookeeper.namespace (/default); 当前集群使用的子znode,用于区分多集群的环境,和Root Path配合使用
spark.morphling.zookeeper.checkpoints.path (/checkpoints); 用于记录morphling人物中当前完整的checkpoint的信息
spark.morphling.submitted.job.path (/runningjobs); 当开启morphling.job.enable.auto.failover自动Failover时,会保留提交的任务的信息
2. Application级别
若是要每个streamjob区分不同的参数,可以设置在jobproperties中。
streamsql.use.eventmode (false) 是否使用事件驱动,当前如果引擎是morphling,会默认开启该参数,注意事件驱动和非事件驱动无法同时在同一个app下运行。
3. Job 级别
streamsql.use.eventtime (false) 是否使用事件时间
morphling.maxlag.on.disorder (-1) 允许流数据乱序的最大间隔时间,-1表示数据有序
morphling.infinitewindow.reduce.tasks (-1), Infinite window 默认reduce任务的数量, -1表示等于当前reduce task的数目
morphling.result.auto.flush (false) 是否开启自动sink,若开启则有数据就立刻flush到目标表
morphling.hdfs.flush.size (100000) 数据刷新到目标表的batch大小,默认10万条数据会刷新一次
morphling.hdfs.flush.interval.ms (60000) 数据刷新到目标表的频率,默认60秒刷新一次,与条数的配置项相辅相成,按先满足条件执行
使用HA启动任务时的一些任务级别的参数
morphling.task.max.failures (-1), 任务最大允许失败的次数,若失败会自动恢复,-1表示不允许
morphling.job.enable.checkpoint (false), 是否开启checkpoint 功能
morphling.job.checkpoint.interval (60000), Checkpoint 的时间间隔
morphling.job.enable.auto.failover (false) 开启任务自动恢复,配合 morphling.task.max.failues使用
morphling.job.enable.wal.sink (false), 保证exactly-once语义,是否打开wal sink 功能
morphling.wal.sink.committer.type (mysql/voltdb) wal sink依赖的外部数据库类型,支持mysql和voltdb
morphling.wal.sink.voltdb.jdbc.url
morphling.wal.sink.mysql.jdbc.url commit type配置成啥就用哪个
使用metrics功能的参数
morphling.metrics.enable (true)