1 . 下载 nifi
2 . 解压nifi
tar -xzf nifi-x.x.x-bin.tar.gz
命令解压 ,关于解命令烦请参考文章 Linux 下tar.gz、tar、bz2、zip 等解压缩、压缩命令小结- 如图所示解压结束即完成整个安装过程
3 .配置nifi
NiFi 的配置文件(存放在 [nifi_install_location]/config
)
nifi.properties
文件针对的是 NiFi 的配置,仅需做如下修改:
# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=192.168.203.7 # 设置成本机的 ip
nifi.web.http.port=9191
nifi.web.https.host=
nifi.web.https.port=
nifi.web.jetty.working.directory=./work/jetty
nifi.web.jetty.threads=200
OK。
4 . 启动Nifi
/bin/nifi.sh start
启动 nifi
浏览器中访问 192.168.2.3:9191/nifi
5 . nifi 案例-wordcount
统计每次收到的消息中,单词出现的频次
5.1 熟悉操作nifi各个组件
官网上也提供了一些 Youtube 上的教学视频,请参考这里 nifi.apache.org/videos
熟悉操作各个组件。如何添加、启动、停止、删除一个 Processor,如何对 Processor 进行基本配置?
熟练使用下边的组件
- GenerateFlowFile
- ExecuteScript
- PutFile
5.2 nifi 部署组件
从 Processor 拖取上述组件到操作面板上,依照描述的数据流动方式将组件串起。
串联组件时,有时需要对组件进行关系的选择。即选择上一组件分发的正确/错误消息分发到下一组件。
同时,末端的组件你需要在它的 setting pannel 指定自己处理 Success、Failure。
对 Processor 的配置如下:
5.2.1 GenerateFlowFile
Profile -> Custom Text 填入需要统计单词频次的文章/内容
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:
Scheduling -> Run Duration 调节至 2s,放缓数据生成的速度
5.2.2 ExecuteScript
Profile -> ScriptEngine 选择 Groovy
ScriptBody 填入如下 Groovy 代码:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
def wordCount = [:]
def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def words = tellTaleHeart.split(/(!|?|-|.|"|:|;|,|\s)+/)*.toLowerCase()
words.each { word ->
def currentWordCount = wordCount.get(word)
if(!currentWordCount) {
wordCount.put(word, 1)
}
else {
wordCount.put(word, currentWordCount + 1)
}
}
def outputMapString = wordCount.inject("", {k,v -> k += "v.key:{v.value}\n"})
outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
session.transfer(flowFile, REL_SUCCESS)
5.2.3 PutFile配置
- Profile -> Directory 填入存放结果的文件夹,如
/home/lbh/logs/result
启动各个 Processor.可以看到如下图:
5.3 查看 结果
查看 PutFile 中设置的文件存放目录,能够看到存放着统计结果的文件telltale_heart_wordcount