在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

Java代码  
- def hasNext(): Boolean = { 
- if(state == FAILED) //处于FAILED状态时,另外线程访问会直接异常 
- throw new IllegalStateException("Iterator is in failed state") 
- state match { 
- case DONE => false 
- case READY => true 
- case _ => maybeComputeNext() 
- } 
- } 
- def maybeComputeNext(): Boolean = { 
- state = FAILED //重置了状态 
- nextItem = Some(makeNext()) 
- if(state == DONE) { 
- false 
- } else { 
- state = READY 
- true 
- } 
- } 
- 下载 
- protected def makeNext(): MessageAndMetadata[K, V] = { 
- var currentDataChunk: FetchedDataChunk = null 
- // if we don't have an iterator, get one 
- var localCurrent = current.get() 
- if(localCurrent == null || !localCurrent.hasNext) { 
- if (consumerTimeoutMs < 0) 
- currentDataChunk = channel.take //channel是BlockingQueue这里会阻塞 
- else { 
- currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) 
- if (currentDataChunk == null) { 
- // reset state to make the iterator re-iterable 
- resetState() 
- throw new ConsumerTimeoutException 
- } 
- } 
- //省略部分代码 
- } 
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
标题名称:kafkahigh-levelconsumer多线程访-创新互联
浏览地址:http://www.scyingshan.cn/article/shddj.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 