PySpark Streaming 接收MQTT发布的资料!

嗨,大家好,今天要介绍的是关于透过Python完成Spark-Streaming,基本的Spark概念和MQTT这边可能不会多做解释,就当作大家已经有基本的RDDMQTT概念啰。

Summary

Spark Streaming ConceptDiscretized Streams (DStreams)A Quick ExampleTransformations on DStreamsHow Subscribe a MQTT Topic

关于Discretized Streams, A Quick Example, Transformations on DStreams的部分,基本上都是从官方的文件翻译过来的,需要看细节都可以从Spark Document找到:

streaming-programming-guide

Spark Streaming(Pyspark)

Spark Streaming 是Spark API的一个扩充能够即时资料串流处理,从不同来源取得资料后利用不同RDD函式转换资料格式或计算,最后将资料储存到资料库等地方,方便后续做机器学习演算法等等,如下图:

kafka、flume、Twitter、 ZeroMQ、Kinesis发送资料透过Spark将资料做储存到HDFS、资料库或是显示在Dashboard

Imgur

Spark Streaming和spark不同的是,它提供了一种高级的抽象类别Discretized Stream或称Dstream,它代表一个连续的资料串流。DStream能够藉由不同来源取得输入的资料。DStream的内部是由序列的RDD组成。

Discretized Streams (DStreams)

什么是DStreams?是由Spark Streaming提供的基本抽象类别,表现了一个连续的资料串流,它能够透过transform从接收来源输入资料或处理产生的资料串流。一个DStream表示一个一系列连续的RDDsRDD是Spark中不可变的抽象类别,分散式数据库。

在DStream中每个RDD中间有一定的间隔,每个RDD内包含了资料,如图:
Imgur

DStream上应用的任何操作(translates)都会转换为在基础RDD的操作,例如WordCount将串流每一行的字转换的例子中,将flatMap的操作应用于DStream行中的每个RDD,以生成字串的DStream,如图:

这些基础RDD transformations由Spark engine计算,DStream操作隐藏了大部分的细节,并为开发人员提供了更高级的API以方便使用。

A Quick Example

在进入如何编写Spark Streaming程式的细节前,我们来看一个简单的例子,程式从监听TCP Socket的资料伺服器取得文字资料,计算文字包含的单字数:

首先要先导入StreamingContext,这是所有Streaming功能的进入点。
from pyspark import SparkContextfrom pyspark.streaming import StreamingContext
使用两个执行执行绪创建本地(local)的批次处理间隔为1秒(以秒为单位分割资料串)的StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")ssc = StreamingContext(sc, 1)
利用StreamingContext,能够创建一个DStream,它代表从TCP来源(主机位址localhost,port为9999)取得的资料。
lines = ssc.socketTextStream("localhost", 9999)
lines变数是一个DStream,表示即将获得的资料串流。这个DStream的每条纪录都代表一行文字,并利用split来将资料做切割变成单字。flatMap是一个一对多的DStream操作,把DStream的每条纪录都生成多个新纪录来创建成新的DStream。在这个例子中,每行文字都被切分成了多个单字,我们把切割出的单字串流用words这个DStream表示。
words = lines.flatMap(lambda line: line.split(" "))
words这个DStream被一对一转换操作成了一个新的DStream,由(word,1)对(pair)组成。接着,就可以用这个新的DStream计算每批次资料的单字频率。最后,用wordCounts.print()印出每秒计算的单字频率。
pairs = words.map(lambda word: (word, 1))wordCounts = pairs.reduceByKey(lambda x, y: x + y)wordCounts.pprint()
值得注意的是上述的程式码Spark Streaming只是準备它要执行的计算,实际上并没有真正的执行,要真正的计算必须要调用Action函数。
ssc.start()             ssc.awaitTermination()  
如果已经将环境準备好了,开启终端机:
nc -lk 9999
开启另外一个终端机执行内建的範例程式码:
./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

Transformations on DStreams

RDDs很类似,transformations允许资料从输入DStream被修改。DStream支援很多可用的建立在一般Spark RDDtransformations,可以到Spark官方文件Transformations on DStreams查看细节。


How Subscribe a MQTT Topic

终于要介绍Spark如何订阅接收MQTT broker发布的资料,这里会主要着重Spark程式码的讲解,而不是MQTT介绍,就当作你已经有了MQTT的概念了。

当然,如果需要稍微暸解MQTT概念以及安装broker的话,可以看我之前的文章有提到:Flask上使用 MQTT!

和前面的介绍一样,我们需要引入些需要用到的函式库(包含SparkContext,mqtt等等):

import sysfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom mqtt import MQTTUtils
前面引入sys用来接收系统的参数,下面程式码中,我们判断是否接收刚好三个参数分别为pyspark.py, <broker url>, <MQTT Topic>
if len(sys.argv) != 3:    print >> sys.stderr, "Usage: pyspark.py <broker url> <topic>"    exit(-1)brokerUrl ='tcp://'+sys.argv[1]topic = sys.argv[2]
关于SparkContext前面介绍过的功能这里就不多作介绍了。
sc = SparkContext(appName="PythonStreamingMQTT")    ssc = StreamingContext(sc, 1)
定义linesMQTT接收到资料后创建的RDD,参数包含StreamingContext, brokerUrl, Mqtt topic
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)    mqtt_get_str = lines.map(lambda word:'get world from topic'+topic+" : "+word)    mqtt_get_str.pprint()
最后开启执行:
ssc.start()ssc.awaitTermination()
好了之后就可以执行程式码了:
spark-submit  PythonStreamingMQTT.py localhost:1883 mytopic
开启另外一个终端机:
mosquitto_pub -t mytopic -m hello_spark -h localhost
就能看到如下图的结果,我们收到了hello_spark。当然你可以一直发布讯息,Spark将会一直接收。

Imgur

谢谢大家的观看,还是Spark新手,有误的话请大家不吝啬给予指教!


关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章