Kafka源码系列之kafka如何实现高性能读写的

浪尖 浪尖聊大数据

本文依然是以kafka 0.8.2.2的源码为例进行讲解。

一,kafka高性能的原因

Kafka吞吐量是大家公认的高,那么这是为什么呢?个人总结为以下三点:
1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。
2,磁盘顺序写。
3,零拷贝。
本文主要是讲解,磁盘顺序写和零拷贝。关于Broker的消息处理体系,请大家阅读我的另一篇文章<Kafka源码系列之Broker的IO服务及业务处理>。

二,本文牵涉到的类

由于本文要从生产者生产消息到消费者消费到消息整个流水讲起,那么会导致源码量比较大,这里只会截取部分关键步骤的重要代码,不会全部贴出,影响大家阅读效果。

1,ByteBufferMessageSet

代表着存储在byte buffer的一系列的消息。有两种创建的方式
方式一,使用一个已经包含了序列化好的Message的ByteBuffer。消费者使用的是这种方式。消费者端代码

new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))

方式二,给它一个消息列表,以及关于序列化的指令。生产者会使用这种方式。

new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)

2,FileMessageSet

这个是在SimpleConsumer请求消息的时候封装的,然后通过自身的writeTo方法,将数据高效的传输给SimpleConusmer。在SimpleConsumer端,接收后又封装成了ByteBufferMessageSet。
主要作用,表示带磁盘上的消息集合,可以指定起始位置,并且允许对文件的子集进行切片。

3,LogSegment

表示一段日志。每段日志有两个元素:日志文件和索引。数据就是FileMessageSet的包含的实际消息。索引文件就是从逻辑偏移到物理文件位置的映射。

4,Partition

代表一个topic的分区。Leader分区管理着:AR,ISR,CUR,RAR.
AR: replicas assigned to this partition
ISR:In-sync replica set, maintained at the leader
CUR:Catch-up replica set, maintained at the leader
RAR:being reassigned to other brokers., maintained at the leader
数据追加和读写都会有leader 分区获取,交由Replica进行操作。
这里会有个关系是:AR=ISR+CUR

5,Log

仅仅支持追加的方式存储消息,代表了一些列的LogSegment的集合,每个都包含一个基准偏移,代表在segment中的第一个消息。根据我们的指定的策略创建新的logsegments,比如按照大小或者时间周期。

6,LogOffsetMetadata

这表示一个日志偏移的数据结构,包含以下几个部分:
A),消息偏移;B),定位的segment的起始偏移;C),在磁盘上的物理位置。

7,Replica

主要是代表着一个topic的分区的副本。主要是维护一个log对象,用于数据读写,和log的结束偏移等信息。由我们的Partition对象获取和维护。

8,ReplicaManager

负责管理当前机器的所有副本,处理读写具体动作。
读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。

三,kafka存储的结构及读取过程

1,topic的文件树

每个topic有n个partition,每个partition有n个segmentfile组成。一个segment有.index和.log组成

2,存储目录

Partition的位置由broker参数log.dirs=/tmp/kafka-logs,可以使目录列表,逗号分开。

3,存储时间及大小

broker 级别的日志文件的存储时间控制:

log.retention.hours=168
log.retention.minutes=168
log.retention.ms=168
更高优先级topic 级别
--config retention.ms=?
一个segmentfile大小的控制
log.segment.bytes=1073741824
--config segment.bytes=?

4,kafka Segment读取

假如消息偏移为4255133,先通过二分查找找到segmentfile的00000000000004255130.index,然后计算在该文件的第几个位置n=4255133-(4255130+1)=2,获取偏移1220,然后去00000000000004255130.log取出偏移为1220的msg4255132,结束。

四,kafka源码接触

本处总共分四个部分:
a,Producer压缩块发送消息
b,Broker顺序写入
c,Broker zero-copy 数据给SimpleConsumer
d,消费者,Broker消息大小的限制。

