- flink-1.13.6
- apache-pulsar-2.10.0
- apache-inlong-1.1.0
With the deployment done, time to get hands-on: start with a local CSV file into Kafka.
This is based on version 1.1. If you have never used Flink or Pulsar, getting it to run end-to-end is not easy; the local CSV-to-Kafka flow failed for me at first, and only after several days of debugging and reading the code did it work. The steps follow.
Data Group - create a new data access group
Next, add the data source and the Kafka sink, and choose JSON as the data format
While stepping through the code I found that DeserializationSchemaFactory has no CSV support, so I switched to JSON
Set the stream's sink to Kafka
The Kafka sink has no field configuration in the UI, so the field information must be added manually to the stream_sink_field database table; otherwise the sink outputs empty messages
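As a sketch of that manual step, targeting the manager's apache_inlong_manager database: the column names (sink_id, inlong_group_id, inlong_stream_id, sink_type, field_name, field_type) and the sample values are assumptions, so check them against the real table with desc stream_sink_field before running this.

# hypothetical column names and values -- verify against your stream_sink_field schema first
mysql -u root -p apache_inlong_manager -e "
INSERT INTO stream_sink_field (sink_id, inlong_group_id, inlong_stream_id, sink_type, field_name, field_type)
VALUES (1, 'test_group', 'test_stream', 'KAFKA', 'a', 'int'),
       (1, 'test_group', 'test_stream', 'KAFKA', 'b', 'int');"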
Approval Management - My Approvals: approve the new group
Data Group - the group now shows as configured successfully
Note the difference between a Pulsar consumer and a reader: once the configuration succeeds, delete the subscription that Pulsar created automatically, because it conflicts with the subscription the Flink job uses and the job fails with the error below (see the pulsar-admin commands after the stack trace):
org.apache.pulsar.client.api.PulsarClientException$NotAllowedException: Durable subscription with the same name already exists.
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1001)
at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:78)
at org.apache.inlong.sort.flink.pulsar.PulsarUtils.createReader(PulsarUtils.java:123)
at org.apache.inlong.sort.flink.pulsar.PulsarSourceFunction.run(PulsarSourceFunction.java:291)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
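If you hit this error, the conflicting subscription can be inspected and removed with pulsar-admin; the tenant/namespace/topic and the subscription name below are placeholders for whatever InLong created for your group.

# list the subscriptions on the topic the sort job reads from
bin/pulsar-admin topics subscriptions persistent://public/default/<your-topic>
# drop the auto-created durable subscription that clashes with the Flink reader
bin/pulsar-admin topics unsubscribe -s <subscription-name> persistent://public/default/<your-topic>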
Test data
Add the data:
# delete the file first; the agent does not seem to watch for modify events
rm -rf b.csv
echo '{"a":1121,"b":34}' > b.csv
Check the Kafka messages: they are there.
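For reference, the sink topic can be checked with the stock Kafka console consumer; the topic name here is a placeholder, since InLong derives it from the group/stream configuration.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your-sink-topic> --from-beginning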