Python Spark DataFrame: replace null with SparseVector

Question

In Spark, I have the following DataFrame called “df” with some null entries:

+-------+--------------------+--------------------+                     
|     id|           features1|           features2|
+-------+--------------------+--------------------+
|    185|(5,[0,1,4],[0.1,0...|                null|
|    220|(5,[0,2,3],[0.1,0...|(10,[1,2,6],[0.1,...|
|    225|                null|(10,[1,3,5],[0.1,...|
+-------+--------------------+--------------------+

df.features1 and df.features2 are columns of type vector (nullable). I then tried to fill the null entries with empty SparseVectors using the following code:

from pyspark.ml.linalg import SparseVector
df1 = df.na.fill({"features1": SparseVector(5, {}), "features2": SparseVector(10, {})})

This code led to the following error:

AttributeError: 'SparseVector' object has no attribute '_get_object_id'

I then found the following paragraph in the Spark documentation:

fillna(value, subset=None)
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.

Parameters: 
value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.

Does this explain my failure to replace the null entries with SparseVectors? Or does it mean there is simply no way to do this in a DataFrame?

I can achieve my goal by converting the DataFrame to an RDD and replacing the None values there, but it would be much more convenient to do this directly on the DataFrame.
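For reference, a minimal sketch of that RDD round-trip (the fill_row helper is hypothetical; it assumes the id/features1/features2 schema shown above):

from pyspark.ml.linalg import SparseVector

def fill_row(row):
    # Replace a null vector with an all-zero SparseVector of the right size.
    f1 = row.features1 if row.features1 is not None else SparseVector(5, {})
    f2 = row.features2 if row.features2 is not None else SparseVector(10, {})
    return (row.id, f1, f2)

df1 = df.rdd.map(fill_row).toDF(["id", "features1", "features2"])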

Is there any method to do this directly in a DataFrame?
Thanks!

————————————————-

Answer

Yes, that is the reason: as the documentation quoted above says, fill() accepts only int, long, float, or string replacement values, so it cannot take a SparseVector. You can use a udf instead:

from pyspark.sql.functions import udf, lit
from pyspark.ml.linalg import SparseVector, VectorUDT

# Return the vector unchanged, or an empty SparseVector of the
# given size when the value is null.
fill_with_vector = udf(
    lambda x, i: x if x is not None else SparseVector(i, {}),
    VectorUDT()
)

df = sc.parallelize([
    (SparseVector(5, {1: 1.0}), SparseVector(10, {1: -1.0})), (None, None)
]).toDF(["features1", "features2"])

(df
    .withColumn("features1", fill_with_vector("features1", lit(5)))
    .withColumn("features2", fill_with_vector("features2", lit(10)))
    .show())

# +-------------+---------------+
# |    features1|      features2|
# +-------------+---------------+
# |(5,[1],[1.0])|(10,[1],[-1.0])|
# |    (5,[],[])|     (10,[],[])|
# +-------------+---------------+
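An equivalent formulation, sketched under the same assumptions, uses coalesce so the udf only has to build the replacement vector (empty_vector is an illustrative name, not a built-in):

from pyspark.sql.functions import coalesce, lit, udf
from pyspark.ml.linalg import SparseVector, VectorUDT

# Build an all-zero SparseVector of the requested size.
empty_vector = udf(lambda n: SparseVector(n, {}), VectorUDT())

(df
    .withColumn("features1", coalesce("features1", empty_vector(lit(5))))
    .withColumn("features2", coalesce("features2", empty_vector(lit(10))))
    .show())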

