`
ssxxjjii
  • 浏览: 930613 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

消息系统kafka介绍

 
阅读更多

http://dongxicheng.org/search-engine/kafka/

http://shift-alt-ctrl.iteye.com/blog/1930791

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

Php代码  收藏代码
  1. clientPort=2181  
  2. server.0=127.0.0.1:2888:3888  
  3. server.1=127.0.0.1:2889:3889  
  4. server.2=127.0.0.1:2890:3890  
  5. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

Java代码  收藏代码
  1. ./zkServer.sh start  

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2182  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2183  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

Java代码  收藏代码
  1. broker.id=0  
  2. port=9092  
  3. num.network.threads=2  
  4. num.io.threads=2  
  5. socket.send.buffer.bytes=1048576  
  6. socket.receive.buffer.bytes=1048576  
  7. socket.request.max.bytes=104857600  
  8. log.dir=./logs  
  9. num.partitions=2  
  10. log.flush.interval.messages=10000  
  11. log.flush.interval.ms=1000  
  12. log.retention.hours=168  
  13. #log.retention.bytes=1073741824  
  14. log.segment.bytes=536870912  
  15. num.replica.fetchers=2  
  16. log.cleanup.interval.mins=10  
  17. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  18. zookeeper.connection.timeout.ms=1000000  
  19. kafka.metrics.polling.interval.secs=5  
  20. kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
  21. kafka.csv.metrics.dir=/tmp/kafka_metrics  
  22. kafka.csv.metrics.reporter.enabled=false  

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

Java代码  收藏代码
  1. > cd kafka-0  
  2. > ./sbt update  
  3. > ./sbt package  
  4. > ./sbt assembly-package-dependency   

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

Java代码  收藏代码
  1. > JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

Java代码  收藏代码
  1. broker.id=1  
  2. port=9093  
  3. ##其他配置和kafka-0保持一致  

    然后和kafka-0一样执行打包命令,然后启动此broker.

Java代码  收藏代码
  1. > JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.

Java代码  收藏代码
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.     <modelVersion>4.0.0</modelVersion>  
  4.     <groupId>com.test</groupId>  
  5.     <artifactId>test-kafka</artifactId>  
  6.     <packaging>jar</packaging>  
  7.   
  8.     <name>test-kafka</name>  
  9.     <url>http://maven.apache.org</url>  
  10.     <version>1.0.0</version>  
  11.     <dependencies>  
  12.         <dependency>  
  13.             <groupId>log4j</groupId>  
  14.             <artifactId>log4j</artifactId>  
  15.             <version>1.2.14</version>  
  16.         </dependency>  
  17.         <dependency>  
  18.             <groupId>org.apache.kafka</groupId>  
  19.             <artifactId>kafka_2.8.0</artifactId>  
  20.             <version>0.8.0-beta1</version>  
  21.             <exclusions>  
  22.                 <exclusion>  
  23.                     <groupId>log4j</groupId>  
  24.                     <artifactId>log4j</artifactId>  
  25.                 </exclusion>  
  26.             </exclusions>  
  27.         </dependency>  
  28.         <dependency>  
  29.             <groupId>org.scala-lang</groupId>  
  30.             <artifactId>scala-library</artifactId>  
  31.             <version>2.8.1</version>  
  32.         </dependency>  
  33.         <dependency>  
  34.             <groupId>com.yammer.metrics</groupId>  
  35.             <artifactId>metrics-core</artifactId>  
  36.             <version>2.2.0</version>  
  37.         </dependency>  
  38.         <dependency>  
  39.             <groupId>com.101tec</groupId>  
  40.             <artifactId>zkclient</artifactId>  
  41.             <version>0.3</version>  
  42.         </dependency>  
  43.     </dependencies>  
  44.     <build>  
  45.         <finalName>test-kafka-1.0</finalName>  
  46.         <resources>  
  47.             <resource>  
  48.                 <directory>src/main/resources</directory>  
  49.                 <filtering>true</filtering>  
  50.             </resource>  
  51.         </resources>  
  52.         <plugins>  
  53.             <plugin>  
  54.                 <artifactId>maven-compiler-plugin</artifactId>  
  55.                 <version>2.3.2</version>  
  56.                 <configuration>  
  57.                     <source>1.5</source>  
  58.                     <target>1.5</target>  
  59.                     <encoding>gb2312</encoding>  
  60.                 </configuration>  
  61.             </plugin>  
  62.             <plugin>  
  63.                 <artifactId>maven-resources-plugin</artifactId>  
  64.                 <version>2.2</version>  
  65.                 <configuration>  
  66.                     <encoding>gbk</encoding>  
  67.                 </configuration>  
  68.             </plugin>  
  69.         </plugins>  
  70.     </build>  
  71. </project>  

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

Java代码  收藏代码
  1. #partitioner.class=  
  2. metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
  3. ##,127.0.0.1:9093  
  4. producer.type=sync  
  5. compression.codec=0  
  6. serializer.class=kafka.serializer.StringEncoder  
  7. ##在producer.type=async时有效  
  8. #batch.num.messages=100  

    2) LogProducer.java代码样例

Java代码  收藏代码
  1. package com.test.kafka;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.Collection;  
  5. import java.util.List;  
  6. import java.util.Properties;  
  7.   
  8. import kafka.javaapi.producer.Producer;  
  9. import kafka.producer.KeyedMessage;  
  10. import kafka.producer.ProducerConfig;  
  11. public class LogProducer {  
  12.   
  13.     private Producer<String,String> inner;  
  14.     public LogProducer() throws Exception{  
  15.         Properties properties = new Properties();  
  16.         properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));  
  17.         ProducerConfig config = new ProducerConfig(properties);  
  18.         inner = new Producer<String, String>(config);  
  19.     }  
  20.   
  21.       
  22.     public void send(String topicName,String message) {  
  23.         if(topicName == null || message == null){  
  24.             return;  
  25.         }  
  26.         KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
  27.         inner.send(km);  
  28.     }  
  29.       
  30.     public void send(String topicName,Collection<String> messages) {  
  31.         if(topicName == null || messages == null){  
  32.             return;  
  33.         }  
  34.         if(messages.isEmpty()){  
  35.             return;  
  36.         }  
  37.         List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
  38.         for(String entry : messages){  
  39.             KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
  40.             kms.add(km);  
  41.         }  
  42.         inner.send(kms);  
  43.     }  
  44.       
  45.     public void close(){  
  46.         inner.close();  
  47.     }  
  48.       
  49.     /** 
  50.      * @param args 
  51.      */  
  52.     public static void main(String[] args) {  
  53.         LogProducer producer = null;  
  54.         try{  
  55.             producer = new LogProducer();  
  56.             int i=0;  
  57.             while(true){  
  58.                 producer.send("test-topic""this is a sample" + i);  
  59.                 i++;  
  60.                 Thread.sleep(2000);  
  61.             }  
  62.         }catch(Exception e){  
  63.             e.printStackTrace();  
  64.         }finally{  
  65.             if(producer != null){  
  66.                 producer.close();  
  67.             }  
  68.         }  
  69.   
  70.     }  
  71.   
  72. }  

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

Java代码  收藏代码
  1. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  2. ##,127.0.0.1:2182,127.0.0.1:2183  
  3. # timeout in ms for connecting to zookeeper  
  4. zookeeper.connectiontimeout.ms=1000000  
  5. #consumer group id  
  6. group.id=test-group  
  7. #consumer timeout  
  8. #consumer.timeout.ms=5000  
  9. auto.commit.enable=true  
  10. auto.commit.interval.ms=60000  

    2) LogConsumer.java代码样例

Java代码  收藏代码
  1. package com.test.kafka;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.List;  
  5. import java.util.Map;  
  6. import java.util.Properties;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9.   
  10. import kafka.consumer.Consumer;  
  11. import kafka.consumer.ConsumerConfig;  
  12. import kafka.consumer.ConsumerIterator;  
  13. import kafka.consumer.KafkaStream;  
  14. import kafka.javaapi.consumer.ConsumerConnector;  
  15. import kafka.message.MessageAndMetadata;  
  16. public class LogConsumer {  
  17.   
  18.     private ConsumerConfig config;  
  19.     private String topic;  
  20.     private int partitionsNum;  
  21.     private MessageExecutor executor;  
  22.     private ConsumerConnector connector;  
  23.     private ExecutorService threadPool;  
  24.     public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{  
  25.         Properties properties = new Properties();  
  26.         properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));  
  27.         config = new ConsumerConfig(properties);  
  28.         this.topic = topic;  
  29.         this.partitionsNum = partitionsNum;  
  30.         this.executor = executor;  
  31.     }  
  32.       
  33.     public void start() throws Exception{  
  34.         connector = Consumer.createJavaConsumerConnector(config);  
  35.         Map<String,Integer> topics = new HashMap<String,Integer>();  
  36.         topics.put(topic, partitionsNum);  
  37.         Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
  38.         List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
  39.         threadPool = Executors.newFixedThreadPool(partitionsNum);  
  40.         for(KafkaStream<byte[], byte[]> partition : partitions){  
  41.             threadPool.execute(new MessageRunner(partition));  
  42.         }   
  43.     }  
  44.   
  45.           
  46.     public void close(){  
  47.         try{  
  48.             threadPool.shutdownNow();  
  49.         }catch(Exception e){  
  50.             //  
  51.         }finally{  
  52.             connector.shutdown();  
  53.         }  
  54.           
  55.     }  
  56.       
  57.     class MessageRunner implements Runnable{  
  58.         private KafkaStream<byte[], byte[]> partition;  
  59.           
  60.         MessageRunner(KafkaStream<byte[], byte[]> partition) {  
  61.             this.partition = partition;  
  62.         }  
  63.           
  64.         public void run(){  
  65.             ConsumerIterator<byte[], byte[]> it = partition.iterator();  
  66.             while(it.hasNext()){  
  67.                                 //connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
  68.                 MessageAndMetadata<byte[],byte[]> item = it.next();  
  69.                 System.out.println("partiton:" + item.partition());  
  70.                 System.out.println("offset:" + item.offset());  
  71.                 executor.execute(new String(item.message()));//UTF-8,注意异常  
  72.             }  
  73.         }  
  74.     }  
  75.       
  76.     interface MessageExecutor {  
  77.           
  78.         public void execute(String message);  
  79.     }  
  80.       
  81.     /** 
  82.      * @param args 
  83.      */  
  84.     public static void main(String[] args) {  
  85.         LogConsumer consumer = null;  
  86.         try{  
  87.             MessageExecutor executor = new MessageExecutor() {  
  88.                   
  89.                 public void execute(String message) {  
  90.                     System.out.println(message);  
  91.                       
  92.                 }  
  93.             };  
  94.             consumer = new LogConsumer("test-topic"2, executor);  
  95.             consumer.start();  
  96.         }catch(Exception e){  
  97.             e.printStackTrace();  
  98.         }finally{  
  99. //          if(consumer != null){  
  100. //              consumer.close();  
  101. //          }  
  102.         }  
  103.   
  104.     }  
  105.   
  106. }  

    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

 

分享到:
评论
3 楼 西巴拉古呀那 2018-02-21  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
网盘地址:https://pan.baidu.com/s/1nwwhpP3 密码: mxu6
网盘地址:https://pan.baidu.com/s/1mjM5HaC 密码: xa5s
2 楼 kafodaote 2018-01-21  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
网盘地址:https://pan.baidu.com/s/1c3JymAk 密码: dnky
网盘地址:https://pan.baidu.com/s/1eTV5ygU 密码: 3g3v
1 楼 成大大的 2017-10-18  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
——https://pan.baidu.com/s/1cAm9AI 密码: 7fvt

内容简介
Kafka是分布式的消息队列,作为云计算服务的基石,它广泛的应用在实时数据流方面,是实时数据处理的数据中枢,广泛应用在很多互联网企业,例如:linkedin,facebook,腾讯,百度,阿里等。实时数据流是现在互联网公司、甚至拥有大规模数据的传统企业的主要模式, 实时数据(Real-time Activity Data)就是那些非交易,不需要秒级响应的数据, 但在后续的分析中产生极大作用,例如个性化推荐、运营服务监控、精细化营销、报表等 。

本课程的目的在于系统性地介绍Kafka分布式消息系统,掌握了Kafka,你就拿到了大数据处理领域消息处理机制的钥匙,能够轻松上手开发分布式消息系统应用程序开发和维护,笑傲大数据处理技术。学完本课程,你可以掌握:

1.Kafka的部署方式
2.Kafka的原理
3.Kafka与其他大数据组件的集成
4.基于Kafka的程序开发



第一章 Kafka的基本介绍
   1.1 什么是消息系统
   1.2 消息队列的分类
   1.3 Kafka的基本架构和概念
   1.4 ZooKeeper简介和安装

第二章 Kafka的原理解析
   2.1 Kafka的Producer处理逻辑
   2.2 Kafka的broker处理逻辑
   2.3 Kafka的Consumer处理逻辑
   2.4 Kafka集群部署在zk里的存储结构

第三章 Kafka的部署方式
   3.1 Kafka伪集群部署模式
   3.2 Kafka集群部署模式
   3.3 核心配置文件server.properties

第四章 Kafka的Java应用开发
   4.1 Producer端的实现
   4.2 Consumer端的实现

第五章 Kafka的Scala应用开发
   5.1 Scala的Producer的实现
   5.2 Scala的Producer的实现

第六章 Kafka与Hadoop的集成
   6.1 Hadoop简介和配置
   6.2 集成Kafka和Hadoop

第七章 Kafka与Flume的集成
   7.1 Flume简介和使用
   7.2 集成Kafka和Flume

第八章 Kafka与Storm的集成
   8.1 Storm的运行机制和部署
   8.2 Storm编程案例
   8.3 集成Kafka和Storm

相关推荐

    基于分布式的发布订阅消息系统Kafka

    Kafka是大数据平台中数据缓存的中间件,资源里有一篇关于Kafka原作者的一篇论文,讲解的很详细。也有两份介绍Kafka使用的PPT,还有Kafka的源代码。

    浅谈分布式消息技术:Kafka.docx

    Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...

    kafka介绍及部署

    Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据和运营数据,同时也是支持通用的消息语义(messaging semantics)。其中活跃的流式数据包括页面访问量(page view)、被查看内容方面的...

    kafka原理介绍及参数.pptx

    消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 顺序保证  在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证...

    kafka.pdf 介绍 为何使用消息系统

    1.1. 为何使用消息系统 1.1.1. 解耦 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立...

    深入剖析Kafka设计原理:如何构建高效的消息系统

    本文详细解析了Kafka的设计原理,重点介绍了Kafka作为一种高效的分布式消息系统的核心组件和机制。首先,文档解释了Kafka的总控制器(Controller)的作用,它负责管理集群中的分区和副本状态,并在必要时进行Leader...

    kafka.mmap

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量...

    kafka深度分析

    介绍Kafka背景,使用消息系统的优势,常用消息系统对比,Kafka架构介绍,Kafka实现语义分析,Replication及Leader Election机制剖析,Consumer Group Rebalance实现原理介绍,以及Benchmark测试。

    Kafka入门教程:快速掌握消息队列的核心技术!

    kafka介绍:Kafka知识领域+消息队列技术关键词+安装、配置、使用、主题、分区、副本、消费者组、流处理、事件源存储、性能优化、安全性、集群部署内容关键词+构建实时数据管道、解耦服务、可靠消息传递、数据可靠性...

    kafka设计思想

    kafka消息系统的设计介绍,转载对kafka系统设计的翻译。

    46讲全-Kafka核心技术与实战.zip

    Kafka 入门、Kafka 的基本使用、客户端详解、Kafka 原理介绍、Kafka 运维与监控以及高级 Kafka 应用,在实际业务系统中实现消息队列应用、应用程序集成、分布式存储构建,甚至是流处理应用的开发与部署。

    kafka的使用场景.docx

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,...

    如何在 Spring Boot 3.X 中使用 Kafka 实现消息传递功能?

    本文介绍如何在 Spring Boot 3.X 中利用 Kafka 实现高效的消息传递功能。Kafka 是一个分布式流处理平台,适用于实时数据流处理、日志收集与分析、事件驱动等场景。通过集成 Kafka,我们可以实现可靠的异步消息传递,...

    Kafka设计解析(一)-Kafka背景及架构介绍

    本文介绍了Kafka的创建背景,设计目标,使用消息系统的优势以及目前流行的消息系统对比。并介绍了Kafka的架构,Producer消息路由,ConsumerGroup以及由其实现的不同消息分发方式,Topic&Partition,最后介绍了Kafka...

    如何在 Spring Boot 3.X 中使用 Kafka 实现消息传递功能?(3)

    本文介绍如何在 Spring Boot 3.X 中利用 Kafka 实现高效的消息传递功能。Kafka 是一个分布式流处理平台,适用于实时数据流处理、日志收集与分析、事件驱动等场景。通过集成 Kafka,我们可以实现可靠的异步消息传递,...

    如何在 Spring Boot 3.X 中使用 Kafka 实现消息传递功能?(4)

    本文介绍如何在 Spring Boot 3.X 中利用 Kafka 实现高效的消息传递功能。Kafka 是一个分布式流处理平台,适用于实时数据流处理、日志收集与分析、事件驱动等场景。通过集成 Kafka,我们可以实现可靠的异步消息传递,...

    如何在 Spring Boot 3.X 中使用 Kafka 实现消息传递功能?(2)

    本文介绍如何在 Spring Boot 3.X 中利用 Kafka 实现高效的消息传递功能。Kafka 是一个分布式流处理平台,适用于实时数据流处理、日志收集与分析、事件驱动等场景。通过集成 Kafka,我们可以实现可靠的异步消息传递,...

    Apache Kafka实战.pdf--有新特性的介绍-强烈推荐

    《Apache Kafka实战》共分为10章:第1章全面介绍消息引擎系统以及Kafka的基本概念与特性,快速带领读者走进Kafka的世界;第2章简要回顾了Apache Kafka的发展历史;第3章详细介绍了Kafka集群环境的搭建;第4、5章深入...

    Kafka快速实战与基本原理详解:从零到精通

    本文档提供了对Kafka这一分布式消息系统的全面解析,从基本概念到实际应用,涵盖了其在日志收集、消息系统、用户活动跟踪等方面的使用场景。首先介绍了Kafka的核心概念,如Broker、Topic、Producer、Consumer等,...

    kafka的安装和简单实例测试

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。文档介绍...

Global site tag (gtag.js) - Google Analytics