本文讲述如何使用Flink SQL + UDF来做Batch ETL和BI数据分析的任务。
首先介绍下Zeppelin中的Flink Interpreter类型。Zeppelin的Flink Interpreter支持Flink的所有API (DataSet, DataStream, Table API )。语言方面支持Scala,Python,SQL。下图是Zeppelin中支持的不同场景下的Flink Interpreter。
Name | Class | Description |
%flink | FlinkInterpreter | Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment |
%flink.pyflink | PyFlinkInterpreter | Provides a python environment |
%flink.ipyflink | IPyFlinkInterpreter | Provides an ipython environment |
%flink.ssql | FlinkStreamSqlInterpreter | Provides a stream sql environment |
%flink.bsql | FlinkBatchSqlInterpreter | Provides a batch sql environment |
下图例举了所有重要的Flink配置信息,除此之外你还可以配置任意Flink的Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)
Property | Default | Description |
FLINK_HOME | Flink的安装目录 | |
HADOOP_CONF_DIR | Hadoop配置信息目录 | |
HIVE_CONF_DIR | Hive配置信息目录 | |
flink.execution.mode | local | Execution mode of flink, e.g. local | yarn | remote |
flink.execution.remote.host | jobmanager hostname if it is remote mode | |
flink.execution.remote.port | jobmanager port if it is remote mode | |
flink.jm.memory | 1024 | Total number of memory(mb) of JobManager |
flink.tm.memory | 1024 | Total number of memory(mb) of TaskManager |
flink.yarn.appName | Zeppelin Flink Session | Yarn app name |
flink.yarn.queue | queue name of yarn app | |
flink.execution.jars | additional user jars (comma separated) | |
flink.execution.packages | additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10 | |
zeppelin.pyflink.python | python | python binary executable for PyFlink |
table.exec.resource.default-parallelism | 1 | Default parallelism for flink sql job |
zeppelin.flink.scala.color | true | whether display scala shell output in colorful format |
zeppelin.flink.enableHive | false | whether enable hive |
zeppelin.flink.printREPLOutput | true | Print REPL output |
zeppelin.flink.maxResult | 1 | max number of rows returned by sql interpreter |
StreamExecutionEnvironment, ExecutionEnvironment, StreamTableEnvironment, BatchTableEnvironment
Flink Interpreter (%flink) 为用户自动创建了下面6个变量作为Flink Scala程序的入口。
senv
(StreamExecutionEnvironment),benv
(ExecutionEnvironment)stenv
(StreamTableEnvironment for blink planner)btenv
(BatchTableEnvironment for blink planner)stenv_2
(StreamTableEnvironment for flink planner)btenv_2
(BatchTableEnvironment for flink planner)
PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了6个python变量作为PyFlink程序的入口
- s_env (StreamExecutionEnvironment),
- b_env (ExecutionEnvironment)
st_env
(StreamTableEnvironment for blink planner)bt_env
(BatchTableEnvironment for blink planner)st_env_2
(StreamTableEnvironment for flink planner)bt_env_2
(BatchTableEnvironment for flink planner)
Flink 1.10中有2种table api的planner:flink
& blink
.
- 如果你用DataSet api以及需要把DataSet转换成Table,那么就需要使用Flink planner的TableEnvironment (
btenv_2
andstenv_2
). - 其他场景下, 我们都会建议用户使用
blink
planner. 这也是Flink sql使用的planner(%flink.bsql
&%flink.ssql
)
%flink.bsql 是用来执行Flink的batch sql. 运行 help
命令可以得到所有可用的命令
总的来说,Flink Batch SQL可以用来做2大任务:
- 使用
insert into
语句来做 Batch ETL - 使用
select
语句来做BI 数据分析
下面我们基于Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做Batch ETL任务。首先用Flink Sql创建一个raw 数据的source table,以及清洗干净后的sink table。
然后再定义Table Function来parse raw data。
接下来就可以用insert into语句来进行数据转换(source table –> sink table)
用select语句来Preview最终数据,验证insert into语句的正确性
经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的SQL Select语句进行分析,也可以使用Zeppelin的dynamic forms来增加交互性(TextBox,Select,Checkbox)
使用Flink UDF
SQL虽然强大,但表达能力毕竟有限。有时候就要借助于UDF来表达更复杂的逻辑。Flink Interpreter 支持2种UDF (Scala + Python)。下面是2个简单的例子。
Scala UDF
%flink class ScalaUpper extends ScalarFunction { def eval(str: String) = str.toUpperCase } btenv.registerFunction("scala_upper", new ScalaUpper())
Python UDF
%flink.pyflink class PythonUpper(ScalarFunction): def eval(self, s): return s.upper() bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
创建完UDF之后,你就可以在SQL里使用了。
对Hive数据的数据分析
除了可以分析Flink SQL创建的table之外,Flink也可以分析Hive上已有的table。如果要让Flink Interpreter使用Hive,那么需要做以下配置
- 设置
zeppelin.flink.enableHive
为true
- Copy 下面这些 dependencies 到flink的 lib 目录
- flink-connector-hive_{scala_version}-{flink.version}.jar
- flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
- flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
- hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
- 在Flink interpreter setting 里或者 zeppelin-env.sh里指定
HIVE_CONF_DIR
- 在Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的Hive版本
下面就用一个简单的例子展示如何在Zeppelin中用Flink查询Hive table
1. 用Zeppelin的jdbc interpreter查询hive tables
2. 用Flink sql 查询 hive table的schema
3. 用Flink Sql 查询hive table