Overview

Recently I have been experimenting with reject inference, which means using PySpark to pull, out of our Cassandra snapshot store, the feature snapshots that correspond to applications rejected by the model. This post records how to do it.
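
All of the snapshot reads below go through the spark-cassandra-connector, so the SparkSession has to be built with the connector package and the cluster address. A minimal sketch of roughly how the spark session used below might be created; the connector version, app name, and host are placeholders to replace with your own:

from pyspark.sql import SparkSession

# placeholder connector version and Cassandra host -- replace with your own
spark = (
    SparkSession.builder
    .appName("reject-inference-snapshots")
    .config("spark.jars.packages",
            "com.datastax.spark:spark-cassandra-connector_2.11:2.4.3")
    .config("spark.cassandra.connection.host", "cassandra.internal")
    .getOrCreate()
)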

1. Extracting the features

Rejected users never get an order_sn, so the only way to find the matching snapshot is through credit_trace_id. However, our snapshot store and the model-scoring store cannot be joined directly, so for each scoring record we have to look up that user's most recent snapshot taken before the scoring time. That means passing in two lists, customer_id and am_time, and doing the matching with a PySpark window function.
First we read the user list:

df_reject = pd.read_excel('repeat_reject_score.xlsx')
df_reject.head()

It contains three columns:

    customer_id am_time am_credit_score
0   3966719 2020-03-09 15:25:46 434
1   4075202 2020-03-09 15:27:09 395
2   4297087 2020-03-09 15:27:19 465
3   4924323 2020-03-09 15:28:14 357
4   1305133 2020-03-09 15:29:55 429

We take the lower-scoring band (410 to 420):

df_select = df_reject[(df_reject.am_credit_score>410) & (df_reject.am_credit_score<=420)]
df_select.shape

The features live in the feature_data table, which has the two parts we need: credit_trace_id and ft_union:

feature_df = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "apac_ds")
    .option("table", "feature_data")
    .load()
    .filter("stage='scb' or stage='scab'")
    .select(['credit_trace_id',  'ft_union'])
)
feature_df.createOrReplaceTempView("features")

The application records live in customer_trace_relation_data, which has the three parts we need: credit_trace_id, customer_id, and apply_time:

customer_trace_df = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "apac_ds")
    .option("table", "customer_trace_relation_data")
    .load()
    .filter("stage='scb' or stage='scab'")
    .select(['credit_trace_id',  'customer_id', 'apply_time'])
)
customer_trace_df.createOrReplaceTempView("customer_trace")
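
Before joining anything, it is worth spot-checking that both tables loaded as expected; a quick, optional look at the schemas and a few rows is enough:

# optional sanity check on the two Cassandra tables
customer_trace_df.printSchema()
customer_trace_df.show(3, truncate=False)
feature_df.printSchema()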

Now import the window functions and turn the pandas selection into a Spark DataFrame so it can be joined:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# convert the pandas selection into a Spark DataFrame so it can join with the trace table
data = df_select.values.tolist()
cols = list(df_select.columns)
df_tmp = spark.createDataFrame(data, cols)

# keep only applications in the date range whose apply_time is before the scoring time
df1 = customer_trace_df.filter(F.col('apply_time')>='2020-02-16 00:00:00')\
    .filter(F.col('apply_time')<'2020-06-07 00:00:00')\
    .join(df_tmp, (customer_trace_df.customer_id==df_tmp.customer_id) & (customer_trace_df.apply_time < df_tmp.am_time))

# for each customer, rank the matching applications by apply_time descending and keep only the latest one
w = Window.partitionBy(customer_trace_df.customer_id).orderBy(customer_trace_df.apply_time.desc())

df2 = df1.withColumn('row', F.row_number().over(w)).where(F.col('row') == 1).drop("row")
df3 = df2.toPandas()
df3.head()

That gives us, for each user, the credit_trace_id of the snapshot in effect at scoring time:

credit_trace_id customer_id apply_time  customer_id am_time am_credit_score
0   99c8b70a-7cdc-11ea-b3cf-aece3a3b037e    1256285 2020-04-12 23:42:30.474 1256285 2020-04-13 00:42:36 428
1   23630126-685d-11ea-9ae7-02b3d9e95519    1370527 2020-03-17 21:39:42.420 1370527 2020-07-21 04:03:28 422
2   c9d6eb74-7264-11ea-a512-969a4ef250e3    1813572 2020-03-30 15:59:39.797 1813572 2020-03-30 16:59:45 423
3   958419e5-978d-4cdb-8b92-0be2aed14cf1    1857198 2020-05-07 16:20:57.264 1857198 2020-06-22 12:23:52 426
4   482f10bc-7429-11ea-92d5-82e5d1166694    2191829 2020-04-01 21:58:44.333 2191829 2020-04-01 22:58:46 430
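
Note that the join keeps customer_id from both sides, which is why it appears twice above. If the duplicate column gets in the way downstream, one way to avoid it is to drop the copy coming from df_tmp before converting to pandas; a small sketch, as an alternative to the plain toPandas() above:

# drop the duplicate customer_id from df_tmp, keeping the one from customer_trace_df
df3 = df2.drop(df_tmp.customer_id).toPandas()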

With the credit_trace_id values in hand, we can look the features up in the features view:

credit_trace_ids = tuple(df3.credit_trace_id)
sql = f"""
select credit_trace_id, ft_union 
from features where credit_trace_id in {credit_trace_ids}
"""
df = spark.sql(sql)
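
One caveat with building the IN clause from a Python tuple: a single trace id leaves a trailing comma and produces invalid SQL, and a very long list makes an unwieldy query string. A join against a small DataFrame of trace ids is an alternative sketch that avoids both issues:

# alternative: join features against the trace ids instead of an IN clause
trace_ids_df = spark.createDataFrame(
    [(t,) for t in df3.credit_trace_id], ['credit_trace_id']
)
df = feature_df.join(trace_ids_df, on='credit_trace_id', how='inner')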

ft_union is a map of feature name to value, so explode it and pivot it into one column per feature:

from pyspark.sql.functions import explode, first
df_csv = df.select('credit_trace_id', explode('ft_union'))\
    .groupby('credit_trace_id')\
    .pivot('key')\
    .agg(first('value'))
df_old = df_csv.toPandas()
df_old.head()
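
To actually write the result out, plain pandas to_csv is enough (the filename here is just a placeholder):

# persist the wide feature table; filename is a placeholder
df_old.to_csv('reject_features_410_420.csv', index=False)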

Done.