GCP Series ~ Part 2: Connecting Google Cloud Storage using PySpark

Gobalakrishnan Viswanathan
6 min read · Mar 6, 2024

Unleashing the power of PySpark by connecting it with GCP’s Cloud Storage service.

As part of my GCP learning, I have been exploring Google Cloud Storage (GCS). It is an object storage service that can hold anything from images, documents, and text files to large Parquet datasets. If you want an introduction to GCS, you can head over to my story GCS; it gives a good overview of the service.

Since I am a Python automation developer working with big data stacks, I am always exploring new big data technologies and how they connect with Python. This time, I did some hands-on work integrating PySpark and GCS, and I am sharing what I learned here in the hope that it is useful to someone.

Objective:

My objective here is simple: I wanted to explore how to connect PySpark with GCS. My scenario is as follows:

  1. I have an employee.csv file in a GCS bucket.
  2. The CSV file has five columns: id, name, age, dept, and salary. My PySpark script needs to read employee.csv from GCS and derive two new columns, first_name and last_name, from the name column.
  3. After the transformation, I want to write the output back to the same GCS bucket in CSV format.

Let's discuss how to achieve this task.

  • GCP account
    GCP offers a free tier with credits that is more than enough for this exercise. Use your email to create a new GCP free-tier account. Once the account is ready, we need to do three basic things:
    1. Set up a new project
    2. Create a new bucket
    3. Upload employee.csv to the newly created bucket.
    All these steps are explained in my earlier post, and you can get the details here.
  • Service account in GCP
    To connect to GCS with PySpark from our local machine, we need a service account in GCP. Once the account is created, we generate a key for it as a JSON file; we will use this file to authenticate from our PySpark program.
    1. On the GCP console home page, search for “Service Accounts” and select it. You will land on the Service Accounts page, which may already list a few default service accounts in a table.
Service Accounts Page
  • Click “CREATE SERVICE ACCOUNT”. Enter a service account name and description of your choice and click CREATE AND CONTINUE.
service account details
  • Next, we need to grant this service account the roles it needs to access GCP programmatically. For connecting to GCS, the Storage Object Admin role is enough, as shown below. We can add more roles later if we need them; for now, this is sufficient.
grant GCS access to service account
  • The next configuration step is optional; let’s skip it for our objective. Click DONE to create the service account. Once it is created, we need to generate the JSON key for this service account that we will use for the connection.
  • Click on the newly created service account and open KEYS from the top navigation menu. Click the ADD KEY dropdown and select Create new key. When asked for the key type, select JSON. A JSON key file will then be downloaded to your local machine. Be careful with this file and don’t share it with unauthorized persons.
  • You can rename the file to whatever you like; we will use this JSON key in our Spark program. As a quick sanity check before touching Spark, the short sketch below uses the key to list our bucket from Python.
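Optionally, we can confirm the key and the bucket with the google-cloud-storage Python client (pip install google-cloud-storage). This is just a rough sketch: the key file path is a placeholder for wherever you saved your JSON key, and the bucket name is the one we will use later in this post.

from google.cloud import storage

# Authenticate explicitly with the downloaded service account key (adjust the path)
client = storage.Client.from_service_account_json("GCS_SERVICE_ACCOUNT_KEY.json")

# List the bucket contents; employee.csv should show up if the upload worked
for blob in client.list_blobs("dataproc_practice_bucket"):
    print(blob.name)

If employee.csv shows up in the listing, the Storage Object Admin grant and the key are working. Now it’s time to go through our PySpark program.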
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

# Build the SparkSession; spark.jars pulls the GCS connector jar from the public URL at runtime
spark = SparkSession.builder \
    .appName('pyspark-run-with-gcp-bucket') \
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

# Set GCS credentials if necessary
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", "/home/gv/PycharmProjects/DMF/dmf/configs/GCS_SERVICE_ACCOUNT_KEY.json")

# Define GCS bucket and file path
bucket_name = "dataproc_practice_bucket"
file_name = "employee.csv"
file_path = f"gs://{bucket_name}/{file_name}"

split_column = "name"
split_by = " "
new_columns = ["first_name", "last_name"]

# Read CSV file from GCS into DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Split the name column into first name and last name
split_columns = [
    split(df[split_column], split_by, len(new_columns)).getItem(i).alias(new_columns[i])
    for i in range(len(new_columns))
]
df = df.select("*", *split_columns)

# Coalesce the DataFrame to a single partition
df = df.coalesce(1)

# Write the DataFrame to a single CSV file in GCS
df.write.csv(f"gs://{bucket_name}/employee_output/", header=True)

# Stop SparkSession
spark.stop()
  1. The first three lines import the necessary PySpark packages.
  2. The next block builds the Spark session with the app name and other configurations. Here, spark.jars is the important setting: we give it the HTTPS address of the GCS connector jar, and PySpark fetches the file at runtime and adds it as a dependency to the program.
    (Usually, I pass an actual local jar file to spark.jars, and that works fine for connecting PySpark with relational databases. In this case, though, when I downloaded the jar and added it locally, it somehow did not work. Maybe the relational-database case works because of the JDBC driver files, but I am not sure. If anyone knows the reason, please comment; I would love to know. A sketch of the configuration I would try with a locally downloaded jar is included after point 5 below.)
  3. The next line is important: we set the path to our service account’s JSON key file in the Hadoop configuration so that the connector can access our bucket.
  4. The next lines build the path to our GCS file from the bucket name and file name. gs:// is the URI scheme used to address objects in Google Cloud Storage.
  5. Then we use Spark’s read function to read the CSV file and create the DataFrame df. Below are the screenshots of the schema and data after reading the CSV file from GCS.
Schema and data
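Coming back to the local-jar question from point 2: one possible explanation, offered only as a guess, is that the jar hosted at that URL is the self-contained (shaded) build of the connector, while a plain jar downloaded from Maven Central does not bundle its dependencies. If you want to try a local jar, a configuration along these lines is worth a shot; the jar path below is a placeholder, and the two fs.gs settings come from the GCS connector’s documentation.

from pyspark.sql import SparkSession

# Hypothetical alternative: point spark.jars at a local shaded connector jar
# and register the gs:// filesystem implementations explicitly
spark = SparkSession.builder \
    .appName('pyspark-run-with-gcp-bucket') \
    .config("spark.jars", "/path/to/gcs-connector-hadoop3-shaded.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .getOrCreate()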

6. Now, the next part of the program splits the name column into two new columns, first_name and last_name, using PySpark’s split function. Below are the screenshots of the df’s schema and sample rows after adding the two new columns, followed by a small standalone sketch of the same split logic.

after the split column transformation applied
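Since the screenshots do not translate well to text, here is a tiny, self-contained sketch of the same split logic on an in-memory DataFrame; the names in it are made up purely for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("split-demo").getOrCreate()

# Made-up rows, purely for illustration
demo = spark.createDataFrame([(1, "John Smith"), (2, "Jane Doe")], ["id", "name"])

# Same idea as the main script: split on a space, keep at most two parts
parts = split(demo["name"], " ", 2)
demo = demo.select("*", parts.getItem(0).alias("first_name"), parts.getItem(1).alias("last_name"))
demo.show()

spark.stop()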

Finally, we write the output back to the GCS path and close the Spark session. Our code walk-through is complete, and voilà! Now we can see the output CSV file at the GCS output path, as shown below. Inside the employee_output directory, we will see our output CSV file with a name starting with part-*.

output after writing back to GCS.
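If you prefer to confirm this from code rather than the console, the same google-cloud-storage client from the earlier sketch can list the output prefix; again, this is an optional sketch, and the key file path is a placeholder.

from google.cloud import storage

client = storage.Client.from_service_account_json("GCS_SERVICE_ACCOUNT_KEY.json")

# Expect a part-*.csv file (and typically a _SUCCESS marker) under the output prefix
for blob in client.list_blobs("dataproc_practice_bucket", prefix="employee_output/"):
    print(blob.name)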

Wow! We have completed a basic hands-on exercise with GCS and PySpark. Now we know how to read from and write to GCS using our own Python code with PySpark. I know it is not a big project to showcase, but it should give us good confidence to start exploring well-designed data engineering projects with the services offered by GCP.

Ta-ta for now. Let’s meet in the comments section, and in the next post of course! Stay tuned. Bye.

Gobalakrishnan Viswanathan.
