Kafka源码系列之kafka如何实现高性能读写的
Kafka源码系列之kafka如何实现高性能读写的
浪尖 浪尖聊大数据
本文依然是以kafka 0.8.2.2的源码为例进行讲解。
一,kafka高性能的原因
Kafka吞吐量是大家公认的高,那么这是为什么呢?个人总结为以下三点:
1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。
2,磁盘顺序写。
3,零拷贝。
本文主要是讲解,磁盘顺序写和零拷贝。关于Broker的消息处理体系,请大家阅读我的另一篇文章<Kafka源码系列之Broker的IO服务及业务处理>。
二,本文牵涉到的类
由于本文要从生产者生产消息到消费者消费到消息整个流水讲起,那么会导致源码量比较大,这里只会截取部分关键步骤的重要代码,不会全部贴出,影响大家阅读效果。
1,ByteBufferMessageSet
代表着存储在byte buffer的一系列的消息。有两种创建的方式
方式一,使用一个已经包含了序列化好的Message的ByteBuffer。消费者使用的是这种方式。消费者端代码
new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
方式二,给它一个消息列表,以及关于序列化的指令。生产者会使用这种方式。
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
2,FileMessageSet
这个是在SimpleConsumer请求消息的时候封装的,然后通过自身的writeTo方法,将数据高效的传输给SimpleConusmer。在SimpleConsumer端,接收后又封装成了ByteBufferMessageSet。
主要作用,表示带磁盘上的消息集合,可以指定起始位置,并且允许对文件的子集进行切片。
3,LogSegment
表示一段日志。每段日志有两个元素:日志文件和索引。数据就是FileMessageSet的包含的实际消息。索引文件就是从逻辑偏移到物理文件位置的映射。
4,Partition
代表一个topic的分区。Leader分区管理着:AR,ISR,CUR,RAR.
AR: replicas assigned to this partition
ISR:In-sync replica set, maintained at the leader
CUR:Catch-up replica set, maintained at the leader
RAR:being reassigned to other brokers., maintained at the leader
数据追加和读写都会有leader 分区获取,交由Replica进行操作。
这里会有个关系是:AR=ISR+CUR
5,Log
仅仅支持追加的方式存储消息,代表了一些列的LogSegment的集合,每个都包含一个基准偏移,代表在segment中的第一个消息。根据我们的指定的策略创建新的logsegments,比如按照大小或者时间周期。
6,LogOffsetMetadata
这表示一个日志偏移的数据结构,包含以下几个部分:
A),消息偏移;B),定位的segment的起始偏移;C),在磁盘上的物理位置。
7,Replica
主要是代表着一个topic的分区的副本。主要是维护一个log对象,用于数据读写,和log的结束偏移等信息。由我们的Partition对象获取和维护。
8,ReplicaManager
负责管理当前机器的所有副本,处理读写具体动作。
读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。
三,kafka存储的结构及读取过程
1,topic的文件树
每个topic有n个partition,每个partition有n个segmentfile组成。一个segment有.index和.log组成
2,存储目录
Partition的位置由broker参数log.dirs=/tmp/kafka-logs,可以使目录列表,逗号分开。
3,存储时间及大小
broker 级别的日志文件的存储时间控制:
log.retention.hours=168
log.retention.minutes=168
log.retention.ms=168
更高优先级topic 级别
--config retention.ms=?
一个segmentfile大小的控制
log.segment.bytes=1073741824
--config segment.bytes=?
4,kafka Segment读取
假如消息偏移为4255133,先通过二分查找找到segmentfile的00000000000004255130.index,然后计算在该文件的第几个位置n=4255133-(4255130+1)=2,获取偏移1220,然后去00000000000004255130.log取出偏移为1220的msg4255132,结束。
四,kafka源码接触
本处总共分四个部分:
a,Producer压缩块发送消息
b,Broker顺序写入
c,Broker zero-copy 数据给SimpleConsumer
d,消费者,Broker消息大小的限制。
1,Producer压缩块发送消息
在<Kafka源码系列之通过源码分析Producer性能瓶颈>中提到过,producer会为每个Broker创建一个链接,然后使用该链接将数据发出。那么为了Broker端能将消息进行准确到识别并写入各个topic的分区副本文件中,producer在生产消息的时候就需要对消息进行分类:根据key和topic,按照topicAndpartition进行分类。
源代码是在DefaultEventHandler的partitionAndCollate进行归类。
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = { val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] try { for (message <- messages) { //返回值是指定topic的所有分区在broker分布 (brokerId, numPartitions) val topicPartitionsList = getPartitionListForTopic(message) //假如指定了分区方法和key,会调用我们的分区方法,获取key指定的分区 val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList) //获取去指定分区的PartitionAndLeader val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null //[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] // 获取数组[TopicAndPartition, Seq[KeyedMessage[K,Message]]]],不存在的话创建 ret.get(leaderBrokerId) match { case Some(element) => dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] case None => dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) } val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId) var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null //取出[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]中的消息数组Seq[KeyedMessage[K,Message]]],将消息加入 dataPerBroker.get(topicAndPartition) match { case Some(element) => dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]] case None => dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]] dataPerBroker.put(topicAndPartition, dataPerTopicPartition) } dataPerTopicPartition.append(message) } Some(ret)
归类结束之后,会针对每个分区的消息封装成ByteBufferMessageSet进行压缩序列化是在groupMessagesToSet方法中
val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) => val rawMessages = messages.map(_.message) ( topicAndPartition, config.compressionCodec match { case NoCompressionCodec => debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition)) new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*) case _ => config.compressedTopics.size match { case 0 => debug("Sending %d messages with compression codec %d to %s" .format(messages.size, config.compressionCodec.codec, topicAndPartition)) new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*) case _ => if(config.compressedTopics.contains(topicAndPartition.topic)) { debug("Sending %d messages with compression codec %d to %s" .format(messages.size, config.compressionCodec.codec, topicAndPartition)) new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*) } else { debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s" .format(messages.size, topicAndPartition, config.compressedTopics.toString)) new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*) } } } )}messagesPerTopicPartition
目前支持的压缩方式是
compressionCodec match { case DefaultCompressionCodec => new GZIPOutputStream(stream) case GZIPCompressionCodec => new GZIPOutputStream(stream) case SnappyCompressionCodec => import org.xerial.snappy.SnappyOutputStream new SnappyOutputStream(stream) case LZ4CompressionCodec => new KafkaLZ4BlockOutputStream(stream) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)}
2,Broker端的顺序写入
此处,代码量相当大,我们只会捡重要的函数进行讲解。
首先是Broker接受到消息后,会执行kafkaApis的.
case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
下面是会针对不同topic的不同分区进行消息区分写入
partitionAndData.map {case (topicAndPartition, messages) => try { if (Topic.InternalTopics.contains(topicAndPartition.topic) && !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) { throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)) } //主要是获取到分区 val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val info = partitionOpt match { case Some(partition) => //开始追加到日志 partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)
其中,appendMessagesToLeader方法中会通过Partition取出replica对象,调用其内部log的append方法
val leaderReplicaOpt = leaderReplicaIfLocal()leaderReplicaOpt match { case Some(leaderReplica) => val log = leaderReplica.log.getval info = log.append(messages, assignOffsets = true)
剩下的就是在append方法中,首先验证消息的有效性,比如单条消息是不是超过了Broker所接受的消息的大小等。
val appendInfo = analyzeAndValidateMessageSet(messages)然后,给消息分配偏移val offset = new AtomicLong(nextOffsetMetadata.messageOffset) try {// 给我们的数据块赋值偏移 validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
最后,判断Segment是否需要roll up,消息正式追加写入和最大偏移更新
// maybe roll the log if this segment is fullval segment = maybeRoll(validMessages.sizeInBytes)// now append to the log 并未拆解 初始偏移和消息块segment.append(appendInfo.firstOffset, validMessages)// increment the log end offset //更新修大偏移updateLogEndOffset(appendInfo.lastOffset + 1)
通过FileMessageSet顺序追加到Segment文件中,是对整个Messageset进行写入的
def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) _size.getAndAdd(written)}
3,Broker zero-copy 数据给SimpleConsumer
Broker接收到数据请求后,执行KafkaApis的
case RequestKeys.FetchKey => handleFetchRequest(request)val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]val dataRead = replicaManager.readMessageSets(fetchRequest)
在readMessageSets方法中
//读取指定topic 分区 offset 最大获取大小 MessageSetval (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
获取副本replica对象,获取log的最大偏移,然后获取log对象,调用其read方法
val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) getReplicaOrException(topic, partition)else getLeaderReplicaIfLocal(topic, partition)trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))val maxOffsetOpt = if (Request.isValidBrokerId(fromReplicaId)) None else Some(localReplica.highWatermark.messageOffset) //最大消息偏移val fetchInfo = localReplica.log match { case Some(log) => log.read(offset, maxSize, maxOffsetOpt)
根据其实offset找到LogSegment。调用其read方法读取消息,根据最大偏移,起始偏移,和读取的最大bytes。
var entry = segments.floorEntry(startOffset)val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
计算起始偏移和最大偏移对应的实际物理position
val logSize = log.sizeInBytes // this may change, need to save a consistent copyval startPosition = translateOffset(startOffset) //根据offset找到物理存储的偏移的位置// if the start position is already off the end of the log, return nullif(startPosition == null) return nullval offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)// if the size is zero, still return a log segment but with zero sizeif(maxSize == 0) return FetchDataInfo(offsetMetadata, MessageSet.Empty)// calculate the length of the message set to read based on whether or not they gave us a maxOffsetval length = maxOffset match { case None => // no max offset, just use the max size they gave unmolested 无麻烦的 maxSize case Some(offset) => { // there is a max offset, translate it to a file position and use that to calculate the max read size if(offset < startOffset) throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) val mapping = translateOffset(offset, startPosition.position) val endPosition = if(mapping == null) //说明消息最大偏移超过了当前块,所以最大偏移就是当前块大小 logSize // the max offset is off the end of the log, use the end of the file else mapping.position //否则就是最大偏移 min(endPosition - startPosition.position, maxSize) //文件剩余数据的大小和最大索取大小哪个小,取最小的 } }FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
最终调用FileMessageSet的read方法,根据我们提供的信息封装成了
new FileMessageSet(file, channel, start = this.start + position, end = math.min(this.start + position + size, sizeInBytes()))
至此,都是在讲我们Broker 在接收到数据读取请求后的处理业务线程。他处理结束之后会在handleFetchRequest方法中
val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
zero-copy发生在socketServer回复SimpleConsumer的过程中
Processor的write方法中
val response = key.attachment().asInstanceOf[RequestChannel.Response]val responseSend = response.responseSendif(responseSend == null) throw new IllegalStateException("Registered for write interest but no response attached to key.")val written = responseSend.writeTo(socketChannel)
下面,嵌套比较深,这里只列出了函数名称
FetchResponseSend.writeTo
MultiSend.writeTo
TopicDataSend.writeTo
MultiSend.writeTo
PartitionDataSend.writeTo
FileMessageSet.writeTo
此时我们就可以看到我们久违transferTo了。这就是整个zero-copy的过程
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = { // Ensure that the underlying size has not changed. val newSize = math.min(channel.size().toInt, end) - start if (newSize < _size.get()) { throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d" .format(file.getAbsolutePath, _size.get(), newSize)) } val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred + " bytes requested for transfer : " + math.min(size, sizeInBytes)) bytesTransferred}
4,消费者,Broker,消息大小的限制。
Broker接收到Producer消息后会对消息的有效性进行验证,是在Log的append方法中调用了
val appendInfo = analyzeAndValidateMessageSet(messages)
在该方法内部,进行了大小的验证
// Check if the message sizes are valid.val messageSize = MessageSet.entrySize(m) //单个消息的大小if(messageSize > config.maxMessageSize) { BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." .format(messageSize, config.maxMessageSize))}
Broker端接受的最大消息大小的配置为
消费者,对消息尺寸进行验证。看过去前面的文章应该知道,SimpleConsumer获取消息后最终是由kafkaStream(ConsumerIterator),将消息从对列里取出使用的。实际是在ConsumerIterator的makeNext方法中进行消息有效性验证
// if we just updated the current chunk and it is empty that means the fetch size is too small!if(currentDataChunk.messages.validBytes == 0) throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
不满足要求就会抛出一个异常。关键就是第一次迭代取消息的时候,假如消息消费者消费时指定的最大消息大小,小于生产者生产的,此时会导致消息获取不完整,然后进一步导致,shallowValidBytes返回值为零,以此来判断消息是否超过消费者所能消费消息大小,然后抛出相关异常。
private def shallowValidBytes: Int = { if(shallowValidByteCount < 0) { var bytes = 0 val iter = this.internalIterator(true) while(iter.hasNext) { val messageAndOffset = iter.next bytes += MessageSet.entrySize(messageAndOffset.message) }// bytes=0,代表消息不完整,当时第一次迭代就读取的消息不完整的话就会出现bytes为零,这时就需要调整消费者的消息大小了。 this.shallowValidByteCount = bytes } shallowValidByteCount }
消费者所能获取的最大消息大小的配置
五,总结
本节主要目的是分析kafka的磁盘顺序写和zero-copy源码。这个也是kafka只所以高效的最关键步骤,在这里浪尖给出了一下使用时总结。
1,生产者生产消息时,采用异步
异步,并发
消息块方式减少网络io请求次数
可以更加好的利用Broker端的磁盘顺序写
2,生产者生产消息是指定压缩
节省网络传输带宽
配置方式
3,注意消息大小限制
结合自己的消息类型特点,进行相关配置。
最后赘述一个kafka高性能的原因:
1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。
2,磁盘顺序写。
3,零拷贝。
更多相关文章
- Kafka原理详解
- FlinkSQL演进过程,解析原理及一些优化策略
- 一文读懂数据湖及企业中的架构特点
- kafka|使用Interceptors实现消息端到端跟踪
- 干货--部署RocketMQ
- 消息处理
- Kafka评传——从kafka的消息生命周期引出的沉思
- kafka架构
- Bootstrap 学习 - 网格系统