
How to change column types in Spark SQL's DataFrame?

hot-time 2020. 6. 23. 07:20



Suppose I'm doing something like:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  

But I really want the year as an Int (and perhaps transform some other columns as well).

The best I could come up with was:

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

which is a bit convoluted.

I come from R, where I was able to write, e.g.:

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

I feel like I'm missing something, since there should be a better way to do this in Spark/Scala.


Edit: Newest version

Since Spark 2.x you can use .withColumn. Check the docs here:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame
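
For example (a minimal sketch, reusing df and the year column from the question):

import org.apache.spark.sql.functions.col

// with the same column name, withColumn replaces the existing column in place
val df2 = df.withColumn("year", col("year").cast("int"))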

Oldest answer

Since Spark version 1.4 you can apply the cast method with a DataType on the column:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df("year").cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

If you're using SQL expressions, you can also do:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

For more info, check the docs: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame


[EDIT: March 2016: thanks for the votes! Though really, this is not the best answer; I think the solutions based on withColumn, withColumnRenamed and cast put forward by msemelman, Martin Senne and others are simpler and cleaner.]

I think your approach is OK. Remember that a Spark DataFrame is an (immutable) RDD of Rows, so we never really replace a column; we just create a new DataFrame each time with a new schema.

Assuming you have an original df with the following schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

And some UDFs defined on one or several columns:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

If you want to change column types, or even build a new DataFrame from another one, you can write:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

which yields:

scala> featureDf.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

This is pretty close to your own solution. Simply, keeping the type changes and other transformations as separate udf vals makes the code more readable and re-usable.


As the cast operation is available for Spark Columns (and as I personally do not favour udfs as proposed by @Svend at this point), how about:

df.select( df("year").cast(IntegerType).as("year"), ... )

to cast to the requested type? As a neat side effect, values that are not castable / "convertible" in that sense will become null.

In case you need this as a helper method, use:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

which is used like:

import DFHelper._
import org.apache.spark.sql.types.IntegerType
val df2 = castColumnTo( df, "year", IntegerType )
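
If you have several columns to convert, a small variation of the same helper can fold over a name-to-type map (a sketch under the same imports; castColumnsTo is a hypothetical name, not part of the original answer):

// hypothetical helper: apply castColumnTo for every (column name -> target type) pair
def castColumnsTo(df: DataFrame, types: Map[String, DataType]): DataFrame =
  types.foldLeft(df) { case (acc, (cn, tpe)) => castColumnTo(acc, cn, tpe) }

val df3 = castColumnsTo(df, Map("year" -> IntegerType))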

First, if you want to cast a type, then this:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

With the same column name, the column will be replaced with the new one. You don't need separate add and delete steps.

Second, about Scala vs R.
This is the code most similar to R that I can come up with:

import org.apache.spark.sql.functions
import org.apache.spark.sql.types.IntegerType

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Though the code is a little longer than R's, that has nothing to do with the verbosity of the language. In R, mutate is a special function for R data frames, while in Scala you can easily write an ad-hoc one thanks to the language's expressive power.
In short, it avoids one-off solutions, because the foundation is good enough for you to build your own domain-language features quickly and easily.


Side note: df.columns is surprisingly an Array[String] instead of Array[Column]; maybe they wanted it to look like a Python pandas DataFrame.


You can use selectExpr to make it a little cleaner:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")

Java code for modifying the datatype of the DataFrame from String to Integer

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

It will simply cast the existing column (String datatype) to Integer.


To convert the year from string to int, you can add the following option to the CSV reader: "inferSchema" -> "true"; see the Databricks spark-csv documentation.
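
For example, with the spark-csv reader from the question (a minimal sketch; whether year actually comes back as int depends on the data):

val df = sqlContext.load("com.databricks.spark.csv",
  Map("path" -> "cars.csv", "header" -> "true", "inferSchema" -> "true"))
df.printSchema()   // year should now be inferred as a numeric type rather than string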


So this only really works if you're having issues saving to a JDBC driver like SQL Server, but it's really helpful for errors you will run into with syntax and types.

import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
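
Once the dialect is registered, a normal JDBC write picks up these type mappings (a sketch; the connection URL, table name and credentials below are placeholders, not from the original answer):

val props = new java.util.Properties()
props.setProperty("user", "...")        // placeholder credentials
props.setProperty("password", "...")
df.write
  .mode("append")
  .jdbc("jdbc:sqlserver://host:1433;databaseName=mydb", "dbo.my_table", props)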

Generate a simple dataset containing five values and convert the numeric id column to string type:

val df = spark.range(5).select( col("id").cast("string") )

Regarding the answers suggesting to use cast: FYI, the cast method in Spark 1.4.1 is broken.

For example, a DataFrame with a string column holding the value "8182175552014127960", when cast to bigint, ends up with the value "8182175552014128100":

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

We had to face a lot of issues before finding this bug, because we had bigint columns in production.
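
On affected versions, a plain UDF in the spirit of the toInt/toDouble UDFs above avoids the lossy cast, assuming every value really fits in a Long (a sketch, not part of the original answer):

import org.apache.spark.sql.functions.udf

val toLong = udf[Long, String](_.toLong)   // throws on non-numeric strings instead of silently losing precision
val fixed  = df.withColumn("a", toLong(df("a")))
fixed.show()                               // 8182175552014127960 survives intact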


df.select($"long_col".cast(IntegerType).as("int_col"))

Using Spark SQL 2.4.0 you can do it like this:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

This method will drop the old column and create new columns with the same values and a new datatype. My original datatypes when the DataFrame was created were:

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

After this I ran the following code to change the datatype:

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

After this, my result came out to be:

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
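
A concrete sketch of those two steps for one of the flags (flag1_tmp is a hypothetical dummy name, and boolean is the assumed target type):

// assumes df is declared as a var so it can be reassigned
df = df.withColumnRenamed("flag1", "flag1_tmp")
df = df.withColumn("flag1", df.col("flag1_tmp").cast("boolean")).drop("flag1_tmp")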

You can use the code below.

df.withColumn("year", df("year").cast(IntegerType))

This will convert the year column to IntegerType.


In case you have to cast dozens of columns given by their name, the following example takes the approach of @dnlbrky and applies it to several columns at once:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

Uncasted columns are kept unchanged. All columns stay in their original order.


One can change the data type of a column by using cast in Spark SQL. Suppose the table name is table, it has only two columns, column1 and column2, and the column1 data type is to be changed:

spark.sql("select cast(column1 as Double) column1NewName, column2 from table")

In place of Double, write your data type.


Another solution is as follows:
1) Keep "inferSchema" as false
2) While running 'map' functions on the row, read the values as strings (row.getString...), as in the example below:

        //Read CSV and create dataset
        Dataset<Row> enginesDataSet = sparkSession
                    .read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .option("inferSchema","false")
                    .load(args[0]);

        JavaRDD<Box> vertices = enginesDataSet
                    .select("BOX","BOX_CD")
                    .toJavaRDD()
                    .map(new Function<Row, Box>() {
                        @Override
                        public Box call(Row row) throws Exception {
                            return new Box(row.getString(0), row.getString(1));
                        }
                    });

I think this is a lot more readable, at least for me.

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

This will convert your year column to IntegerType without creating any temporary columns or dropping them. If you want to convert to any other datatype, you can check the types inside the org.apache.spark.sql.types package.
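
For instance, the same pattern with another type from that package (a minimal sketch):

import org.apache.spark.sql.types.DoubleType
df.withColumn("year", df("year").cast(DoubleType))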


    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()

Another way:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")

Source URL: https://stackoverflow.com/questions/29383107/how-to-change-column-types-in-spark-sqls-dataframe
