科技

Apache-Flink深度解析-DataStream-Connectors之Kafka

Kafka 簡介

Apache Kafka是一個分散式釋出-訂閱訊息傳遞系統。 它最初由LinkedIn公司開發,LinkedIn於2010年貢獻給了Apache基金會併成為頂級開源專案。Kafka用於構建實時資料管道和流式應用程式。它具有水平擴充套件性、容錯性、極快的速度,目前也得到了廣泛的應用。

Kafka不但是分散式訊息系統而且也支援流式計算,所以在介紹Kafka在Apache Flink中的應用之前,先以一個Kafka的簡單示例直觀瞭解什麼是Kafka。

安裝

本篇不是系統的,詳盡的介紹Kafka,而是想讓大家直觀認識Kafka,以便在Apahe Flink中進行很好的應用,所以我們以最簡單的方式安裝Kafka。

下載二進位制包curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

解壓安裝Kafka安裝只需要將下載的tgz解壓即可,如下:jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz

jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls

LICENSE NOTICE bin config libs site-docs

其中bin包含了所有Kafka的管理命令,如接下來我們要啟動的Kafka的Server。

啟動Kafka ServerKafka是一個釋出訂閱系統,訊息訂閱首先要有個服務存在。我們啟動一個Kafka Server 例項。 Kafka需要使用ZooKeeper,要進行投產部署我們需要安裝ZooKeeper叢集,這不在本篇的介紹範圍內,所以我們利用Kafka提供的指令碼,安裝一個只有一個節點的ZooKeeper例項。如下:jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

....

....

