使用python在Spark Shell进行交互式分析

我们这里按照spark官网上的文档一步一步学习在python环境下使用Spark shell进行数据分析。下面在linux centos里已经安装好spark集群,一共8台主机:

/usr/bin/pyspark2 #我的pyspark位置,这个应用就是python环境下的spark shell。

Spark把数据抽象为分布式的Dataset。可以从Hadoop InputFormats(例如HDFS文件)或通过转换其他数据集来创建 Dataset。由于Python的动态性质,我们不需要在Python中对数据集进行强类型化。这样,Python转换的所有数据集都是Dataset [Row],我们称其为DataFrame与Pandas和R中的数据框概念一致。下面的例子从Spark源目录中的README文件的文本中创建一个新的DataFrame:

>>> textFile = spark.read.text("README.md") #这里是从hdfs里直接读取缺省用户目录下的README.md文件。如果你没有,请上传一个README.md用于练习。

通过调用方法直接从DataFrame中获取值,或转换DataFrame以获取新的值。有关更多详细信息,请阅读API文档

>>> textFile.count()  # 上面把一个文本转换成DataFrame,然后在这个DataFrame把所有的单词计数,得到结果是126


>>> textFile.first()  # 在DataFrame获取第一行的值。
Row(value=u'# Apache Spark')

我们调用filter方法返回一个新的DataFrame,其中包含文件中各行的子集。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) #把文件中的包含spark的行形成一个新的DataFrame。

也可以将转换和动作链接在一起:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

数据集操作和转换可用于更复杂的计算。比如现在要查找包含最多单词的行

>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

首先,将一行映射到一个整数值,并将其别名为“ numWords”,这样就创建一个新的DataFrame。agg在这个新的DataFrame上调用,以找到最大的字数。使用df.列名从数据帧获得一列。在pyspark.sql.functions中提供了许多方便的功能来从旧的列构建新的列。

注意:”\s+”是为了匹配任何空白字符,包括空格、制表符、换页符等等。等价于 [ \f\n\r\t\v]。注意 Unicode 正则表达式会匹配全角空格符。

一种常见的数据流模式是Hadoop流行的MapReduce。Spark可以轻松实现MapReduce流:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

在这里,我们调用explode函数将行的数据集转换为单词的数据集,然后使用groupBy把数据集里的单词进行分组,使用count计算文件中每个单词的数量,word作为key,而 count就是值。要收集所有的单词数,可以调用collect

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Spark支持将数据集存入内存中缓存。当重复访问数据时,例如查询小的“热”数据集或运行迭代算法(如PageRank)时,这非常有用。举一个简单的例子,让我们标记linesWithSpark要缓存的数据集:

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15


发表评论

电子邮件地址不会被公开。 必填项已用*标注