在测试从 Spark Shell读取MySQL一张Large Table时,发生了Out of memory和connection timeout问题,记录一下处理的过程:
MySQL Table资料笔数:1400万笔左右
Spark Cluster配置:Master * 1,Slave * 3,皆为1 core 8G
Spark 版本:2.1.1
Spark Config配置:
spark-env.sh
SPARK_WORKER_MEMORY=6gSPARK_DRIVER_MEMORY=6gSPARK_EXECUTOR_MEMORY=2g
执行指令:
./bin/spark-shell --master spark://192.168.xx.xx:7077 --executor-memory 4g --packages mysql:mysql-connector-java:5.1.38val sqlContext = new org.apache.spark.sql.SQLContext(sc)val df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://192.168.x.x/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "test_table").option("user", "root").option("password", "test").load()df.createOrReplaceTempView("test_table")import org.apache.spark.storage.StorageLeveldf.persist(StorageLevel.MEMORY_AND_DISK)val sqlDf = sql("select * from test_table limit 10000")sqlDf.show()
一开始无论执行什么query,只要是query 这张大table都会出现OOM,重试几次甚至出现executor heartbeat timeout。
错误讯息:
Error Message: Executor heartbeat timeout
[Stage 0:=======================================================> (39 + 1) / 40]17/06/06 10:26:09 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 147897 ms exceeds timeout 120000 ms17/06/06 10:26:09 ERROR TaskSchedulerImpl: Lost executor 2 on 192.168.1.181: Executor heartbeat timed out after 147897 ms17/06/06 10:26:09 WARN TaskSetManager: Lost task 39.0 in stage 0.0 (TID 39, 192.168.1.181, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 147897 ms...略
Error Message: OOM
[Stage 0:======================================================> (27 + 1) / 28]17/06/06 10:14:25 WARN TaskSetManager: Lost task 27.0 in stage 0.0 (TID 27, 192.168.1.184, executor 0): java.lang.OutOfMemoryError: Java heap space at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2157) at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1964) at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3316) at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:463) at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3040) at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2288) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2681) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551) at ...略
查了官方文件和这篇 spark read mysql 效能调校文章后,有几种作法来 tuning:
(1) 增加 partition 数
会比较慢,但是至少可以把Table读进来,Spark会将资料切成小块partition,分散到不同的 executor上。
(2) 设定spark mysql connection options
主要是这几个设定:partitionColumn, lowerBound, upperBound, numPartitions, fetchsize
PS 如果使用partitionColumn,则lowerBound,upperBound,numPartitions都需设定
partitionColumn: 用来决定partition切割的栏位,必须是numeric型态资料,不一定要唯一。
lowerBound \ upperBound: 决定要fetch的值range
会用这几个options来决定要query的dataset。SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound
(3) Spark记忆体管理:worker 本身会保留一部份的memory来做cache
spark 会将memory大致分成三种,user可使用的、资料shuffle用的,储存RDD的如果使用者程式要操作的物件很大,可以把快取RDD的大小和资料shuffle大小调低,以增加物件处理的效能官方文件说明:
- spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.- spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution.
这边使用到的是第2和第3个方法
首先在query mysql的指令加上optionsval df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://192.168.x.x/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "test_table").option("user", "root").option("password", "test").option("numPartitions",30).option("partitionColumn","id").option("lowerBound", "0").option("upperBound","20000").load()
并在 spark-default.conf 加入下面设定
# for heartbeat timeoutspark.network.timeout 10000000spark.executor.heartbeatInterval 10000000# for OOMspark.memory.fraction 0.75spark.storage.memoryFraction 0.45
再执行以下query,都可以顺利读入了
val sqlDf = sql("select count(*) from test_table where dt >= '2017-05-01'")sqlDf: org.apache.spark.sql.DataFrame = [count(1): bigint]scala> sqlDf.show()+--------+|count(1)|+--------+| 222166|+--------+val sqlDf = sql("select count(*) from test_table")sqlDf: org.apache.spark.sql.DataFrame = [count(1): bigint]scala> sqlDf.show()+--------+|count(1)|+--------+|14665557|+--------+
不确定这样是不是最佳解法,不过至少解决掉问题了@@
如果不需要读整张Table,其实可以把dbtable的value改成SQL query
原本是这样,直接写Table名称,就会读整张Table.option("dbtable", "test_table")
可以改写成:.option("dbtable", "(select * from test_table where dt >= '2017-05-01') as T")
PS 记得一定要用左右括号包起来,因为dbtable的value会被当成一张table作查询,mysql connector会自动dbtable后面加上 where 1=1,如果没包起来就会出现SQL Syntax Error之类的错误