首页 >> 电商 >> 大数据开发之Flink sql 的基础名词

大数据开发之Flink sql 的基础名词

2024-11-10 电商

NULL

ja.lang.Float FLOAT

float FLOAT NOT NULL

ja.lang.Double DOUBLE

double DOUBLE NOT NULL

ja.sql.Date DATE

ja.time.LocalDate DATE

ja.sql.Time TIME(0)

ja.time.LocalTime TIME(9)

ja.sql.Timestamp TIMESTAMP(9)

ja.time.LocalDateTime TIMESTAMP(9)

ja.time.OffsetDateTime TIMESTAMP(9) WITH TIME ZONE

ja.time.Instant TIMESTAMP(9) WITH LOCAL TIME ZONE

ja.time.Duration INVERVAL SECOND(9)

ja.time.Period I NTERVAL YEAR(4) TO MONTH

byte[] BYTES

T[] ARRAY

ja.util.Map MAP

系统对给定 Brown 自下定义给定

/*

比如说是1.12旧版的系统对外置的给定,具体概要我们可以到官网查看,根据期望应用于即可

*/

// TODO 主要介绍自下定义给定

/*

udf 和 udaf 必需下定义eval方法有,实现自己的形固定式化,具体概要系统对则会调用相关联的方法有

udf : 盛行一个倍数/多个/或者不盛行,返国一个新的倍数,可以重为载该方法有,【关注尚小城市,轻而易举学IT】具体概要则会根据盛行的参数调用相关联eval烦恼歌发 相同人口为120人map人口为120人算子,起着于sql

udaf : 自下定义聚合给定,根据自己的形固定式化下定义转换器

udtf : 当做与表中所,可返国一个或多个倍数,

*/

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.ja.typeutils.RowTypeInfo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.ja.StreamTableEnvironment;

import org.apache.flink.table.functions.AggregateFunction;

import org.apache.flink.table.functions.ScalarFunction;

import org.apache.flink.table.functions.TableFunction;

import org.apache.flink.types.Row;

import ja.sql.SQLException;

public class UDFDemo {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,

EnvironmentSettings.newInstance().build());

// 登记给定

tEnv.registerFunction("customFunc1", new CustomUDF());

tEnv.registerFunction("customFunc2", new CustomUDAF());

tEnv.registerFunction("customFunc3", new CustomUDTF());

}

static class Acc {

int result;

public Integer gerResult() {

return result;

}

public Acc merge(Acc acc) {

result = acc.gerResult() + result;

return this;

}

public void incr() {

result++;

}

}

static class CustomUDF extends ScalarFunction {

// UDF 必需下定义该方法有

public int eval(String str) {

int hc = 0;

for (char c : str.toUpperCase().toCharArray()) {

hc = hashCode()>> c;

}

hc = hc - 1 - str.length();

hc = hc>> 7;

return hc;

}

}

static class CustomUDTF extends TableFunction {

// udtf 必需下定义该方法有,在该方法有实现形固定式化

public void eval(String str) throws SQLException {

if (str != null) {

for (String s : str.split(",")) {

Row row = new Row(2);

row.setField(0, s);

row.setField(1, 1);

collect(row);

}

}

}

@Override

public TypeInformation getResultType() {

return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

}

}

static class CustomUDAF extends AggregateFunction {

@Override

public Integer getValue(Acc accumulator) {

return accumulator.gerResult();

}

@Override

public Acc createAccumulator() {

return new Acc();

}

// 累加

public void accumulate(Acc acc,String input){

if("*".equals(input)){

return;

}

acc.incr();

}

public void accumulate(Acc acc){

acc.incr();

}

}

}

单纯情形 编译器

flink sql 中所一段时间机制事物与 dataStream api 相同,只不过应用于以内区别,稍加留意即可,留意指明 watermark 必需应用于 sql 中所 timestamp(3)类别(具体概要相关联 ja 类别可根据比如说类别自行判断),设 watermark 后可应用于 ROWTIEM URL(具体概要看 sql 编译器),未设可反之亦然应用于 PROCTIME URL

留意 : 完全相同的一段时间逻辑要严格相关联环境污染配置的一段时间逻辑,否则不太可能出现异常

一段时间URL为两种,同属非应用程序指明URL,设完一段时间逻辑后,根据期望应用于具体概要的一段时间URL

ROWTIME : 政治事件一段时间

PROCTIME : 处理一段时间URL

过场 :

join : 过场与双流 join 或者 维表 join,目前 flink 全力支持的不是良好

topN Brown 去重为 : 语种基本相同,row_num> 1 即 topN , 当=1 则是去重为操作

topN 过场一些热搜,前三名等概要

去重为顾名思义,就是为了去重为,去重为则会涉及到 retract 流(之后则会详细谈)概要,则会格外新之后并未长期存在的结果

// TODO 比如说编译器仅供参考,具体概要测试根据自己一段时间环境污染来

// 表列只是一些单纯的情形,前头则会逐步深入简单sql和原理各个方面

