Kafka 介绍与实践

Hadoop,Kafka,技术帖,BigData 2017-11-14

一.Kafka基础

1.1 实践内容

本文将介绍 Kafka 及实现原理,然后完整搭建,demo演示。

1.2 项目来源

参考资料: http://kafka.apache.org/documentation.html

1.3. 实践知识点

生产者/消费者模型
单机/集群的区别
设计原理

1.4 实验环境

Hadoop 2.6.1
kafka_2.10-0.8.1.1
Xfce 终端

1.5 适合人群

本文属于中等难度级别,适合具有 hadoop 基础的用户,如果对分布式文件系统了解能够更好的上手。

二、kafka 入门

2.1 简介

Kafka 是 linkedin 用于日志处理的分布式消息队列,同时支持离线和在线日志处理。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer,消息接受者成为 Consumer,此外 kafka 集群由多个 kafka 实例组成,每个实例 (server) 称为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 meta 信息。

此处输入图片的描述

2.2 Kafka应用场景

​ 1.日志收集

日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume。很多人使用 Kafka 代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。然而 Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让 Kafka 处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka 提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

​ 2.行为跟踪

Kafka 的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的 topic 里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop 离线数据仓库里处理。

​ 3.持久性日志(commit log)

Kafka 可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka 中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。

2.3 Kafka基本概念

Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。

Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。

Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。

Producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。

Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。

Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。

三、设计原理

Kafka 的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。

3.1 Kafka 的 Topics/Log

一个Topic 可以认为是一类消息,每个 topic 将被分成多 partition (区),每个partition在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称 offset(偏移量),partition 是以文件的形式存储在文件系统中。Logs 文件根据 broker 中的配置要求,保留一定时间后删除来释放磁盘空间。

Partition:

Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。 partition 中的每条消息都会被分配一个有序的 id(offset)。

此处输入图片的描述

3.2 Kafka的储存策略

kafka以 topic 来进行消息管理,每个 topic 包含多个 partition,每个 part 对应一个逻辑 log,有多个 segment 组成。

每个 segment 中存储多条消息(见下图),消息 id 由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

broker 收到发布消息往对应 partition 的最后一个 segment 上添加该消息。

每个 part 在内存中对应一个index,记录每个 segment 中的第一条消息偏移。

发布者发到某个 topic 的 消息会被均匀的分布到多个 part 上(随机或根据用户指定的回调函数进行分布),broker 收到发布消息往对应 part 的最后一个 segment 上添加 该消息,当某个 segment 上的消息条数达到配置值或消息发布时间超过阈值时,segment 上的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment 写数据,broker 会创建新的 segment。

此处输入图片的描述

3.3 Kafka的消息发送的流程

由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull 的方式消费数据 Producer 向 Kafka(push)推数据,consumer 从 kafka 拉(pull)数据。

此处输入图片的描述

3.4 Kafka 的 Zookeeper 协调控制

管理 broker 与 consumer 的动态加入与离开。
触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的订阅负载平衡。
维护消费关系及每个 partion 的消费信息。
Zookeeper上的细节:

每个 broker 启动后会在 zookeeper 上注册一个临时的 broker registry,包含 broker 的 ip 地址和端口号,所存储的 topics 和 partitions 信息。

每个 consumer 启动后会在 zookeeper 上注册一个临时的 consumer registry:包含 consumer 所属的consumer group 以及订阅的 topics。

每个 consumer group 关 联一个临时的 owner registry 和一个持久的 offset registry。对于被订阅的每个 partition 包含一个 owner registry,内容为订阅这个 partition 的 consumer id;同时包含一个 offset registry,内容为上一次订阅的 offset。

四、安装部署

集群方式:单节点单 broker,单节点多 broker,多节点多 broker。

4.1 准备工作

下载并配置启动 hadoop-2.6.1 所需的文件,成功安装后可以在 /opt 找到,只需格式化并启动 hadoop 进程即可。

双击打开桌面上的 Xfce 终端,用 sudo 命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用 cd 命令进入 /opt 目录。

    $ su hadoop
    $ cd /opt/

在 /opt 目录下格式化 hadoop。

$ hadoop-2.6.1/bin/hdfs namenode -format

在 /opt 目录下启动 hadoop 进程。

$ hadoop-2.6.1/sbin/start-all.sh

用 jps 查看 hadoop 进程是否启动。

4.2 下载 kafka 及解压

你可以通过下面命令将 Kafka 下载到实验楼环境中,作为参照对比进行学习。

$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.8.1.1.tgz

在 /opt 目录下,用 tar 命令解压下载的 Kafka。

4.3 单节点单 broker

Kafka 用到了 Zookeeper,所以首先启动 Zookeeper,下面简单的启用一个单实例的 Zookeeper 服务。可以在命令的结尾加个 & 符号,这样就可以启动后离开控制台。

