
Spark - load CSV file as DataFrame?

I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")

I have tried:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

The error I got:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load CSV file as DataFrame in Apache Spark?


sumitya

In Spark 2.x, the spark-csv functionality is part of core Spark and doesn't require a separate library. So you can simply do, for example:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In Scala (this works for any delimiter: specify "," for CSV, "\t" for TSV, etc.):

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .load("csvfile.csv")


mrsrinivas

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize a SparkSession object. By default, it will be available in the shells as spark.

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") // Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

Use any one of the following ways to load CSV as DataFrame/DataSet

1. Do it in a programmatic way

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Update: Adding all the options from here in case the link breaks in the future.

path: location of files. Similar to Spark, it can accept standard Hadoop globbing expressions.

header: when set to true the first line of files will be used to name columns and will not be included in data. All types will be assumed string. The default value is false.

delimiter: by default columns are delimited using ",", but the delimiter can be set to any character

quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored

escape: by default, the escape character is \, but can be set to any character. Escaped quote characters are ignored

parserLib: by default, it is "commons" that can be set to "univocity" to use that library for CSV parsing.

mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:

PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.

DROPMALFORMED: drops lines that have fewer or more tokens than expected, or tokens which do not match the schema.

FAILFAST: aborts with a RuntimeException if it encounters any malformed line.

charset: defaults to 'UTF-8' but can be set to other valid charset names

inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default.

comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.

nullValue: specifies a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame

dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default, it is null which means trying to parse times and date by java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
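
As a quick illustration, here is a minimal sketch combining several of these options (the path, delimiter, and null marker are hypothetical; adjust them to your file):

 val df = spark.read
         .format("csv")
         .option("header", "true")           // first line names the columns
         .option("delimiter", ";")           // non-default separator
         .option("nullValue", "NA")          // treat "NA" fields as null
         .option("dateFormat", "yyyy-MM-dd") // see dateFormat above
         .option("inferSchema", "true")      // costs one extra pass over the data
         .load("hdfs:///csv/file/dir/file.csv")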

2. You can do it the SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

Does this session require Hive? I am getting Hive errors.
No need. Only spark-core_2.11 and spark-sql_2.11 of version 2.0.1 are fine. If possible, add the error message.
Can we convert a pipe-delimited file to a DataFrame?
@OmkarPuttagunta: Yes, of course! Try something like this: spark.read.format("csv").option("delimiter", "|") ...
The other option for the programmatic way is to leave off the .format("csv") and replace .load(...) with .csv(...). The option method belongs to the DataFrameReader class as returned by the read method, whereas the load and csv methods return a DataFrame, so options can't be tagged on after they are called. This answer is pretty thorough, but you should link to the documentation so people can see all the other CSV options available: spark.apache.org/docs/latest/api/scala/…
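For illustration, a small sketch of the two equivalent forms (the file name is hypothetical):

// option() must be called on the DataFrameReader, i.e. before load()/csv(),
// because load() and csv() return a DataFrame
val viaLoad = spark.read.format("csv").option("header", "true").load("file.csv")
val viaCsv  = spark.read.option("header", "true").csv("file.csv")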
Eric Yiwei Liu

This is for those whose Hadoop is 2.6 and Spark is 1.6, and without the "databricks" package.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Read the raw text and split each line on commas
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))

// Separate the header row from the data rows
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0), row(1).toInt))

// Declare the schema explicitly, then build the DataFrame
val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)

OneCricketeer

With Spark 2.0, the following is how you can read CSV:

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header", "true").csv(path)

Is there a difference between spark.read.csv(path) and spark.read.format("csv").load(path)?
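(They are equivalent: DataFrameReader.csv(path) is shorthand for format("csv").load(path), so both return the same DataFrame.)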
OneCricketeer

In Java 1.8, this code snippet works perfectly for reading CSV files.

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.4.0</version>
</dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read()
        .format("com.databricks.spark.csv")
        .option("header", true)
        .option("inferSchema", true)
        .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();

While this may be useful to someone, the question has a Scala tag.
stevel

Penny's Spark 2 example is the way to do it in Spark 2. There's one more trick: have the schema generated for you by doing an initial scan of the data, by setting the option inferSchema to true.

Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat images which Amazon hosts on S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

The bad news is: this triggers a scan through the file; for something large like this 20+ MB zipped CSV file, that can take 30s over a long-haul connection. Bear that in mind: you are better off manually coding up the schema once you've got it coming in.

(code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration)
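
A minimal sketch of that "code up the schema" workflow, assuming csvdata was loaded with inferSchema as shown (the columns below are a hypothetical subset; copy the real ones from the printed schema):

// Print the inferred schema once...
csvdata.printSchema()

// ...then hard-code it for later runs to skip the inference pass entirely
import org.apache.spark.sql.types._
val landsatSchema = new StructType()
  .add("entityId", StringType, true)
  .add("acquisitionDate", TimestampType, true)
  .add("cloudCover", DoubleType, true)

val csvdataFast = spark.read
  .option("header", "true")
  .schema(landsatSchema)
  .csv("s3a://landsat-pds/scene_list.gz")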


I hadn't seen this csv method or passing a map to options. Agreed, it's always better to provide an explicit schema; inferSchema is fine for quick-and-dirty work (aka data science) but terrible for ETL.
karthiks

There are a lot of challenges to parsing a CSV file, and they keep adding up as the file size grows: non-English, escape, separator, or other special characters in the column values can all cause parsing errors.

The magic, then, is in the options that are used. The ones that worked for me, and which I hope cover most of the edge cases, are in the code below:

### Create a Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Hope that helps. For more, refer to: Using PySpark 2 to read CSV having HTML source code

Note: The code above is from the Spark 2 API, where the CSV reading API comes bundled with the built-in packages of the Spark installation.

Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.


swapnil shashank

In case you are building a jar with Scala 2.11 and Apache Spark 2.0 or higher:

There is no need to create a sqlContext or sparkContext object. A single SparkSession object suffices for all needs.

Following is my code, which works fine:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

In case you are running on a cluster, just change .master("local") to .master("yarn") while defining the SparkSession builder object, as sketched below.
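
A hypothetical sketch of that one-line change, reusing the imports from the snippet above:

val spark = SparkSession.builder()
  .master("yarn")                 // was .master("local")
  .appName("ValidationFrameWork")
  .getOrCreate()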

The Spark Doc covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html


This is the same as existing answers.
stack0114106

With Spark 2.4+, if you want to load a CSV from a local directory, you can use two sessions and load it into Hive. The first session should be created with master() set to "local[*]" and the second with "yarn" and Hive enabled.

The below one worked for me.

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

  }
}

When run with spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar, it went fine and created the table in Hive.


ejuhjav

Add the following Spark dependencies to the POM file:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

Spark configuration:

val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()

Read csv file:

val df = spark.read.option("header", "true").csv("FILE_PATH")

Display output:

df.show()

Ajay Ahuja

Try this if using Spark 2.0+:

For a non-HDFS file:
df = spark.read.csv("file:///csvfile.csv")


For an HDFS file:
df = spark.read.csv("hdfs:///csvfile.csv")

For an HDFS file (with a different delimiter than comma):
df = spark.read.option("delimiter","|").csv("hdfs:///csvfile.csv")

Note: this works for any delimited file. Just use option("delimiter", ...) to change the value.

Hope this is helpful.


This is the same as existing answers.
Venkat Kotra

To read from a relative path on the system, use the System.getProperty method to get the current directory, and then load the file using that relative path:

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

spark:2.4.4 scala:2.11.12


tazak

The default file format with spark.read is Parquet, and the file you are reading is CSV; that is why you are getting the exception. Specify the CSV format with the API you are trying to use.
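
For instance, a minimal sketch using the path from the question:

// Wrong: spark.read.load (like the old sqlContext.load) defaults to Parquet
// val df = spark.read.load("hdfs:///csv/file/dir/file.csv")

// Right: state the format explicitly, or use the csv shorthand
val df = spark.read.format("csv").load("hdfs:///csv/file/dir/file.csv")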


Piyush Patel

With the in-built Spark CSV reader, you can get it done easily with the new SparkSession object, for Spark > 2.0.
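
Note that dataSchema used below is a schema you define yourself to match your file; a hypothetical sketch (column names and types are made up):

import org.apache.spark.sql.types._

val dataSchema = new StructType()
        .add("id", IntegerType, true)
        .add("name", StringType, true)
        .add("amount", DoubleType, true)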

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

There are various options you can set.

header: whether your file includes a header line at the top

inferSchema: whether you want to infer the schema automatically or not. Default is false. I always prefer to provide a schema to ensure proper datatypes.

mode: parsing mode, PERMISSIVE, DROPMALFORMED or FAILFAST

delimiter: to specify the delimiter; default is comma (',')

