Spark ML里的核心API已经由基于RDD换成了基于DataFrame,为了使读取到的值成为DataFrame类型,我们可以直接使用读取CSV的方式来读取文本文件,可问题来了,当文本文件中每一行的各个数据被不定数目的空格所隔开时,我们无法将这些不定数目的空格当作CSV文件的分隔符(因为Spark读取CSV文件时,不支持正则表达式作为分隔符),一个常用方法是先将数据读取为rdd,然后用map方法构建元组,再用toDF方法转为DataFrame,但是如果列数很多的话,构建元组会很麻烦。本文将介绍spark读取多列txt文件后转成DataFrame以供一些数据源使用的三种方法。
1 数据说明
使用Synthetic Control Chart Time Series数据synthetic_control.data,数据包括600个数据点(行),每个数据点有60个属性,详细信息见:
http://archive.ics.uci.edu/ml/databases/synthetic_control/
如图,每个数据点的不同属性用不定数量的空格隔开,为了解决这个问题,本文将介绍两种方法(现已更新为三种方法)。
18.08.17更新!今天发现了一个新的方法,比原来的第二种方法还简单了许多,请读者在上策中查看。
2 下策
2.1 基本思想
本方法非常繁琐且效率较低,是我在没看到第二种方法时自己想的,本方法的思想是:
- 直接读取数据,保存成一个String类型的RDD
- 将此RDD中每一行中的不定数量的空格用正则表达式匹配选出后替换成“,”
- 将处理过后的RDD保存到一个临时目录中
- 以CSV方式读取此临时目录中的数据,便可将读到的数据直接存成一个多列的DataFrame
- 最后将此DataFrame的数据类型转为Double
2.2 代码
1 | import org.apache.hadoop.conf.Configuration |
3 中策
3.1 代码及说明
1 | import org.apache.spark.sql.types._ |
3.2 结果展示
1 | scala> val data = spark.createDataFrame(rowRDD, schema) |
4 上策
4.1 基本思想
- 读取原始文件,用正则表达式分割每个样本点的属性值,保存成Array[String]类型的RDD
- 利用Spark ML库中的LabeledPoint类将数据转换成LabeledPoint类型的RDD。
LabeledPoint类型包含label列和features列,label列即标签列,是Double类型的,因为本次数据未经训练还没有标签,所以可随意给定一个数字;features列即特征向量列,是向量类型的,本次数据均为特征点,所以用Vectors类全部转换为向量类型。 - 将LabeledPoint类型的RDD转换为DataFrame并只选择其features列,得到一个新的DataFrame,然后就可以在此df上进行一些机器学习算法(如:KMeans)了。
4.2 代码
1 | import org.apache.spark.ml.feature.LabeledPoint |
4.3 结果展示
1 | scala> val data = spark.createDataFrame(LabeledPointRdd).select("features") |
5 参考资料
[1]董克伦.spark 将DataFrame所有的列类型改为double[OL].2018-04-27/2018-08-08
[2]董克伦.旧版spark(1.6版本) 将rdd动态转为dataframe[OL].2018-05-11/2018-08-08
[3]吴茂贵.深度实践Spark机器学习[M].北京:机械工业出版社.2018:104-106