注意:kafka 自带了 zookeeper,为了方便,这里我们直接使用,当然也可以使用自己下载的 zookeeper*.tar.gz

    权限不足,授权
    hadoop@945f39ae074b:/opt$ sudo chmod 777 -R kafka_2.10-0.8.1.1
    启动自带的zookeeper
    hadoop@945f39ae074b:/home/shiyanlou$ cd /opt/kafka_2.10-0.8.1.1/
    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  
        bin/zookeeper-server-start.sh   config/zookeeper.properties &

查看进程:QuorumPeerMain
启动 kafka 服务器。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  
    bin/kafka-server-start.sh  config/server.properties      >/dev/null 2>&1 &
    查看 kafka 进程:Kafka

创建一个叫做“test”的 topic,它只有一个分区,一个副本。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic     
    test

可以通过 list 命令查看创建的 topic: test

查看一个 topic 的分区及副本状态信息。

    Topic:test  PartitionCount:1  ReplicationFactor:1
    Configs:Topic:test  Partition:0  Leader:0  Replicas:0  Isr:0

启动 producer:运行 producer 并在控制台中输一些消息,这些消息将被发送到服务端。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list 
    localhost:9092 --topic test

启动 consumer:接着再开启一个终端,运行 consumer 可以读取消息并输出到标准输出。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 
    --topic test --from-beginning

此时就可以边生产边消费啦。

4.4 单节点多 broker

4.3节只是启动了单个 broker,现在启动由3个 broker 组成的集群,这些 broker 节点也都是在本机上的,首先为每个节点编写配置文件。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ cp config/server.properties  config/server-1.properties 
    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ cp config/server.properties  config/server-2.properties 

注意: broker.id 在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。

在拷贝出的新文件中添加以下参数(两个配置文件对应位置修改):
config/server-1.properties:

    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:

    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ sudo vi config/server-1.properties
    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ sudo vi config/server-2.properties

刚才已经启动一个 Zookeeper 和一个节点,现在启动另外两个节点,只需要在开启的两个终端做同样的操作,如下:

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server-1.properties &
    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server-2.properties &

此时应该有3个Kafka的进程出现。

创建一个拥有3个副本的 topic:

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$bin/kafka-topics.sh --create --zookeeper localhost:2181 --
    replication-factor 3 --partitions 1 --topic test2

单节点多 broker 搭建完毕,那么现在怎么知道每个节点的信息呢?运行“describe topics”命令就可以。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$bin/kafka-topics.sh --describe --zookeeper localhost:2181 
    --topic test2

    Topic:test2  PartitionCount:1  ReplicationFactor:1
    Configs:Topic:test2  Partition:0  Leader:1  Replicas:1,0,2  Isr:1,0,2

下面解释一下这些输出,第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。

leader:负责处理消息的读和写,leader 是从所有节点中随机选择的。

replicas:列出了所有的副本节点,不管节点是否在服务中。

isr:是正在服务中的节点。

在我们的例子中,节点2是作为 leader 运行。启动生产者向 topic 发送消息:

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list 
    localhost:9092 --topic test2

生产者生产消息。

启动消费者,接收消息。

    hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 
    --from-beginning --topic test2

消费者消费消息。

4.5 多节点 broker

介绍了上面两种配置方法,再理解集群配置就简单了,zookeeper 配置文件(zookeeper.properties)不变

broker 的配置配置文件(server.properties):按照单节点多实例配置方法在一个节点上启动一个或多个实例,不同的地方是 zookeeper 的连接串需要把所有节点的 zookeeper 都连接起来,然后复制 kafka 到从机,在所有节点启动 Kafka broker 即可。

可能遇到的问题:

    kafka.common.KafkaException: Fail to acquire lock on file .lock in /tmp/kafka-logs.A kafka instance in             
    another process or thread is using this directory.

解决办法:首先检查server.properties文件的broker.id及端口号是否唯一,

必要时用 kill 命令杀死 kafka 进程,重启 kafka 服务器:

kill -9 进程id

五、参考文献

http://kafka.apache.org/documentation.html。
http://blog.sina.com.cn/s/blog_5c51172c0102uxb0.html


本文由 BF 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

楼主残忍的关闭了评论

bst g22 jinniu lilai opebet orange88 vinbet xbet yuebo zunlong shijiebei bet007 hg0088 ju111 letiantang m88 mayaba qg777 qianyiguoji sbf777 tengbohui tlc ule weilianxier waiweitouzhu xingfayule xinhaotiandi yinheyule youfayule zhongying 2018shijiebei w88 18luck 188bet beplay manbet 12bet 95zz shenbo weide1946 ca88 88bifa aomenxinpujing betway bodog bt365 bwin tongbao vwin weinisiren 88jt fenghuangyule hongyunguoji 918botiantang huanyayule jianada28 jixiangfang libo long8 hongzuyishi zuqiutouzhu