1,Producer压缩块发送消息

在<Kafka源码系列之通过源码分析Producer性能瓶颈>中提到过,producer会为每个Broker创建一个链接,然后使用该链接将数据发出。那么为了Broker端能将消息进行准确到识别并写入各个topic的分区副本文件中,producer在生产消息的时候就需要对消息进行分类:根据key和topic,按照topicAndpartition进行分类。
源代码是在DefaultEventHandler的partitionAndCollate进行归类。

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]    try {    for (message <- messages) {      //返回值是指定topic的所有分区在broker分布 (brokerId, numPartitions)      val topicPartitionsList = getPartitionListForTopic(message)      //假如指定了分区方法和key,会调用我们的分区方法,获取key指定的分区      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)      //获取去指定分区的PartitionAndLeader      val brokerPartition = topicPartitionsList(partitionIndex)      // postpone the failure until the send operation, so that requests for other brokers are handled correctly      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)      var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null      //[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]      // 获取数组[TopicAndPartition, Seq[KeyedMessage[K,Message]]]],不存在的话创建      ret.get(leaderBrokerId) match {        case Some(element) =>          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]        case None =>          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]          ret.put(leaderBrokerId, dataPerBroker)      }      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)      var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null      //取出[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]中的消息数组Seq[KeyedMessage[K,Message]]],将消息加入      dataPerBroker.get(topicAndPartition) match {        case Some(element) =>          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]        case None =>          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)      }      dataPerTopicPartition.append(message)    }    Some(ret)

归类结束之后,会针对每个分区的消息封装成ByteBufferMessageSet进行压缩序列化是在groupMessagesToSet方法中

val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>  val rawMessages = messages.map(_.message)  ( topicAndPartition,    config.compressionCodec match {      case NoCompressionCodec =>        debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))        new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)      case _ =>        config.compressedTopics.size match {          case 0 =>            debug("Sending %d messages with compression codec %d to %s"              .format(messages.size, config.compressionCodec.codec, topicAndPartition))            new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)          case _ =>            if(config.compressedTopics.contains(topicAndPartition.topic)) {              debug("Sending %d messages with compression codec %d to %s"                .format(messages.size, config.compressionCodec.codec, topicAndPartition))              new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)            }            else {              debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"                .format(messages.size, topicAndPartition, config.compressedTopics.toString))              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)            }        }    }    )}messagesPerTopicPartition

目前支持的压缩方式是

compressionCodec match {  case DefaultCompressionCodec => new GZIPOutputStream(stream)  case GZIPCompressionCodec => new GZIPOutputStream(stream)  case SnappyCompressionCodec =>     import org.xerial.snappy.SnappyOutputStream    new SnappyOutputStream(stream)  case LZ4CompressionCodec =>    new KafkaLZ4BlockOutputStream(stream)  case _ =>    throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)}

2,Broker端的顺序写入

此处,代码量相当大,我们只会捡重要的函数进行讲解。
首先是Broker接受到消息后,会执行kafkaApis的.

case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)

下面是会针对不同topic的不同分区进行消息区分写入

partitionAndData.map {case (topicAndPartition, messages) =>  try {    if (Topic.InternalTopics.contains(topicAndPartition.topic) &&        !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) {      throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))    }    //主要是获取到分区    val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)    val info = partitionOpt match {      case Some(partition) =>        //开始追加到日志        partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)

其中,appendMessagesToLeader方法中会通过Partition取出replica对象,调用其内部log的append方法

