Overview

PySpark是提供了Python语言API接口的Spark,经过我的初步使用,未发现和Scala APISpark有太大差别。且我们服务器上已经配置好了PySpark,正好配合Jupyter notebook使用来进行机器学习离线训练模型。
从这篇文章开始,将从一个算法工程师的视角去记录一下Spark的使用。
Spark在我看来就是一个计算工具,用来处理单机计算不了的问题。单机解决不了的问题,无非就是数据太大,内存,磁盘以及处理器不够用,然后把数据拆开给多台机器一起计算。Spark相比较于Hadoop,有一个长处就在于计算。而在机器学习过程当中,从数据预处理到模型训练都需要运算,尤其是数据量比较大的时候。我们尝试对比单机机器学习和分布式机器学习的方式,来加强对Spark的理解。

1. 建立Spark会话并读取文件

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("read data") \ #给任务命名
    .master("spark://**.**.**.***:7077") \ #指定spark集群的主节点ip和端口
    .config("spark.cores.max", "10") \ #给任务分配CPU
    .config("spark.driver.memory", "2G") \ #给每个driver分配内存
    .config("spark.executor.memory", "2G") \ #给每个executor分配内存
    .config("spark.executor.cores", "2")\ #给每个executor分配CPU
    .getOrCreate() #如果当前有那么就复用,如果没有就新建

我们在单机上是这样读数据的(一般是从csv文件中读取):

import pandas as pd
df_single = pd.read_csv('data_total.csv')

Pandas会把csv文件读取成DataFrame
Spark的处理逻辑也是一样的,会将数据从csv/json/hive/数据库/Hadoop读取为DataFrame,让分析师和算法工程师无缝对接。而语法也是比较简单。

df_spark = spark.read.format("csv").option("header", true").load('hdfs://**.**.**.***:39000/users.csv')

spark.read会返回一个DataFrameReader的实例,通过format指定DataFrameReader读取的数据的格式。
当然,也可以这样读取:

df_spark = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',',  header=True)
df_spark.dtypes

数据类型如下:

[('order_sn', 'string'),
 ('apply_time', 'string'),
 ('label', 'string'),
 ...
 ('customer_blood_type_a', 'string'),
 ('customer_blood_type_ab', 'string'),
 ('customer_blood_type_b', 'string'),
 ('customer_blood_type_o', 'string'),
 ('customer_blood_type_other', 'string')]

我们可以看到,每一个字段的数据类型都被识别为string,这是因为我们load文件时,没有加inferSchema=True。加上这个条件就正常了。

df_spark = spark.read.csv(path='hdfs://**.**.**.***:39000/users.csv', sep=',',  header=True, inferSchema=True)
df_spark.dtypes

2. 划分数据集和筛选特征

我们根据时间来划分数据集:

df_train = df_total.filter(df_total.apply_time < '2019-10-21 00:00:00')
df_train.count()
df_val = df_total.filter((df_total.apply_time >= '2019-10-21 00:00:00') & (df_total.apply_time < '2019-10-27 00:00:00'))
df_val.count()
df_test = df_total.filter(df_total.apply_time >= '2019-10-27 00:00:00')
df_test.count()

填充空值:

df_train = df_train.fillna(0.0)
df_val = df_val.fillna(0.0)
df_test = df_test.fillna(0.0)

选择自己需要的列,首先加载特征列表:

feature_file = os.path.join("feature_list.txt")
feature_cols = [line.rstrip('\n') for line in open(feature_file)]
feature_cols.append("label")

然后选择需要的列,包括label

df_train = df_train.select(feature_cols)
df_val = df_val.select(feature_cols)
df_test = df_test.select(feature_cols)

3. 处理数据为double类型

我们如果用spark进行机器学习,除了label列之外,其他的列最好都处理为double,那么需要用castwithColumn函数:

for col, t in df_train.dtypes:
    if t != "double" and col != 'label':
        df_train = df_train.withColumn(col, df_train[col].cast("double"))
for col, t in df_val.dtypes:
    if t != "double" and col != 'label':
        df_val = df_val.withColumn(col, df_val[col].cast("double"))
for col, t in df_test.dtypes:
    if t != "double" and col != 'label':
        df_test = df_test.withColumn(col, df_test[col].cast("double"))

4. 保存处理好的数据

df_train.toPandas().to_csv("训练集特征.csv", index=False)
df_val.toPandas().to_csv("验证集特征.csv", index=False)
df_test.toPandas().to_csv("测试集特征.csv", index=False)

5. 总结

总体来说,Spark在读取数据的时候,和Pandas的体验是很接近的。可以认为是一个分布式版本的Pandas

更多可以参考官方文档