I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I'm doing:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
I would expect this call to give me a list of the two first columns of my file but I'm getting this error :
File "", line 1, in IndexError: list index out of range
although my CSV file as more than one column.
Spark 2.0.0+
You can use the built-in csv data source directly:
spark.read.csv(
    "some_input_file.csv",
    header=True,
    mode="DROPMALFORMED",
    schema=schema
)
or
(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)
without including any external dependencies.
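Since Spark 2.3, the schema argument also accepts a DDL-formatted string, so for simple layouts you can skip building a StructType. A minimal sketch (the column names A, B, C match the example schema further down):

df = spark.read.csv(
    "some_input_file.csv",
    header=True,
    mode="DROPMALFORMED",
    schema="A INT, B DOUBLE, C STRING"  # DDL string instead of a StructType
)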
Spark < 2.0.0:
Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv.
Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path).
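For example, when starting the shell (a sketch; adjust the Scala and package versions to the ones you have installed):

pyspark --packages com.databricks:spark-csv_2.11:1.4.0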
And load your data as follows:
df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)
It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.
Note: If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns (integer, double and string):
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])
(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)
Are you sure that all the lines have at least 2 columns? Can you try something like the following, just to check?
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternatively, you could print the culprit (if any):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
You could also use the csv library to handle all the escaping, because simply splitting by "," won't work if, say, there are commas in the values.
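A minimal sketch of the difference on a single line (the quoted field keeps its embedded commas):

import csv

line = 'a,b,"1,2,3",c'
print(line.split(','))            # ['a', 'b', '"1', '2', '3"', 'c'] -- wrong
print(next(csv.reader([line])))   # ['a', 'b', '1,2,3', 'c'] -- quoting respected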
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|")
print(df.collect())
And yet another option consists of reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.
For example:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
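For what it's worth, on Spark 2.0+ the same idea works directly off a SparkSession, without an SQLContext (a sketch, assuming a session named spark already exists):

import pandas as pd

pandas_df = pd.read_csv('file.csv')
s_df = spark.createDataFrame(pandas_df)  # schema is inferred from the Pandas dtypes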
Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse CSVs in base Python with the csv module:
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDIT: As @muon mentioned in the comments, this will treat the header like any other row so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(make sure not to modify header
before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
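A minimal sketch of that header-stripping step, under the same setup as above:

import csv

rdd = sc.textFile("file.csv").mapPartitions(lambda x: csv.reader(x))
header = rdd.first()                          # materialize the header row once
data = rdd.filter(lambda row: row != header)  # drop rows equal to the header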
a) There is no need for StringIO; csv can use any iterable. b) __next__ shouldn't be used directly and will fail on an empty line; take a look at flatMap. c) It would be much more efficient to use mapPartitions instead of initializing the reader on each line :)
1) Why does rdd.mapPartitions(lambda x: csv.reader(x)) work while rdd.map(lambda x: csv.reader(x)) throws an error? I expected both to throw the same TypeError: can't pickle _csv.reader objects. It also seems like mapPartitions automatically calls some equivalent of "readlines" on the csv.reader object, whereas with map I needed to call __next__ explicitly to get the lists out of the csv.reader. 2) Where does flatMap come in? Just calling mapPartitions alone worked for me.
rdd.mapPartitions(lambda x: csv.reader(x)) works because mapPartitions expects an Iterable object. If you want to be explicit, you could use a comprehension or generator expression. map alone doesn't work because it doesn't iterate over the object; hence my suggestion to use flatMap(lambda x: csv.reader([x])), which will iterate over the reader. But mapPartitions is much better here.
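To make that concrete, a sketch of the flatMap variant: each line is wrapped in a one-element list so csv.reader gets an iterable, and flatMap unrolls the resulting reader into one parsed row per line.

import csv

rows = sc.textFile("file.csv").flatMap(lambda line: csv.reader([line]))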
This is in PySpark:
path="Your file path with file name"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)
Then you can check:
df.show(5)
df.count()
If you want to load the CSV as a dataframe, then you can do the following:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
It worked fine for me.
This is in line with what JP Mercier initially suggested about using Pandas, but with a major modification: if you read data into Pandas in chunks, it should be more malleable. That means you can parse a much larger file than Pandas can actually handle as a single piece and pass it to Spark in smaller sizes. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyway.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
    Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
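A note on the design: Spark_Full += issues one union per chunk, which grows the RDD lineage; collecting the chunks first and issuing a single union is a cleaner variant (a sketch under the same assumptions as above):

import pandas as pd

chunks = pd.read_csv("Your_Data_File.csv", chunksize=100000)
rdds = [sc.parallelize(chunk.values.tolist()) for chunk in chunks]
YourSparkDataFrame = sc.union(rdds).toDF(headers)  # one union call instead of many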
Now, there's also another option for any general CSV file: https://github.com/seahboonsiew/pyspark-csv. Use it as follows.
Assume we have the following context
sc = SparkContext
sqlCtx = SQLContext or HiveContext
First, distribute pyspark-csv.py to executors using SparkContext
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Read the CSV data via SparkContext and convert it to a DataFrame:
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
If your CSV data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:
import csv
from io import StringIO  # Python 3; on Python 2 this was the StringIO module

def loadRecord(line):
    # Parse one CSV line into a dict keyed by the given field names
    input = StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return next(reader)  # reader.next() on Python 2

input = sc.textFile(inputFile).map(loadRecord)
When using spark.read.csv, I find that the options escape='"' and multiLine=True provide the most consistent handling of the CSV standard, and in my experience they work best with CSV files exported from Google Sheets.
That is,
# set inferSchema=False to read everything as strings
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
inferSchema=False, header=True)
Do you need import pyspark as spark? No: in the interactive shell, spark is already initialized. In a script submitted by spark-submit, you can instantiate it with from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
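Spelled out as a standalone script, that looks something like this (a sketch; the file name is a placeholder):

from pyspark.sql import SparkSession

# No predefined `spark` object exists in a spark-submit script,
# so create (or reuse) the session explicitly:
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("myData.csv", escape='"', multiLine=True, header=True)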
This error can arise if one or more rows in the dataset have fewer than two columns.
I am also new to PySpark and trying to read a CSV file. The following code worked for me.
In this code I am using a dataset from Kaggle: https://www.kaggle.com/carrie1/ecommerce-data
1. Without mentioning the schema:
from pyspark.sql import SparkSession
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()
Now check the columns: sdfData.columns
The output will be:
['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']
Check the datatype for each column:
sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))
This gives a data frame in which every column has the datatype StringType.
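If you only want to fix the type of a column or two after reading everything as strings, a cast also works (a sketch using the columns above):

from pyspark.sql.functions import col

sdfData = sdfData \
    .withColumn("Quantity", col("Quantity").cast("int")) \
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))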
2. With schema: If you know the schema, or want to change the datatype of any column in the above table, then use this (let's say I have the following columns and want each of them in a particular data type):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL example: Reading CSV file with schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)
Now check the schema for the datatype of each column:
sdfData.schema
StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
Edit: we can also use the following line of code, without mentioning the schema explicitly:
sdfData = scSpark.read.csv("data.csv", header=True, inferSchema=True)
sdfData.schema
The output is:
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))
The output will look like this:
sdfData.show()
+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
Read your CSV file in the following way:
df= spark.read.format("csv").option("multiline", True).option("quote", "\"").option("escape", "\"").option("header",True).load(df_path)
The Spark version is 3.0.1.
Launch the shell with pyspark --packages com.databricks:spark-csv_2.11:1.4.0 (make sure to change the databricks/spark versions to the ones you have installed).
Is there a spark-csv package which supports Scala 2.12? We are upgrading our code with Scala 2.12.12 and Spark 3.0.1, and we are facing an issue with _corrupt_record not being there when trying to get a count, but it's actually there; I can see that column in the DataFrame.
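If that is the known Spark 2.3+ restriction, queries referencing only the internal corrupt-record column of a raw CSV source are disallowed; the usual workaround is to cache the parsed DataFrame before filtering on that column. A sketch, assuming your schema declares a StringType field named _corrupt_record (for CSV sources the column only appears when the schema declares it):

# `schema` must include a StringType field named "_corrupt_record"
df = spark.read.schema(schema).option("header", True).csv("some_input_file.csv")
df.cache()  # materialize so the corrupt-record column can be queried on its own
bad_rows = df.filter(df["_corrupt_record"].isNotNull()).count()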