[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

啟動之後,ZooKeeper會繫結2181埠(預設)。接下來我們啟動Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties

[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)

[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)

...

...

[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果上面一切順利,Kafka的安裝就完成了。

建立Topic

Kafka是訊息訂閱系統,首先建立可以被訂閱的Topic,我們建立一個名為flink-tipic的Topic,在一個新的terminal中,執行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic

Created topic "flink-tipic".

在Kafka Server的terminal中也會輸出如下成功建立資訊:

...

[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

...

上面顯示了flink-topic的基本屬性配置,如訊息壓縮方式,訊息格式,備份數量等等。

除了看日誌,我們可以用命令顯示的查詢我們是否成功的建立了flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181

flink-tipic

如果輸出flink-tipic,那麼說明我們的Topic成功建立了。

那麼Topic是儲存在哪裡?Kafka是怎樣進行訊息的釋出和訂閱的呢?為直觀,我們看如下Kafka架構示意圖簡單理解一下:

簡單介紹一下,Kafka利用ZooKeeper來儲存叢集資訊,也就是上面我們啟動的Kafka Server 例項,一個叢集中可以有多個Kafka Server 例項,Kafka Server叫做Broker,我們建立的Topic可以在一個或多個Broker中。Kafka利用Push模式傳送訊息,利用Pull方式拉取訊息。

傳送訊息

如何向已經存在的Topic中傳送訊息呢,當然我們可以API的方式編寫程式碼傳送訊息。同時,還可以利用命令方式來便捷的傳送訊息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic

>Kafka test msg

>Kafka connector

上面我們傳送了兩條訊息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

讀取訊息

如果讀取指定Topic的訊息呢?同樣可以API和命令兩種方式都可以完成,我們以命令方式讀取flink-topic的訊息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning

Kafka test msg

Kafka connector

其中--from-beginning 描述了我們從Topic開始位置讀取訊息。

Flink Kafka Connector

前面我們以最簡單的方式安裝了Kafka環境,那麼我們以上面的環境介紹Flink Kafka Connector的使用。Flink Connector相關的基礎知識會在《Apache Flink 漫談系列(14) - Connectors》中介紹,這裡我們直接介紹與Kafka Connector相關的內容。

Apache Flink 中提供了多個版本的Kafka Connector,本篇以flink-1.7.0版本為例進行介紹。

mvn 依賴

要使用Kakfa Connector需要在我們的pom中增加對Kafka Connector的依賴,如下:

org.apache.flink

flink-connector-kafka_2.11

1.7.0

Flink Kafka Consumer需要知道如何將Kafka中的二進位制資料轉換為Java / Scala物件。 DeserializationSchema允許使用者指定這樣的模式。 為每個Kafka訊息呼叫 T deserialize(byte [] message)方法,從Kafka傳遞值。

Examples

我們示例讀取Kafka的資料,再將資料做簡單處理之後寫入到Kafka中。我們需要再建立一個用於寫入的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

所以示例中我們Source利用flink-topic, Sink用slink-topic-output。

Simple ETL

我們假設Kafka中儲存的就是一個簡單的字串,所以我們需要一個用於對字串進行serialize和deserialize的實現,也就是我們要定義一個實現DeserializationSchema和SerializationSchema 的序列化和反序列化的類。因為我們示例中是字串,所以我們自定義一個KafkaMsgSchema實現類,然後在編寫Flink主程式。

KafkaMsgSchema - 完整程式碼import org.apache.flink.api.common.serialization.DeserializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.util.Preconditions;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema, SerializationSchema {

private static final long serialVersionUID = 1L;

private transient Charset charset;

public KafkaMsgSchema() {

// 預設UTF-8編碼

this(Charset.forName("UTF-8"));

}

public KafkaMsgSchema(Charset charset) {

this.charset = Preconditions.checkNotNull(charset);

}

public Charset getCharset() {

return this.charset;

}

public String deserialize(byte[] message) {

// 將Kafka的訊息反序列化為java物件

return new String(message, charset);

}

public boolean isEndOfStream(String nextElement) {

// 流永遠不結束

return false;

}

public byte[] serialize(String element) {

// 將java物件序列化為Kafka的訊息

return element.getBytes(this.charset);

}

public TypeInformation getProducedType() {

// 定義產生的資料Typeinfo

return BasicTypeInfo.STRING_TYPE_INFO;

}

private void writeObject(ObjectOutputStream out) throws IOException

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {

in.defaultReadObject();

String charsetName = in.readUTF();

this.charset = Charset.forName(charsetName);

}

}

主程式 - 完整程式碼import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {

public static void main(String[] args) throws Exception {

// 使用者引數獲取

final ParameterTool parameterTool = ParameterTool.fromArgs(args);

// Stream 環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source的topic

String sourceTopic = "flink-topic";

// Sink的topic

String sinkTopic = "flink-topic-output";

// broker 地址

String broker = "localhost:9092";

// 屬性引數 - 實際投產可以在命令列傳入

Properties p = parameterTool.getProperties();

p.putAll(parameterTool.getProperties());

p.put("bootstrap.servers", broker);

env.getConfig().setGlobalJobParameters(parameterTool);

// 建立消費者

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(

sourceTopic,

new KafkaMsgSchema(),

p);

// 設定讀取最早的資料

// consumer.setStartFromEarliest();

// 讀取Kafka訊息

DataStream input = env.addSource(consumer);

// 資料處理

DataStream result = input.map(new MapFunction() {

public String map(String s) throws Exception {

String msg = "Flink study ".concat(s);

System.out.println(msg);

return msg;

}

});

// 建立生產者

FlinkKafkaProducer producer = new FlinkKafkaProducer(

sinkTopic,

new KeyedSerializationSchemaWrapper(new KafkaMsgSchema()),

p,

FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

// 將資料寫入Kafka指定Topic中

result.addSink(producer);

// 執行job

env.execute("Kafka Example");

}

}

執行主程式如下:

我測試操作的過程如下:

啟動flink-topic和flink-topic-output的消費拉取;通過命令向flink-topic中新增測試訊息only for test;通過命令列印驗證新增的測試訊息 only for test;最簡單的FlinkJob source->map->sink 對測試訊息進行map處理:"Flink study ".concat(s);通過命令列印sink的資料;#### 內建Schemas

Apache Flink 內部提供瞭如下3種內建的常用訊息格式的Schemas:

TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基於Flink的TypeInformation建立模式。 如果資料由Flink寫入和讀取,這將非常有用。JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它將序列化的JSON轉換為ObjectNode物件,可以使用objectNode.get(“field”)作為(Int / String / ...)()從中訪問欄位。 KeyValue objectNode包含“key”和“value”欄位,其中包含所有欄位以及可選的"metadata"欄位,該欄位公開此訊息的偏移量/分割槽/主題。AvroDeserializationSchema 它使用靜態提供的模式讀取使用Avro格式序列化的資料。 它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))推斷出模式,或者它可以與GenericRecords一起使用手動提供的模式(使用AvroDeserializationSchema.forGeneric(...))要使用內建的Schemas需要新增如下依賴:

org.apache.flink

flink-avro

1.7.0

讀取位置配置

我們在消費Kafka資料時候,可能需要指定消費的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置設定,如下:

consumer.setStartFromEarliest() - 從最早的記錄開始;consumer.setStartFromLatest() - 從最新記錄開始;consumer.setStartFromTimestamp(...); // 從指定的epoch時間戳(毫秒)開始;consumer.setStartFromGroupOffsets(); // 預設行為,從上次消費的偏移量進行繼續消費。上面的位置指定可以精確到每個分割槽,比如如下程式碼:

Map specificStartOffsets = new HashMap();

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一個分割槽從23L開始

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二個分割槽從31L開始

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三個分割槽從43L開始

consumer.setStartFromSpecificOffsets(specificStartOffsets);

對於沒有指定的分割槽還是預設的setStartFromGroupOffsets方式。

Topic發現

Kafka支援Topic自動發現,也就是用正則的方式建立FlinkKafkaConsumer,比如:

// 建立消費者

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),

new KafkaMsgSchema(),

p);

