Flink 寫數據到ClickHouse
標簽: Flink - 必知必會 flink clickhouse jdbc 大數據
目錄
四、運行向localhost,7777端口發送數據,并啟動Flink應用程序
五、查詢ClickHouse 數據結果,驗證數據是否寫入成功
一、導入clickhouse jdbc 依賴
<!-- 寫入數據到clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.54</version>
</dependency>
二、編寫 Flink 寫入ClickHouse代碼
Java Bean實體類
package com.lei.domain;
public class J_User {
public int id;
public String name;
public int age;
public J_User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public static J_User of(int id, String name, int age) {
return new J_User(id, name, age);
}
}
編寫ClickHouseUtil 工具類
package com.lei.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class ClickHouseUtil {
private static Connection connection;
public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
connection = DriverManager.getConnection(address);
return connection;
}
public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
return getConn(host,port,"default");
}
public static Connection getConn() throws SQLException, ClassNotFoundException {
return getConn("node-01",8123);
}
public void close() throws SQLException {
connection.close();
}
}
編寫 業務寫入ClickHouse類
package com.lei.util;
import com.lei.domain.J_User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
public class J_MyClickHouseUtil extends RichSinkFunction<J_User> {
Connection connection = null;
String sql;
public J_MyClickHouseUtil(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = ClickHouseUtil.getConn("node-01", 8123, "default");
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
@Override
public void invoke(J_User user, Context context) throws Exception {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setLong(1, user.id);
preparedStatement.setString(2, user.name);
preparedStatement.setLong(3, user.age);
preparedStatement.addBatch();
long startTime = System.currentTimeMillis();
int[] ints = preparedStatement.executeBatch();
connection.commit();
long endTime = System.currentTimeMillis();
System.out.println("批量插入完畢用時:" + (endTime - startTime) + " -- 插入數據 = " + ints.length);
}
}
編寫Flink 業務類,即執行業務邏輯
package com.lei.sinktest;
import com.lei.domain.J_User;
import com.lei.util.J_MyClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/*
進入clickhouse-client
use default;
drop table if exists user_table;
CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();
*/
public class J05_ClickHouseSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
// source
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// Transform 操作
SingleOutputStreamOperator<J_User> dataStream = inputStream.map(new MapFunction<String, J_User>() {
@Override
public J_User map(String data) throws Exception {
String[] split = data.split(",");
return J_User.of(Integer.parseInt(split[0]),
split[1],
Integer.parseInt(split[2]));
}
});
// sink
String sql = "INSERT INTO default.user_table (id, name, age) VALUES (?,?,?)";
J_MyClickHouseUtil jdbcSink = new J_MyClickHouseUtil(sql);
dataStream.addSink(jdbcSink);
dataStream.print();
env.execute("clickhouse sink test");
}
}
三、創建ClickHouse 表
-- 進入clickhouse-client
use default;
drop table if exists user_table;
CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();
四、運行向localhost,7777端口發送數據,并啟動Flink應用程序
五、查詢ClickHouse 數據結果,驗證數據是否寫入成功
文章最后,給大家推薦一些受歡迎的技術博客鏈接:
- JAVA相關的深度技術博客鏈接
- Flink 相關技術博客鏈接
- Spark 核心技術鏈接
- 設計模式 —— 深度技術博客鏈接
- 機器學習 —— 深度技術博客鏈接
- Hadoop相關技術博客鏈接
- 超全干貨--Flink思維導圖,花了3周左右編寫、校對
- 深入JAVA 的JVM核心原理解決線上各種故障【附案例】
- 請談談你對volatile的理解?--最近小李子與面試官的一場“硬核較量”
- 聊聊RPC通信,經常被問到的一道面試題。源碼+筆記,包懂
- 深入聊聊Java 垃圾回收機制【附原理圖及調優方法】
歡迎掃描下方的二維碼或 搜索 公眾號“大數據高級架構師”,我們會有更多、且及時的資料推送給您,歡迎多多交流!
智能推薦
三元運算符引起的bug
三元運算符引起的bug 前言 延伸 總結 前言 今天測試提交了一個bug,跟蹤發現了一段代碼,初看還沒什么問題,簡易后的代碼: 這段代碼想要實現的功能是返回 細看發現后面使用的是三元運算符,這就是造成這個bug的元兇。 解決方案:使用括號括起來。 延伸 在做這個例子的時候,使用ideal,習慣性的使用sonar掃描: 這里很明顯的提醒這里表達式有問題。 總結 三元運算符格式:expression1...
利用dom4j來生成xml
xml比較常用,處理xml的方式也比較多。現在就聊聊如何用dom4j來生成xml吧 先看看效果。。。 上傳代碼: 大概說下主要步驟吧: 1.創建document對象,創建root根節點,然后通過根節點繼續生成節點。其中也可以給該節點添加屬性,復制等。 2.創建輸出流,將該對象輸出到xml文件中去。需要設置好路徑和文件名等。...
Android輪播圖原理思路分析+實現方案
來自:http://blog.csdn.net/wubihang/article/details/52512597 ListView的headerView設置為輪播圖之后結合上/下拉刷新/加載的模式成為現在大多數APP的一個必須具備的功能,對于許多初學者來說想要實現輪播圖這樣一個集線程睡眠、自動處理、替換過程中刷新UI界面的組合功能非常困難,沒有思路,感覺無從下手,去搜索各種實現方案,發現目前充斥...
Lambda的編寫和使用---java8編程實戰
目錄 1.使用匿名類來表示不同的行為來實現行為參數,代碼有點啰嗦,解決這個問題的新工具--Lambda表達式。 2.Lambda的概念 3.Lambda語法測試 4.在程序中如何使用Lambda表達式 1.使用匿名類來表示不同的行為來實現行為參數,代碼有點啰嗦,解決這個問題的新工具--Lambda表達式。 Lambda可以讓你很簡潔地表示一個行...
Electron學習筆記[1]
什么是Electron Electron(最初名為Atom Shell[3])是GitHub開發的一個開源框架。它允許使用Node.js(作為后端)和Chromium(作為前端)完成桌面GUI應用程序的開發。 Electron 可以讓你使用純 JavaScript 調用豐富的原生 APIs 來創造桌面應用。你可以把它看作一個專注于桌面應用的 Node.js 的變體,而不是 Web 服務器。 很多很...
猜你喜歡
SpingCloud踩坑記(二)SpringCloud配置中心
springCloud配置中心 官網介紹如下 簡單來說:springCloud config項目,用來為分布式的微服務系統中提供集成式外部配置支持,分為客戶端和服務端。并且通過配置服務中心集中配置不同環境的變量,方便管理和配置遷移。 大致流程如下 涉及到三個角色: 配置中心服務端:為配置客戶端提供對應的配置信息,配置信息的來源是配置倉庫。應用啟動時,會從配置倉庫拉取配置信息緩存到本地倉庫中。 配置...
按模板導出Excel
在項目需求中,經常會遇到導出Excel,一般沒有模板的Excel很容易導出,那如果遇到格式很復雜的Excel怎么處理呢 例如下面這種: 我們使用 aspose很容易實現 需要jar包aspose-cells-8.5.2.jar 工具類 controller 編輯模板 導出效果如下...
POI Excel 09 格式化單元格數據,設置打印區域
@author YHC 格式化單元格數據: 運行后效果: 設置打印區域: 設置后的效果圖: 上面應該說單元格的線條變色的是打印區域; 如果以上有錯誤的地方還請大家指出!thanks!...