val leaderReplicaOpt = leaderReplicaIfLocal()leaderReplicaOpt match {  case Some(leaderReplica) =>    val log = leaderReplica.log.getval info = log.append(messages, assignOffsets = true)

剩下的就是在append方法中,首先验证消息的有效性,比如单条消息是不是超过了Broker所接受的消息的大小等。

val appendInfo = analyzeAndValidateMessageSet(messages)然后,给消息分配偏移val offset = new AtomicLong(nextOffsetMetadata.messageOffset)          try {//            给我们的数据块赋值偏移            validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

最后,判断Segment是否需要roll up,消息正式追加写入和最大偏移更新

// maybe roll the log if this segment is fullval segment = maybeRoll(validMessages.sizeInBytes)// now append to the log 并未拆解 初始偏移和消息块segment.append(appendInfo.firstOffset, validMessages)// increment the log end offset //更新修大偏移updateLogEndOffset(appendInfo.lastOffset + 1)

通过FileMessageSet顺序追加到Segment文件中,是对整个Messageset进行写入的

def append(messages: ByteBufferMessageSet) {  val written = messages.writeTo(channel, 0, messages.sizeInBytes)  _size.getAndAdd(written)}

3,Broker zero-copy 数据给SimpleConsumer

Broker接收到数据请求后,执行KafkaApis的

case RequestKeys.FetchKey => handleFetchRequest(request)val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]val dataRead = replicaManager.readMessageSets(fetchRequest)

在readMessageSets方法中

//读取指定topic 分区 offset 最大获取大小 MessageSetval (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)

获取副本replica对象,获取log的最大偏移,然后获取log对象,调用其read方法

val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)  getReplicaOrException(topic, partition)else  getLeaderReplicaIfLocal(topic, partition)trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))val maxOffsetOpt =  if (Request.isValidBrokerId(fromReplicaId))    None  else    Some(localReplica.highWatermark.messageOffset) //最大消息偏移val fetchInfo = localReplica.log match {  case Some(log) =>    log.read(offset, maxSize, maxOffsetOpt)

根据其实offset找到LogSegment。调用其read方法读取消息,根据最大偏移,起始偏移,和读取的最大bytes。

var entry = segments.floorEntry(startOffset)val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)

计算起始偏移和最大偏移对应的实际物理position

val logSize = log.sizeInBytes // this may change, need to save a consistent copyval startPosition = translateOffset(startOffset) //根据offset找到物理存储的偏移的位置// if the start position is already off the end of the log, return nullif(startPosition == null)  return nullval offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)// if the size is zero, still return a log segment but with zero sizeif(maxSize == 0)  return FetchDataInfo(offsetMetadata, MessageSet.Empty)// calculate the length of the message set to read based on whether or not they gave us a maxOffsetval length =   maxOffset match {    case None =>      // no max offset, just use the max size they gave unmolested 无麻烦的      maxSize    case Some(offset) => {      // there is a max offset, translate it to a file position and use that to calculate the max read size      if(offset < startOffset)        throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))      val mapping = translateOffset(offset, startPosition.position)      val endPosition =         if(mapping == null) //说明消息最大偏移超过了当前块,所以最大偏移就是当前块大小          logSize // the max offset is off the end of the log, use the end of the file        else          mapping.position //否则就是最大偏移      min(endPosition - startPosition.position, maxSize) //文件剩余数据的大小和最大索取大小哪个小,取最小的    }  }FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))

最终调用FileMessageSet的read方法,根据我们提供的信息封装成了

new FileMessageSet(file,                   channel,                   start = this.start + position,                   end = math.min(this.start + position + size, sizeInBytes()))

至此,都是在讲我们Broker 在接收到数据读取请求后的处理业务线程。他处理结束之后会在handleFetchRequest方法中

val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))

zero-copy发生在socketServer回复SimpleConsumer的过程中
Processor的write方法中

val response = key.attachment().asInstanceOf[RequestChannel.Response]val responseSend = response.responseSendif(responseSend == null)  throw new IllegalStateException("Registered for write interest but no response attached to key.")val written = responseSend.writeTo(socketChannel)