在上面的示例中,當作業開始執行時,消費者將訂閱名稱與指定正則表示式匹配的所有Topic(以sourceTopic的值開頭並以單個數字結尾)。

定義Watermark(Window)

對Kafka Connector的應用不僅限於上面的簡單資料提取,我們更多時候是期望對Kafka資料進行Event-time的視窗操作,那麼就需要在Flink Kafka Source中定義Watermark。

要定義Event-time,首先是Kafka資料裡面攜帶時間屬性,假設我們資料是String#Long的格式,如only for test#1000。那麼我們將Long作為時間列。

KafkaWithTsMsgSchema - 完整程式碼要想解析上面的Kafka的資料格式,我們需要開發一個自定義的Schema,比如叫KafkaWithTsMsgSchema,將String#Long解析為一個Java的Tuple2,完整程式碼如下:import org.apache.flink.api.common.serialization.DeserializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.typeutils.TupleTypeInfo;

import org.apache.flink.util.Preconditions;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema>, SerializationSchema> {

private static final long serialVersionUID = 1L;

private transient Charset charset;

public KafkaWithTsMsgSchema()

public KafkaWithTsMsgSchema(Charset charset) {

this.charset = Preconditions.checkNotNull(charset);

}

public Charset getCharset() {

return this.charset;

}

public Tuple2 deserialize(byte[] message) {

String msg = new String(message, charset);

String[] dataAndTs = msg.split("#");

if(dataAndTs.length == 2){

return new Tuple2(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));

}else{

// 實際生產上需要丟擲runtime異常

System.out.println("Fail due to invalid msg format.. ["+msg+"]");

return new Tuple2(msg, 0L);

}

}

@Override

public boolean isEndOfStream(Tuple2 stringLongTuple2) {

return false;

}

public byte[] serialize(Tuple2 element) {

return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);

}

private void writeObject(ObjectOutputStream out) throws IOException

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {

in.defaultReadObject();

String charsetName = in.readUTF();

this.charset = Charset.forName(charsetName);

}

@Override