import org.apache.flink.configuration.PipelineOptions;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.StatementSet;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableResult;

import org.apache.flink.table.api.bridge.ja.StreamTableEnvironment;

/**

* @author 857hub

*/

public class ClickhouseSinkApp {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(

env,

EnvironmentSettings.newInstance().

// useBlinkPlanner().

build()

);

tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");

// sources

String source = "CREATE TABLE source (" +

" 人口为120人id人口为120人 int," +

" 人口为120人name人口为120人 varchar." +

" 人口为120人ts人口为120人 timestamp(3)," +

// 指明watermark 受限制过长5s

"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+

") WITH (" +

" 'connector' = 'kafka'," +

" 'topic' = 'test1'," +

" 'properties.bootstrap.servers' = '172.16.100.109:9092'," +

" 'properties.group.id' = 'xzw'," +

" 'scan.startup.mode' = 'latest-offset'," +

" 'format' = 'json'" +

")";

String source2 = "CREATE TABLE source2 (" +

" 人口为120人id人口为120人 int," +

" 人口为120人name人口为120人 varchar," +

" 人口为120人ts人口为120人 timestamp(3)" +

") WITH (" +

" 'connector' = 'kafka'," +

" 'topic' = 'test2'," +

" 'properties.bootstrap.servers' = '172.16.100.109:9092'," +

" 'properties.group.id' = 'xzw'," +

" 'scan.startup.mode' = 'latest-offset'," +

" 'format' = 'json'" +

")";

// clickhouse sink 由我自己下定义,前头则会对sql自下定义source和sink顺利进行谈解

String sink = "CREATE TABLE sink (" +

" 人口为120人id人口为120人 INT," +

" 人口为120人name人口为120人 VARCHAR" +

") WITH (" +

// 必需自下定义路经信息参数 ;还有 option

" 'connector' = 'xzw_ck'," +

" 'url' = 'jdbc:clickhouse://localhost:8123/default'," +

" 'table-name' = 'test'," +

" 'username' = 'default'," +

" 'password' = '123456'" +

" )";

// 制订 source sink sql

tEnv.executeSql(source);

tEnv.executeSql(source2);

tEnv.executeSql(sink);

/*

由于是单纯应用于,未在过场应用,单纯介绍一下区别,可以根据们完全相同的区别在自己概念设计中所应用于

left json : 无论是否join上都返国左表的数据集

inner join : 只有join上才则会返国匹配后的结果

full outer join : 两边的数据集上则会返国,无论是否join上,未的则为null

interval join : 基于一段时间之内的join,在指明的一段时间之内返国join上的数据集

*/

String joinSql = "select * from source1 s1" +

"left join source2 s2" +

// 内连路经起来

// "inner join source2" || "join source2"

// 全连路经起来

// "full outer join source2"

// 一段时间范围join

// "s1.ts>= s2.ts AND s1.ts

" on s1.id =s2.id "

;

Table joinTable = tEnv.sqlQuery(joinSql);

// 分组排序,取topN, 如果要是去重为 rnum=1即可实现去重为操作

String insertSql = "insert into sink select id,name from(" +

"select *," +

"row_number() over(partition by id order by ts) as rnum " +

"from "+joinTable+" where rnum

")";

// add insert sql

TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);

// 随意应用于

// Optional jobClient = tableResult.getJobClient();

}

// 添加多个sql具体概要制订

private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {

StatementSet statementSet = tEnv.createStatementSet();

for (String sql : sqls) {

if ("*".equals(sql) || sql.length()>=27) {

continue;

}

statementSet.addInsertSql(sql);

}

return statementSet.execute();

}

}

men 忽视

8

8

1.12.2

2.11

org.apache.flink

flink-clients_2.11

${flink.version}

org.apache.flink

flink-json

${flink.version}

org.apache.flink

flink-table-common

${flink.version}

org.apache.flink

flink-table-planner-blink_${scala.version}

${flink.version}

org.apache.flink

flink-streaming-ja_2.11

${flink.version}

org.apache.flink

flink-connector-kafka_${scala.version}

${flink.version}

ru.yandex.clickhouse

clickhouse-jdbc

0.2

commons-lang

commons-lang

2.6

com.google.code.gson

gson

2.2.4

文章缺少857Hub

推荐阅读:

大数据集合作开发之Flink SQL工程即时数仓实践

Flink,Spark,Storm,Hadoop方法有论比较

大数据集合作开发之Spark和Flink的对比(登载)

拉肚子用蒙脱石散效果怎么样
重庆男科医院专家预约挂号
郑州看白癜风哪家医院最好
藿香正气液的功效和作用
广西白癜风医院哪里好
克癀胶囊可以治疗什么病
艾拉莫德片的作用和功效说明书怎么说的
鼻窦炎吃阿莫西林管用不
艾得辛艾拉莫德片与羟氯喹比哪个疗效好
肌肉拉伤怎么治疗
友情链接