• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Kafka2.6.0生產消費示例代碼

    標簽: kafka  

    前提:需要Kafka集群。

    創建一個簡單的gradle項目
    在這里插入圖片描述
    build.gradle配置信息

    buildscript {
        repositories {
            maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
            maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'}
        }
    }
    
    plugins {
        id 'java'
    }
    
    version 'unspecified'
    
    sourceCompatibility = 1.8
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        testCompile group: 'junit', name: 'junit', version: '4.12'
        compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.6.0'
        compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
    }
    
    allprojects {
        repositories {
            maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
            maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'}
        }
    }
    

    創建生產者類

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Properties;
    import java.util.Random;
    
    public class MsgKafkaProducer extends Thread {
    
        private final KafkaProducer<Integer, String> producer;
        private final String topic;
    
        public MsgKafkaProducer(String topic) {
    
            Properties props = new Properties();
            props.put("metadata.broker.list", "192.168.88.62:9092");
            props.put("bootstrap.servers", "192.168.88.62:9092");
            props.put("retries", 0);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
            this.topic = topic;
        }
    
        @Override
        public void run() {
            int messageNo = 1;
            Random random = new Random();
            for (int i = 0; i < 1000; i++) {
                String messageStr = "Message" + random.nextInt(10000) + " " + messageNo;
                System.out.println("Send:" + messageStr);
                producer.send(new ProducerRecord<>(topic, messageStr));
                messageNo++;
            }
        }
    
        public static void main(String[] args) {
            MsgKafkaProducer producerThread = new MsgKafkaProducer("kafka_test");
            producerThread.start();
        }
    
    }
    

    創建消費者類

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class MsgKafkaConsumer implements Runnable {
    
        private final KafkaConsumer<String, String> consumer;
        private static final String GROUP_ID = "groupA";
    
        public MsgKafkaConsumer(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.88.62:9092");
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.consumer.subscribe(Collections.singletonList(topicName));
        }
    
        @Override
        public void run() {
            int messageNo = 1;
            try {
                while (true) {
                    ConsumerRecords<String, String> msgList = consumer.poll(1000);
                    if (null != msgList && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            System.out.println("=> topic : kafka_test " + messageNo + "=> receive: key => " + record.key() + ", value => " + record.value() + " offset => " + record.offset());
                            messageNo++;
                        }
                    } else {
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    
        public static void main(String args[]) {
            MsgKafkaConsumer test1 = new MsgKafkaConsumer("kafka_test");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
    
    版權聲明:本文為ASAS1314原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/ASAS1314/article/details/109842854

    智能推薦

    Centos7生產環境離線安裝ansible

    yum安裝Ansible(服務器網絡正常是前提): 離線安裝Ansible: 如果是python2.版本,setuptools的版本必須低于45,如果python3.版本,則可以安裝,否則編譯Ansible的時候會提示警告!(建議使用Python3+版本) 控制服務器:需要安裝 Python2.6/2.7 被控服務器:需要安裝 Python2.4 以上版本,若低于 Python2.5 需要安裝 p...

    蘋果M1芯片各種不支持,但居然可以刷朋友圈!你會買單嗎?

    上個月和大家一起分享過,最新的蘋果M1芯片上支持的各種開源軟件。什么?還沒讀過?趕緊點這里:一文解讀蘋果 M1 芯片電腦上的開源軟件。 現在已經過去了半個月,想必有不少的同學都已經入手了最新的蘋果M1芯片電腦。今天呢和大家來一起分享一個只有蘋果M1芯片電腦才有的福利喲~ 現在每個人的日常生活及工作中,基本都離不開微信,大多數同學都會在辦公時開一個網頁版微信,方便溝通交流(悄悄摸魚)。但是很多人喜歡...

    動態熱修復技術的實現

    原文地址為:動態熱修復技術的實現 項目地址:https://github.com/dodola/HotFix HotFix 安卓App熱補丁動態修復框架 介紹 該項目是基于QQ空間終端開發團隊的技術文章實現的,完成了文章中提到的基本功能。 文章地址:安卓App熱補丁動態修復技術介紹 項目部分代碼從 dalvik_patch 項目中修改而來,這個項目本來是用來實現multidex的,發現可以用來實現...

    初窺SpringSecurity安全框架

    文章目錄 概念 對應依賴 創建項目 自定義登陸用戶和密碼 新增SecurityConfig配置類 修改前臺配置 重啟項目驗證 登陸頁配置:記住我 退出的問題 作為一名開發怎能不知道大名頂頂的安全框架呢?市面上流行的安全框架有:shiro和springSecurity。那么你經常用哪個框架做安全訪問控制呢?因為SpringBoot集成了SpringSecurity,所以我們這次來聊聊它 概念 Spr...

    數據結構(三)——基于數組的隊列和循環隊列

        隊Queue也是一個線性的存儲結構,原則是先入先出(FIFO),區別于棧的先進后出。就類似與排隊買票,先進入隊列的就先買票出列;入隊在一端操作(隊尾),出隊只能在另一端操作(隊首);     一個隊列的基本操作就是入隊,出隊,獲取隊列大小,判斷是否為空等等;這篇博客就是自己實現一個基于數組的隊列和循環隊列。 &...

    猜你喜歡

    解決“Win 10 ipv6無網絡權限/無Internet連接權限”問題

    高校校園網有一個好處就是可以自由使用ipv6上各種BT、PT,然而ipv6由于商用不廣泛,各種ipv6解決方案不規范,導致各種使用問題。我最近遇到的問題:裝完Win 10后,已經換了hosts文件,為了上goole,問題是 ipv6就出現了無網絡權限。 經過試驗,發現核心解決方案,僅僅是使用CMD或者Windows Power Shell執行網絡配置命令即可:     &nbs...

    規則引擎QLExpress

    本文從官網總結而來,并在網上摘抄了幾個核心概念的解釋(比如強弱類型、動態靜態類型)。由于摘抄的比較雜亂,這里就不一一列舉原網站了,望諒解。 摘要 QLExpress與Groovy比較 QLExpress Groovy 解析型,編譯成自定義的內存指令 編譯形,產生一個獨立的class文件 表達式語言(EL expression language) 表達式語言(EL expression langua...

    05、隊列(queue):隊列在線程池等有限資源池中的應用

    05、|隊列(queue):隊列在線程池等有限資源池中的應用 5.1 如何理解“隊列”? 先進者先出,這就是典型的“隊列” 最基本的操作也是兩個: 入隊 enqueue(),放一個數據到隊列尾部; 出隊 dequeue(),從隊列頭部取一個元素。 實現: 用數組實現的隊列叫作順序隊列; 用鏈表實現的隊列叫作鏈式隊列。 隊列需要兩個指針: 一個是 he...

    【轉】WPF MVVM 循序漸進 (從基礎到高級) 【已翻譯100%】【2】

    原文地址:http://www.oschina.net/translate/wpf-mvvm-step-by-step-basics-to-advance-level?lang=chs&page=2# 上面的 command 代碼中,ViewModel 對象是通過構造函數傳遞進來。所以 ViewModel 類需要創建一個 command 對象來暴露這個對象的“ICommand&...

    【Python+OpenCV】目標跟蹤-卡爾曼濾波-鼠標軌跡跟蹤

    卡爾曼是匈牙利數學家,Kalman濾波器源于其博士畢業了論文和1960年發表的論文《A New Approach to Linear Filtering and Prediction Problems》(線性濾波與預測問題的新方法)。 論文地址 卡爾曼濾波算法分為兩個階段: 預測階段:卡爾曼濾波器使用由當前點計算的協方差來估計目標的新位置; 更新階段:卡爾曼濾波器記錄目標的位置,并為下一次循環計算...

    精品国产乱码久久久久久蜜桃不卡