嗨,大家好,今天要介绍的是关于透过Python完成Spark-Streaming
,基本的Spark概念和MQTT这边可能不会多做解释,就当作大家已经有基本的RDD
与MQTT
概念啰。
Summary
Spark Streaming ConceptDiscretized Streams (DStreams)A Quick ExampleTransformations on DStreamsHow Subscribe a MQTT Topic关于Discretized Streams, A Quick Example, Transformations on DStreams的部分,基本上都是从官方的文件翻译过来的,需要看细节都可以从Spark Document找到:
streaming-programming-guideSpark Streaming(Pyspark)
Spark Streaming 是Spark API的一个扩充能够即时资料串流处理,从不同来源取得资料后利用不同RDD函式转换资料格式或计算,最后将资料储存到资料库等地方,方便后续做机器学习演算法等等,如下图:
kafka、flume、Twitter、 ZeroMQ、Kinesis发送资料透过Spark将资料做储存到HDFS、资料库或是显示在DashboardSpark Streaming和spark不同的是,它提供了一种高级的抽象类别Discretized Stream
或称Dstream
,它代表一个连续的资料串流。DStream
能够藉由不同来源取得输入的资料。DStream
的内部是由序列的RDD
组成。
Discretized Streams (DStreams)
什么是DStreams
?是由Spark Streaming提供的基本抽象类别,表现了一个连续的资料串流,它能够透过transform
从接收来源输入资料或处理产生的资料串流。一个DStream
表示一个一系列连续的RDDs
,RDD
是Spark中不可变的抽象类别,分散式数据库。
在DStream中每个RDD
中间有一定的间隔,每个RDD
内包含了资料,如图:
在DStream
上应用的任何操作(translates)都会转换为在基础RDD
的操作,例如WordCount将串流每一行的字转换的例子中,将flatMap
的操作应用于DStream
行中的每个RDD
,以生成字串的DStream
,如图:
这些基础RDD transformations
由Spark engine计算,DStream操作隐藏了大部分的细节,并为开发人员提供了更高级的API以方便使用。
A Quick Example
在进入如何编写Spark Streaming程式的细节前,我们来看一个简单的例子,程式从监听TCP Socket的资料伺服器取得文字资料,计算文字包含的单字数:
首先要先导入StreamingContext
,这是所有Streaming功能的进入点。from pyspark import SparkContextfrom pyspark.streaming import StreamingContext
使用两个执行执行绪创建本地(local)
的批次处理间隔为1秒(以秒为单位分割资料串)的StreamingContext
。sc = SparkContext("local[2]", "NetworkWordCount")ssc = StreamingContext(sc, 1)
利用StreamingContext
,能够创建一个DStream
,它代表从TCP来源(主机位址localhost,port为9999)取得的资料。lines = ssc.socketTextStream("localhost", 9999)
lines
变数是一个DStream,表示即将获得的资料串流。这个DStream
的每条纪录都代表一行文字,并利用split
来将资料做切割变成单字。flatMap
是一个一对多的DStream操作,把DStream
的每条纪录都生成多个新纪录来创建成新的DStream
。在这个例子中,每行文字都被切分成了多个单字,我们把切割出的单字串流用words
这个DStream
表示。words = lines.flatMap(lambda line: line.split(" "))
words
这个DStream
被一对一转换操作成了一个新的DStream
,由(word,1)对(pair)组成。接着,就可以用这个新的DStream
计算每批次资料的单字频率。最后,用wordCounts.print()
印出每秒计算的单字频率。pairs = words.map(lambda word: (word, 1))wordCounts = pairs.reduceByKey(lambda x, y: x + y)wordCounts.pprint()
值得注意的是上述的程式码Spark Streaming
只是準备它要执行的计算,实际上并没有真正的执行,要真正的计算必须要调用Action函数。ssc.start() ssc.awaitTermination()
如果已经将环境準备好了,开启终端机:nc -lk 9999
开启另外一个终端机执行内建的範例程式码:./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
Transformations on DStreams
和RDDs
很类似,transformations
允许资料从输入DStream
被修改。DStream
支援很多可用的建立在一般Spark RDD
的transformations
,可以到Spark官方文件Transformations on DStreams查看细节。
How Subscribe a MQTT Topic
终于要介绍Spark
如何订阅接收MQTT broker
发布的资料,这里会主要着重Spark
程式码的讲解,而不是MQTT
介绍,就当作你已经有了MQTT
的概念了。
当然,如果需要稍微暸解MQTT
概念以及安装broker
的话,可以看我之前的文章有提到:Flask上使用 MQTT!
和前面的介绍一样,我们需要引入些需要用到的函式库(包含SparkContext
,mqtt
等等):
import sysfrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom mqtt import MQTTUtils
前面引入sys
用来接收系统的参数,下面程式码中,我们判断是否接收刚好三个参数分别为pyspark.py
, <broker url>
, <MQTT Topic>
if len(sys.argv) != 3: print >> sys.stderr, "Usage: pyspark.py <broker url> <topic>" exit(-1)brokerUrl ='tcp://'+sys.argv[1]topic = sys.argv[2]
关于SparkContext
前面介绍过的功能这里就不多作介绍了。sc = SparkContext(appName="PythonStreamingMQTT") ssc = StreamingContext(sc, 1)
定义lines
为MQTT
接收到资料后创建的RDD,参数包含StreamingContext
, brokerUrl
, Mqtt topic
。lines = MQTTUtils.createStream(ssc, brokerUrl, topic) mqtt_get_str = lines.map(lambda word:'get world from topic'+topic+" : "+word) mqtt_get_str.pprint()
最后开启执行:ssc.start()ssc.awaitTermination()
好了之后就可以执行程式码了:spark-submit PythonStreamingMQTT.py localhost:1883 mytopic
开启另外一个终端机:mosquitto_pub -t mytopic -m hello_spark -h localhost
就能看到如下图的结果,我们收到了hello_spark
。当然你可以一直发布讯息,Spark将会一直接收。谢谢大家的观看,还是Spark新手,有误的话请大家不吝啬给予指教!