public TypeInformation> getProducedType() {

return new TupleTypeInfo>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

}

}

Watermark生成提取時間戳和建立Watermark,需要實現一個自定義的時間提取和Watermark生成器。在Apache Flink 內部有2種方式如下:

AssignerWithPunctuatedWatermarks - 每條記錄都產生Watermark。AssignerWithPeriodicWatermarks - 週期性的生成Watermark。我們以AssignerWithPunctuatedWatermarks為例寫一個自定義的時間提取和Watermark生成器。程式碼如下:import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;

import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks

implements AssignerWithPunctuatedWatermarks> {

@Nullable

@Override

public Watermark checkAndGetNextWatermark(Tuple2 o, long l) {

// 利用提取的時間戳建立Watermark

return new Watermark(l);

}

@Override

public long extractTimestamp(Tuple2 o, long l) {

// 提取時間戳

return o.f1;

}

}

主程式 - 完整程式我們計算一個大小為1秒的Tumble視窗,計算視窗內最大的值。完整的程式如下:import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.typeutils.TupleTypeInfo;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {

public static void main(String[] args) throws Exception {

// 使用者引數獲取

final ParameterTool parameterTool = ParameterTool.fromArgs(args);

// Stream 環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 設定 Event-time

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Source的topic

String sourceTopic = "flink-topic";

// Sink的topic

String sinkTopic = "flink-topic-output";

// broker 地址

String broker = "localhost:9092";

// 屬性引數 - 實際投產可以在命令列傳入

Properties p = parameterTool.getProperties();

p.putAll(parameterTool.getProperties());

p.put("bootstrap.servers", broker);

env.getConfig().setGlobalJobParameters(parameterTool);

// 建立消費者

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer>(

sourceTopic,

new KafkaWithTsMsgSchema(),

p);

// 讀取Kafka訊息

TypeInformation> typeInformation = new TupleTypeInfo>(

BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

DataStream> input = env

.addSource(consumer).returns(typeInformation)

// 提取時間戳,並生產Watermark

.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

// 資料處理

DataStream> result = input

.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))

.max(0);

// 建立生產者

FlinkKafkaProducer producer = new FlinkKafkaProducer>(

sinkTopic,

new KeyedSerializationSchemaWrapper>(new KafkaWithTsMsgSchema()),

p,

FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

// 將資料寫入Kafka指定Topic中

result.addSink(producer);

// 執行job

env.execute("Kafka With Event-time Example");

}

}

測試執行如下

簡單解釋一下,我們輸入數如下:

MsgWatermarkE#10000001000000A#30000003000000B#50000005000000C#50001005000100E#50001205000120A#70000007000000

我們看的5000000~7000000之間的資料,其中B#5000000, C#5000100和E#5000120是同一個視窗的內容。計算MAX值,按字串比較,最大的訊息就是輸出的E#5000120。

Kafka攜帶Timestamps

在Kafka-0.10+ 訊息可以攜帶timestamps,也就是說不用單獨的在msg中顯示新增一個數據列作為timestamps。只有在寫入和讀取都用Flink時候簡單一些。一般情況用上面的示例方式已經足夠了。

小結

本篇重點是向大家介紹Kafka如何在Flink中進行應用,開篇介紹了Kafka的簡單安裝和收發訊息的命令演示,然後以一個簡單的資料提取和一個Event-time的視窗示例讓大家直觀的感受如何在Apache Flink中使用Kafka。

你可能感興趣的文章

Flink入門Flink DataSet&DataSteam APIFlink叢集部署Flink重啟策略Flink分散式快取Flink重啟策略Flink中的TimeFlink中的視窗Flink的時間戳和水印Flink廣播變數Flink-Kafka-connetorFlink-Table&SQLFlink實戰專案-熱銷排行Flink-Redis-SinkFlink消費Kafka寫入Mysql後面會繼續更新更多實戰案例...

Reference:科技日報

看更多!請加入我們的粉絲團

轉載請附文章網址

不可錯過的話題