此文为宁哥翻译文章,原文章请看这里。
关于DataFrame
上一篇博客我讲到了《基于Python的数据科学技术栈的综述》。这一篇让我们关注一个非常重要的概念:DataFrame。
DataFrame非常适合处理结构化数据和半结构化数据,它们基本上是一些行的集合,而每一列都有自己的命名。考虑一下关系型数据库的表,你会发现DataFrame和它非常相似,并支持相似的操作:
对数据切片:在确定的条件下,选择某些行或者列的子集(过滤器)
对某一列或某几列数据进行排序
整合数据,并给出简要性的统计
连接多个DataFrame结构
作为一种漂亮的、类SQL的接口,DataFrame用在一种完全成熟的编程语言中,因此它有比SQL更强大的功能。这就意味着,我们可以在通用编程语言中的任意代码中加入一些解释性的类SQL的操作。
DataFrame在R语言中被普及并被其它语言和框架采用。对于Python来说有Pandas,这是一个非常棒的数据分析型库,而DataFrame也是其中一个非常关键的概念。
Pandas的局限以及Spark下的DataFrame结构
Pandas并不适用于任何场景,它只是一个单机版的工具,因此我们只能在单机上进行操作。此外,Pandas没有内建的并行机制,因此它只能仅仅使用一个CPU内核。
当你处理一个中型数据集(数亿字节)时很可能达到瓶颈,而当你想处理更大的数据集时,Pandas却无能为力。
非常幸运,几个月前Spark社区开源了一种支持DataFrame结构的Spark新版本,与Pandas有相似的接口,但它是重新设计并且支持大数据的。
在Spark DataFrame中有很多非常酷的工程,比如说代码生成、手动内存管理、Catalyst优化器,先抛开这些,关注下它的可用性:
如果你了解Pandas,使用Spark DataFrame的难易程度有多大?
Spark DataFrame有一些怪异模式吗?
有哪些重要的功能性缺失?
我们将举一些例子来对比Pandas和Spark中的DataFrame,然后解决一些我们关注的问题。
让我们开始吧!
对比:Spark相对Pandas的优势
读取数据
Pandas:
>>> data = pd.read_csv("/data/adult-dataset-data.csv")
>>> data.head(1)
Out:
age final_weight workclass education_num
0 39 77516 State-gov 13
Spark:
>>> data = sqlc.load("s3://data/adult-dataset-data.csv", "com.databricks.spark.csv")
>>> data.take(1)
Out:
[Row(age=45.0, final_weight=160962.0, workclass=u' Self-emp-not-inc', education_num=10.0)]
Pandas和Spark中的DataFrame都能很轻易读取多种格式,包括CSV、JSON以及某些二进制格式(其中有的需要依赖额外的库)
注意一点,Spark DataFrame没有index,它不能枚举出列来(对于Pandas来说有列默认的index)。在Pandas中index是特殊的列,如果在Spark DataFrame中我们真需要它,可以选择命名为”index”的列。
切片
Pandas:
>>> sliced = data[data.workclass.isin([' Local-gov', ' State-gov']) \
& (data.education_num > 1)][['age', 'workclass']]
>>> sliced.head(1)
Out:
age workclass
0 39 State-gov
Spark:
>>> slicedSpark = dataSpark[dataSpark.workclass.inSet([' Local-gov', ' State-gov'])
& (dataSpark.education_num > 1)][['age', 'workclass']]
>>> slicedSpark.take(1)
Out:
[Row(age=48.0, workclass=u' State-gov')]
正如你看到的,它们在一个DataFrame结构下的操作非常类似。
还有一个重要的区别。在Pandas中,布尔型切片能得到一个布尔型序列,这就意味着你能利用另外一个长度相同的DataFrame进行过滤。在Spark中你只能在DataFrame的列下进行过滤。
整合
简单的计数:
Pandas:
>>> data.groupby('workclass').workclass.count()
# or shortcut
>>> data.workclass.value_counts()
Out:
workclass
? 1836
Federal-gov 960
...
Spark:
>>> dataSpark.groupBy('workclass').count().collect()
Out:
[Row(workclass=u' Self-emp-not-inc', count=2541),
Row(workclass=u' Federal-gov', count=960),
Row(workclass=u' ?', count=1836),
...]
花式整合:
Pandas:
>>> data.groupby('workclass').agg({'final_weight': 'sum', 'age': 'mean'})
Out:
age final_weight
workclass
? 40.960240 346115997
Federal-gov 42.590625 177812394
Local-gov 41.751075 394822919
...
Spark:
>>> dataSpark.groupBy('workclass').agg({'capital_gain': 'sum', 'education_num': 'mean'}).collect()
Out:
[Row(workclass=u' Self-emp-not-inc', AVG(age)=44.96969696969697, SUM(final_weight)=446221558.0),
Row(workclass=u' Federal-gov', AVG(age)=42.590625, SUM(final_weight)=177812394.0),
...]
语法非常简单,你可以很轻易的进行标准的整合。很遗憾,Spark DataFrame不支持定制化的整合功能,你只能使用内建的一些功能。可以以特定的方式(使用Hive UDAF或者转换成原生RDD)来整合数据,但是这样既不方便又低效。
映射
定义一个函数:
def f(workclass, final_weight):
if "gov" in workclass.lower():
return final_weight * 2.0
else:
return final_weight
Pandas:
>>> pandasF = lambda row: f(row.workclass, row.final_weight)
>>> data.apply(pandasF, axis=1)
Out:
0 155032
1 83311
...
Spark:
>>> sparkF = pyspark.sql.functions.udf(f, pyspark.sql.types.IntegerType())
>>> dataSpark.select(
sparkF(dataSpark.workclass, dataSpark.final_weight).alias('result')
).collect()
Out:
[Row(result=155032.0),
Row(result=83311.0),
...]
正如你看到的,两者都能很容易在某一列或某几列上应用特定的函数,甚至可以重用相同的函数,它们仅仅是在函数封装上有所区别。对于Spark UDFs来说一个小小的不便在于它需要我们指定返回前期类型的函数。
对比:Spark相对Pandas的劣势
以上都是好的方面,Spark和Pandas在使用起来非常相似而且好用。虽然Spark DataFrame比其他大数据工具更易用,但是它毕竟是Spark生态圈中比较年轻的一员,存在一些毛边。让我们看几个例子。
连接
定义两个非常简单的DataFrame用来实现连接的例子:
>>> pandasA = pd.DataFrame([
[1, "te", 1],
[2, "pandas", 4]],
columns=['colX', 'colY', 'colW'])
>>> pandasB = pd.DataFrame([
[3.0, "st", 2],
[4.0, "spark", 3]],
columns=['colY', 'colZ', 'colX'])
>>> sparkA = sqlc.createDataFrame(pandasA)
>>> sparkB = sqlc.createDataFrame(pandasB)
>
Pandas有一个超级简单的语法,就是利用merge方法:
Pandas:
>>> pandasA.merge(pandasB, on='colX', suffixes=('_A', '_B'), how='left')
Out:
colX colY_A colW colY_B colZ
0 1 te 1 NaN NaN
1 2 pandas 4 3 st
Pandas具备一些有用的特征:
指定连接键来替换条件匹配
对于同名的列自动添加后缀
对于连接键仅仅保留一份副本
现在看看Spark中它怎么实现的。
Spark:
>>> joined = sparkA.join(sparkB, sparkA.colX == sparkB.colX, 'left_outer')
>>> joined.toPandas()
Out:
colX colY colW colY colZ colX
0 1 te 1 NaN None NaN
1 2 pandas 4 3 st 2
正如你看到的,Spark DataFrame需要完全的条件匹配,因此它会保留两份副本并且不会添加后缀。这是有问题的,因为我们不能使用df.col来选择列。当你使用collect函数来替代toPandas函数时,结果会令人疑惑,因为它看起来像是第2列覆盖了第1列一样(事实上不是这样的,但是确实令人疑惑)
Spark:
>>> joined.collect()
Out:
[Row(colX=None, colY=None, colW=1, colY=None, colZ=None, colX=None),
Row(colX=2, colY=3.0, colW=4, colY=3.0, colZ=u'st', colX=2)]
为了达到与Pandas同样的效果,我们需要像下面这么做:
Spark:
>>> sparkARenamed = sparkA \
.withColumnRenamed('colY', 'colY_A')
>>> sparkBRenamed = sparkB \
.withColumnRenamed('colX', 'colX_B') \
.withColumnRenamed('colY', 'colY_B')
>>> sparkARenamed.join(sparkBRenamed, sparkARenamed.colX == sparkBRenamed.colX_B, 'left_outer') \
.select('colX', 'colY_A', 'colW', 'colY_B', 'colZ') \
.toPandas()
Out:
colX colY_A colW colY_B colZ
0 1 te 1 NaN None
1 2 pandas 4 3 st
啊,这么差劲!
联合
现在试着联接/联合DataFrame。
Pandas:
>>> pd.concat([pandasA, pandasB])
Out:
colW colX colY colZ
0 1 1 te NaN
1 4 2 pandas NaN
0 NaN 2 3 st
1 NaN 3 4 spark
结果看起来比较合理。Pandas从两个DataFrame中匹配列,并且对于列中缺失数据填充空值(NaNs)
Spark:
>>> sparkA.unionAll(sparkB).collect()
Out:
[Row(colX=1.0, colY=u'te', colW=1),
Row(colX=2.0, colY=u'pandas', colW=4),
Row(colX=3.0, colY=u'st', colW=2),
Row(colX=4.0, colY=u'spark', colW=3)]
很显然这是错误的。Spark的联合功能非常有限,它仅仅在两个DataFrame具有完全相同的模式下才有效(包括列的顺序),但是它不抛出任何错误,所以很容易掉进这个陷阱。
修正Spark的劣势
正如你看到的,Spark有这些劣势,我们能做些什么呢?
看起来像是输入数据的问题,那么我们可以写一个装饰器,来在Spark DataFrame上实现类似Pandas的concat函数。
Spark:
def addEmptyColumns(df, colNames):
exprs = df.columns + ["null as " + colName for colName in colNames]
return df.selectExpr(*exprs)
def concatTwoDfs(left, right):
# append columns from right df to left df
missingColumnsLeft = set(right.columns) - set(left.columns)
left = addEmptyColumns(left, missingColumnsLeft)
# append columns from left df to right df
missingColumnsRight = set(left.columns) - set(right.columns)
right = addEmptyColumns(right, missingColumnsRight)
# let's set the same order of columns
right = right[left.columns]
# finally, union them
return left.unionAll(right)
def concat(dfs):
return reduce(concatTwoDfs, dfs)
现在我们可以以Pandas的风格来连接Spark DataFrame了:
Spark:
>>> concat([sparkA, sparkB]).collect()
Out:
[Row(colX=1, colY=u'te', colW=1, colZ=None),
Row(colX=2, colY=u'pandas', colW=4, colZ=None),
Row(colX=2, colY=u'3.0', colW=None, colZ=u'st'),
Row(colX=3, colY=u'4.0', colW=None, colZ=u'spark')]
作为练习,你可以写一个实现类似Pandas的merge函数。
总结
我仅仅讲了Pandas和Spark下DataFrame性能的一小部分,但是我希望你因此能得到启发。
Spark DataFrame就目前阶段来说是强大的而且易用的,然而却有一些问题,在可用性上仍需要与Pandas缩小差距——在某些方面接口显得太原生并导致非直观的假设,一些重要的如定制化集成的功能模块缺失。欣慰的是,这些都已经在Spark的发展规划内,相信不久的将来这些问题将会被解决。
另一方面,采用定制化的封装能非常方便的扩展基础函数的功能。我们几个月前开始使用Spark并且解决了我们遇到过的几乎所有的问题。
如果你在使用Pandas并且想处理更大规模的数据集,那么使用PySpark和DataFrame吧。
转载请注明:宁哥的小站 » “Pandas”化你的Spark DataFrames