首頁技術(shù)文章正文

Kafka怎樣手動(dòng)消費(fèi)分區(qū)中的數(shù)據(jù)?

更新時(shí)間:2021-10-19 來源:黑馬程序員 瀏覽量:

IT培訓(xùn)班

我們可以讓Kafka根據(jù)消費(fèi)組中的消費(fèi)者動(dòng)態(tài)地為topic分配要消費(fèi)的分區(qū)。但在某些時(shí)候,我們需要指定要消費(fèi)的分區(qū),例如:

  1. 如果某個(gè)程序?qū)⒛硞€(gè)指定分區(qū)的數(shù)據(jù)保存到外部存儲(chǔ)中,例如:Redis、MySQL,那么保存數(shù)據(jù)的時(shí)候,只需要消費(fèi)該指定的分區(qū)數(shù)據(jù)即可

  2. 如果某個(gè)程序是高可用的,在程序出現(xiàn)故障時(shí)將自動(dòng)重啟(例如:后面我們將學(xué)習(xí)的Flink、Spark程序)。這種情況下,程序?qū)闹付ǖ姆謪^(qū)重新開始消費(fèi)數(shù)據(jù)。

 

如何進(jìn)行手動(dòng)消費(fèi)分區(qū)中的數(shù)據(jù)呢?

1. 不再使用之前的 subscribe 方法訂閱主題,而使用 「assign」方法指定想要消費(fèi)的消息

 String topic = "test";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

2. 一旦指定了分區(qū),就可以就像前面的示例一樣,在循環(huán)中調(diào)用「poll」方法消費(fèi)消息

 

注意

1. 當(dāng)手動(dòng)管理消費(fèi)分區(qū)時(shí),即使GroupID是一樣的,Kafka的組協(xié)調(diào)器都將不再起作用

2. 如果消費(fèi)者失敗,也將不再自動(dòng)進(jìn)行分區(qū)重新分配






猜你喜歡:

IOC底層實(shí)現(xiàn)原理介紹,手動(dòng)實(shí)現(xiàn)IOC容器

怎樣能確保Kafka儲(chǔ)存的數(shù)據(jù)不丟失?

Kafka是什么?kafka有什么優(yōu)點(diǎn)?

大數(shù)據(jù)流處理:Flume、Kafka和NiFi的區(qū)別

黑馬程序員python+大數(shù)據(jù)開發(fā)培訓(xùn)班

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!