How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I've tried the following without any success:

    type(randomed_hours) # => list

    # Create in Python and transform to RDD

    new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

    spark_new_col = sqlContext.createDataFrame(new_col)

    my_df_spark.withColumn("hours", spark_new_col["new_col"])

Also got an error using this:

    my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?

You cannot add an arbitrary column to a DataFrame in Spark. A new column can be created only from a literal (other literal types are described in How to add a constant column in a Spark DataFrame?):

    from pyspark.sql.functions import lit

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

    df_with_x4 = df.withColumn("x4", lit(0))
    df_with_x4.show()

    ## +---+---+-----+---+
    ## | x1| x2|   x3| x4|
    ## +---+---+-----+---+
    ## |  1|  a| 23.0|  0|
    ## |  3|  B|-23.0|  0|
    ## +---+---+-----+---+
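The same pattern works for other literal types, e.g. a string or an array of literals (a brief sketch; the x4s / x4a column names are just for illustration):

    from pyspark.sql.functions import array, lit

    # a string literal and an array-of-literals column, added the same way
    df_with_x4s = df.withColumn("x4s", lit("constant"))
    df_with_x4a = df.withColumn("x4a", array(lit(1), lit(2)))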

by transforming an existing column:

    from pyspark.sql.functions import exp

    df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
    df_with_x5.show()

    ## +---+---+-----+---+--------------------+
    ## | x1| x2|   x3| x4|                  x5|
    ## +---+---+-----+---+--------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9|
    ## |  3|  B|-23.0|  0|1.026187963170189...|
    ## +---+---+-----+---+--------------------+

by bringing in a column with a join:

    from pyspark.sql.functions import col

    lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    df_with_x6 = (df_with_x5
        .join(lookup, col("x1") == col("k"), "leftouter")
        .drop("k")
        .withColumnRenamed("v", "x6"))

    df_with_x6.show()

    ## +---+---+-----+---+--------------------+----+
    ## | x1| x2|   x3| x4|                  x5|  x6|
    ## +---+---+-----+---+--------------------+----+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|
    ## +---+---+-----+---+--------------------+----+

or by generating one with a function / UDF:

    from pyspark.sql.functions import rand

    df_with_x7 = df_with_x6.withColumn("x7", rand())
    df_with_x7.show()

    ## +---+---+-----+---+--------------------+----+-------------------+
    ## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
    ## +---+---+-----+---+--------------------+----+-------------------+
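For the UDF case, a minimal sketch (the label_sign helper and the x8 column are hypothetical, not part of the original example):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # hypothetical Python UDF labelling each row by the sign of x3
    label_sign = udf(lambda x: "pos" if x >= 0.0 else "neg", StringType())
    df_with_x8 = df_with_x7.withColumn("x8", label_sign(df_with_x7["x3"]))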

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
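For example, the hypothetical UDF sketched above can be rewritten with the built-in when / otherwise, which keeps the logic inside a Catalyst expression:

    from pyspark.sql.functions import col, when

    # same logic as the UDF sketch, but evaluated by Catalyst rather than Python
    df_with_x8 = df_with_x7.withColumn(
        "x8", when(col("x3") >= 0.0, "pos").otherwise("neg"))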

If you want to add the content of an arbitrary RDD as a column, you can:

- add row numbers to the existing DataFrame
- call zipWithIndex on the RDD and convert it to a DataFrame
- join both using the index as a join key
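A minimal sketch of that approach, assuming randomed_hours holds exactly one value per row of my_df_spark in matching order (the helper names here are illustrative):

    from pyspark.sql import Row

    # pair each row and each new value with its position
    indexed_rows = my_df_spark.rdd.zipWithIndex().map(lambda ri: (ri[1], ri[0]))
    indexed_hours = (sc.parallelize(randomed_hours)
        .zipWithIndex()
        .map(lambda vi: (vi[1], vi[0])))

    # join on the index and rebuild each row with the extra column
    joined = indexed_rows.join(indexed_hours)
    rows_with_hours = joined.map(
        lambda kv: Row(hours=kv[1][1], **kv[1][0].asDict()))
    result = sqlContext.createDataFrame(rows_with_hours)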

From: stackoverflow.com/q/33681487
