• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Flink 寫數據到ClickHouse

    標簽: Flink - 必知必會  flink  clickhouse  jdbc  大數據

    目錄

     

    一、導入clickhouse jdbc 依賴

    二、編寫 Flink 寫入ClickHouse代碼

    三、創建ClickHouse 表

    四、運行向localhost,7777端口發送數據,并啟動Flink應用程序

    五、查詢ClickHouse 數據結果,驗證數據是否寫入成功


    一、導入clickhouse jdbc 依賴

            <!-- 寫入數據到clickhouse -->
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.1.54</version>
            </dependency>

    二、編寫 Flink 寫入ClickHouse代碼

    Java Bean實體類

    package com.lei.domain;
    
    public class J_User {
        public int id;
        public String name;
        public int age;
    
        public J_User(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    
        public static J_User of(int id, String name, int age) {
            return new J_User(id, name, age);
        }
    }

    編寫ClickHouseUtil 工具類

    package com.lei.util;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    
    
    public class ClickHouseUtil {
        private static Connection connection;
    
        public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            String  address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
            connection = DriverManager.getConnection(address);
            return connection;
        }
    
        public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
            return getConn(host,port,"default");
        }
        public static Connection getConn() throws SQLException, ClassNotFoundException {
            return getConn("node-01",8123);
        }
        public void close() throws SQLException {
            connection.close();
        }
    }
    

    編寫 業務寫入ClickHouse類

    package com.lei.util;
    
    import com.lei.domain.J_User;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    
    
    public class J_MyClickHouseUtil extends RichSinkFunction<J_User> {
        Connection connection = null;
    
        String sql;
    
        public J_MyClickHouseUtil(String sql) {
            this.sql = sql;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = ClickHouseUtil.getConn("node-01", 8123, "default");
        }
    
        @Override
        public void close() throws Exception {
            super.close();
            if (connection != null) {
                connection.close();
            }
        }
    
        @Override
        public void invoke(J_User user, Context context) throws Exception {
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            preparedStatement.setLong(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setLong(3, user.age);
            preparedStatement.addBatch();
    
            long startTime = System.currentTimeMillis();
            int[] ints = preparedStatement.executeBatch();
            connection.commit();
            long endTime = System.currentTimeMillis();
            System.out.println("批量插入完畢用時:" + (endTime - startTime) + " -- 插入數據 = " + ints.length);
        }
    }
    

    編寫Flink 業務類,即執行業務邏輯

    package com.lei.sinktest;
    
    import com.lei.domain.J_User;
    import com.lei.util.J_MyClickHouseUtil;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /*
        進入clickhouse-client
        use default;
        drop table if exists user_table;
    
        CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();
     */
    public class J05_ClickHouseSinkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);
    
            // source
            DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    
            // Transform 操作
            SingleOutputStreamOperator<J_User> dataStream = inputStream.map(new MapFunction<String, J_User>() {
                @Override
                public J_User map(String data) throws Exception {
                    String[] split = data.split(",");
                    return J_User.of(Integer.parseInt(split[0]),
                            split[1],
                            Integer.parseInt(split[2]));
                }
            });
    
            // sink
            String sql = "INSERT INTO default.user_table (id, name, age) VALUES (?,?,?)";
            J_MyClickHouseUtil jdbcSink = new J_MyClickHouseUtil(sql);
            dataStream.addSink(jdbcSink);
            dataStream.print();
    
            env.execute("clickhouse sink test");
        }
    }
    

    三、創建ClickHouse 表

    -- 進入clickhouse-client
    use default;
    drop table if exists user_table;
    
    CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();

    四、運行向localhost,7777端口發送數據,并啟動Flink應用程序

    五、查詢ClickHouse 數據結果,驗證數據是否寫入成功


    文章最后,給大家推薦一些受歡迎的技術博客鏈接

    1. JAVA相關的深度技術博客鏈接
    2. Flink 相關技術博客鏈接
    3. Spark 核心技術鏈接
    4. 設計模式 —— 深度技術博客鏈接
    5. 機器學習 —— 深度技術博客鏈接
    6. Hadoop相關技術博客鏈接
    7. 超全干貨--Flink思維導圖,花了3周左右編寫、校對
    8. 深入JAVA 的JVM核心原理解決線上各種故障【附案例】
    9. 請談談你對volatile的理解?--最近小李子與面試官的一場“硬核較量”
    10. 聊聊RPC通信,經常被問到的一道面試題。源碼+筆記,包懂
    11. 深入聊聊Java 垃圾回收機制【附原理圖及調優方法】

    歡迎掃描下方的二維碼或 搜索 公眾號“大數據高級架構師”,我們會有更多、且及時的資料推送給您,歡迎多多交流!

                                               

           

    版權聲明:本文為weixin_32265569原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/weixin_32265569/article/details/112133937

    智能推薦

    三元運算符引起的bug

    三元運算符引起的bug 前言 延伸 總結 前言 今天測試提交了一個bug,跟蹤發現了一段代碼,初看還沒什么問題,簡易后的代碼: 這段代碼想要實現的功能是返回 細看發現后面使用的是三元運算符,這就是造成這個bug的元兇。 解決方案:使用括號括起來。 延伸 在做這個例子的時候,使用ideal,習慣性的使用sonar掃描: 這里很明顯的提醒這里表達式有問題。 總結 三元運算符格式:expression1...

    利用dom4j來生成xml

    xml比較常用,處理xml的方式也比較多。現在就聊聊如何用dom4j來生成xml吧 先看看效果。。。 上傳代碼: 大概說下主要步驟吧:   1.創建document對象,創建root根節點,然后通過根節點繼續生成節點。其中也可以給該節點添加屬性,復制等。   2.創建輸出流,將該對象輸出到xml文件中去。需要設置好路徑和文件名等。...

    Android輪播圖原理思路分析+實現方案

    來自:http://blog.csdn.net/wubihang/article/details/52512597 ListView的headerView設置為輪播圖之后結合上/下拉刷新/加載的模式成為現在大多數APP的一個必須具備的功能,對于許多初學者來說想要實現輪播圖這樣一個集線程睡眠、自動處理、替換過程中刷新UI界面的組合功能非常困難,沒有思路,感覺無從下手,去搜索各種實現方案,發現目前充斥...

    Lambda的編寫和使用---java8編程實戰

    目錄 1.使用匿名類來表示不同的行為來實現行為參數,代碼有點啰嗦,解決這個問題的新工具--Lambda表達式。 2.Lambda的概念 3.Lambda語法測試  4.在程序中如何使用Lambda表達式 1.使用匿名類來表示不同的行為來實現行為參數,代碼有點啰嗦,解決這個問題的新工具--Lambda表達式。       Lambda可以讓你很簡潔地表示一個行...

    Electron學習筆記[1]

    什么是Electron Electron(最初名為Atom Shell[3])是GitHub開發的一個開源框架。它允許使用Node.js(作為后端)和Chromium(作為前端)完成桌面GUI應用程序的開發。 Electron 可以讓你使用純 JavaScript 調用豐富的原生 APIs 來創造桌面應用。你可以把它看作一個專注于桌面應用的 Node.js 的變體,而不是 Web 服務器。 很多很...

    猜你喜歡

    SpingCloud踩坑記(二)SpringCloud配置中心

    springCloud配置中心 官網介紹如下 簡單來說:springCloud config項目,用來為分布式的微服務系統中提供集成式外部配置支持,分為客戶端和服務端。并且通過配置服務中心集中配置不同環境的變量,方便管理和配置遷移。 大致流程如下 涉及到三個角色: 配置中心服務端:為配置客戶端提供對應的配置信息,配置信息的來源是配置倉庫。應用啟動時,會從配置倉庫拉取配置信息緩存到本地倉庫中。 配置...

    按模板導出Excel

    在項目需求中,經常會遇到導出Excel,一般沒有模板的Excel很容易導出,那如果遇到格式很復雜的Excel怎么處理呢 例如下面這種: 我們使用 aspose很容易實現 需要jar包aspose-cells-8.5.2.jar 工具類 controller 編輯模板 導出效果如下...

    POI Excel 09 格式化單元格數據,設置打印區域

    @author YHC 格式化單元格數據: 運行后效果: 設置打印區域: 設置后的效果圖: 上面應該說單元格的線條變色的是打印區域; 如果以上有錯誤的地方還請大家指出!thanks!...

    精品国产乱码久久久久久蜜桃不卡