下面,嵌套比较深,这里只列出了函数名称
FetchResponseSend.writeTo
MultiSend.writeTo
TopicDataSend.writeTo
MultiSend.writeTo
PartitionDataSend.writeTo
FileMessageSet.writeTo
此时我们就可以看到我们久违transferTo了。这就是整个zero-copy的过程

def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {  // Ensure that the underlying size has not changed.  val newSize = math.min(channel.size().toInt, end) - start  if (newSize < _size.get()) {    throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"      .format(file.getAbsolutePath, _size.get(), newSize))  }  val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt  trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred    + " bytes requested for transfer : " + math.min(size, sizeInBytes))  bytesTransferred}

4,消费者,Broker,消息大小的限制。

Broker接收到Producer消息后会对消息的有效性进行验证,是在Log的append方法中调用了

val appendInfo = analyzeAndValidateMessageSet(messages)

在该方法内部,进行了大小的验证

// Check if the message sizes are valid.val messageSize = MessageSet.entrySize(m) //单个消息的大小if(messageSize > config.maxMessageSize) {  BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)  BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)  throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."    .format(messageSize, config.maxMessageSize))}

Broker端接受的最大消息大小的配置为


消费者,对消息尺寸进行验证。看过去前面的文章应该知道,SimpleConsumer获取消息后最终是由kafkaStream(ConsumerIterator),将消息从对列里取出使用的。实际是在ConsumerIterator的makeNext方法中进行消息有效性验证

// if we just updated the current chunk and it is empty that means the fetch size is too small!if(currentDataChunk.messages.validBytes == 0)  throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +                                         "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."                                         .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))

不满足要求就会抛出一个异常。关键就是第一次迭代取消息的时候,假如消息消费者消费时指定的最大消息大小,小于生产者生产的,此时会导致消息获取不完整,然后进一步导致,shallowValidBytes返回值为零,以此来判断消息是否超过消费者所能消费消息大小,然后抛出相关异常。

  private def shallowValidBytes: Int = {    if(shallowValidByteCount < 0) {      var bytes = 0      val iter = this.internalIterator(true)      while(iter.hasNext) {        val messageAndOffset = iter.next        bytes += MessageSet.entrySize(messageAndOffset.message)      }//      bytes=0,代表消息不完整,当时第一次迭代就读取的消息不完整的话就会出现bytes为零,这时就需要调整消费者的消息大小了。      this.shallowValidByteCount = bytes    }    shallowValidByteCount  }

消费者所能获取的最大消息大小的配置

五,总结

本节主要目的是分析kafka的磁盘顺序写和zero-copy源码。这个也是kafka只所以高效的最关键步骤,在这里浪尖给出了一下使用时总结。

1,生产者生产消息时,采用异步

异步,并发
消息块方式减少网络io请求次数
可以更加好的利用Broker端的磁盘顺序写

2,生产者生产消息是指定压缩

节省网络传输带宽
配置方式

3,注意消息大小限制

结合自己的消息类型特点,进行相关配置。

最后赘述一个kafka高性能的原因:

1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。
2,磁盘顺序写。
3,零拷贝。

©著作权归作者所有:来自51CTO博客作者mob604756ed02fe的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Kafka原理详解
  2. FlinkSQL演进过程,解析原理及一些优化策略
  3. 一文读懂数据湖及企业中的架构特点
  4. kafka|使用Interceptors实现消息端到端跟踪
  5. 干货--部署RocketMQ
  6. 消息处理
  7. Kafka评传——从kafka的消息生命周期引出的沉思
  8. kafka架构
  9. Bootstrap 学习 - 网格系统

随机推荐

  1. 【web browser】启动android默认浏览器
  2. Android画图之Matrix
  3. android traceview分析
  4. Android--自定义SeekBarPreference控件
  5. Android SDK和最新ADT下载地址
  6. Linux 统计代码行数
  7. 【Android】hwbinder的selinux配置
  8. Android的各种onTouch
  9. Android Style.xml 详解
  10. android 安装apk包 卸载 包