Amazon S3 buckets are a good place for storing files, including long-term storage. It’s even possible to store your database data in a “queryable” format, without copying a whole dump of the database. Apache Parquet is a columnar format that preserves your original data and its schema, includes optimizations to speed up queries, and is a far more efficient file format than CSV or JSON, with support in many data processing systems like Apache Spark.
Let’s take a possible use case: the database grows with new data every day, reaching a point where it is hard to keep scaling its size, while the old data might be needed occasionally but is rarely used in the day-to-day, perhaps kept only for a retention policy. Moving the oldest data to an “archive” structure that can still be queried when required might be a strategy to follow.
In that scenario, we could have a SQL database (Postgres, MySQL, Oracle, MS SQL Server, etc.), an AWS S3 bucket as the storage for the archived data, persisted as Parquet files (queryable, distributed, partitioned), and Apache Spark migrating the data out of the database. All of that with just a few lines of code.
Running a Task in Apache Spark
Apache Spark needs to be installed somewhere: it could be on our local machine using the standalone setup, or on a cluster (YARN, Mesos or Kubernetes). Using a cluster allows a single task to run distributed across different nodes.
If you want to know more about spark-submit and how to run a task, follow this article: https://spark.apache.org/docs/latest/submitting-applications.html
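If you just want to try things out locally first, a minimal sketch (assuming a downloaded Spark 3.5.x distribution; the host, class and jar names here are placeholders) looks like this:

# Start a standalone master and one worker on the local machine
./sbin/start-master.sh
./sbin/start-worker.sh spark://<master-host>:7077

# Or skip the cluster entirely and run the task with a local master
./bin/spark-submit --master "local[*]" --class <main-class> <application-jar>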
Creating the Spark Task class
Spark can run a jar file with all the classes required for the task. So, let’s create a new Java project. It is time to create a class, a regular class with a main method.
public class DatabaseToS3TransferTask {
    public static void main(String[] args) {
    }
}
And add the Spark libraries to the project. Spark SQL already includes Spark Core as a transitive dependency, and it provides what is needed to interact with a SQL database from Spark, which is going to be used to find the records to migrate.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.0</version>
    <!-- Provided by the Spark installation -->
    <scope>provided</scope>
</dependency>
<!-- Adding the JDBC driver for the used SQL engine -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.1</version>
</dependency>
// Provided by the Spark installation
compileOnly 'org.apache.spark:spark-sql_2.13:3.5.0'
// Adding the JDBC driver for the used SQL engine
implementation 'org.postgresql:postgresql:42.7.1'
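A quick note on the classpath: spark-sql is marked as provided because the Spark installation already ships it, but the PostgreSQL driver is not part of that installation. One option (a sketch, not the only way) is to build a shaded/fat jar that bundles the driver; another is to hand it to Spark at submit time:

# Pass the JDBC driver at submit time instead of bundling it in the jar
./bin/spark-submit --packages org.postgresql:postgresql:42.7.1 ... my_generated_file.jar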
Reading from the SQL Database in Spark
Now, let’s add some logic to initialize the Spark context in the code and read data from the SQL database.
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
    public static void main(String[] args) {
        var sparkConf = new SparkConf();
        var spark = SparkSession.builder()
                .config(sparkConf)
                .getOrCreate();

        String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'";
        String dbUrl = "jdbc:postgresql://<database-connection>";

        var df = spark.read()
                .format("jdbc")
                .option("url", dbUrl)
                .option("query", sql)
                .option("user", "database-user")
                .option("password", "database-password")
                .option("driver", "org.postgresql.Driver")
                .load();
    }
}
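A hedged aside: as written, the read above goes through a single JDBC connection. Spark’s JDBC source can also split the read across executors with the partitionColumn, lowerBound, upperBound and numPartitions options; they work together with the dbtable option (a table name or subquery alias) rather than with query, and the partition column and bounds below are assumptions about the schema, not taken from the real table:

// Sketch of a parallel JDBC read, continuing inside the same main method.
// "id" and the bounds are illustrative assumptions about my_table's schema.
var parallelDf = spark.read()
        .format("jdbc")
        .option("url", dbUrl)
        .option("dbtable", "(SELECT * FROM my_table mt WHERE mt.table > '2024-01-01') AS sub")
        .option("user", "database-user")
        .option("password", "database-password")
        .option("driver", "org.postgresql.Driver")
        .option("partitionColumn", "id")
        .option("lowerBound", "1")
        .option("upperBound", "1000000")
        .option("numPartitions", "8")
        .load();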
Writing Parquet files with Spark
The next step is to write the retrieved data into S3 in Parquet format. Let’s say there is an AWS S3 bucket named MyBucket, and inside it a directory archive/ which contains another directory my_table for all the archived data of the my_table table.
df.write().parquet("s3a://MyBucket/archive/my_table/");
But let’s say it might be required to migrate the data “monthly”, so inside the location MyBucket/archive/my_table/ a folder for each month is desired, containing only the data corresponding to that month. Spark provides “partition by” out of the box over one of the columns in the data; in this case the query returns a column named subMonth with the format YYYYMM, with values like 202401, 202312, 202311 … 201902, etc.
df.write()
        .partitionBy("subMonth")
        .parquet("s3a://MyBucket/archive/my_table/");
That will generate folders with the following structure:
MyBucket/
  archive/
    my_table/
      subMonth=202401/
        a_file.parquet
      subMonth=202312/
        another_file.parquet
      subMonth=202311/
        the_file.parquet
      .....
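Since the whole point of the archive is that it stays queryable, reading it back is short. A minimal sketch, assuming the same SparkSession and S3 credentials configuration as in the rest of the article; filtering on the partition column lets Spark read only the matching month’s folder:

// Reading the archived data back from S3; the partition filter prunes to one month's folder
var archived = spark.read()
        .parquet("s3a://MyBucket/archive/my_table/");
archived.filter("subMonth = 202312").show();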
Accessing S3 with Spark
In the previous step a location in an S3 bucket is indicated; however, to make it work properly it’s required to provide the credentials to access that AWS account. Fortunately, with Apache Spark that is very easy: it just takes adding the Hadoop AWS dependency to the project and a few lines of code.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.6</version>
</dependency>
implementation 'org.apache.hadoop:hadoop-aws:3.3.6'
Then, set some Hadoop settings in the Spark context. Depending on the way you have to connect, you could use different settings.
Access and Secret Key
When you have an ACCESS and a SECRET key, generated from the web console, you can use them this way.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "SECRET_KEY");
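Hardcoding the keys is shown here only for brevity. The same Hadoop properties can also be supplied from outside the code at submit time, since Spark copies every property prefixed with spark.hadoop. into the Hadoop configuration:

# Equivalent settings passed at submit time instead of being hardcoded
./bin/spark-submit \
  --conf spark.hadoop.fs.s3a.access.key=ACCESS_KEY \
  --conf spark.hadoop.fs.s3a.secret.key=SECRET_KEY \
  ... my_generated_file.jar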
Temporary token
Because ACCESS and SECRET keys are usually not shared in some companies, it’s required to get credentials through some authentication flow which generates a session token valid for some minutes; during that time you can authenticate with that temporary token. It’s required to indicate the SESSION_TOKEN and change the provider to org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "SECRET_KEY");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.session.token", "SESSION_TOKEN");
spark.sparkContext().hadoopConfiguration()
        .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
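Another option, sketched here under the assumption that your SSO tooling exports the standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN), is to read the temporary credentials from the environment instead of hardcoding them:

// Reading the temporary credentials from the standard AWS environment variables
var hadoopConf = spark.sparkContext().hadoopConfiguration();
hadoopConf.set("fs.s3a.access.key", System.getenv("AWS_ACCESS_KEY_ID"));
hadoopConf.set("fs.s3a.secret.key", System.getenv("AWS_SECRET_ACCESS_KEY"));
hadoopConf.set("fs.s3a.session.token", System.getenv("AWS_SESSION_TOKEN"));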
Instance Profile
Your task might be running on an EC2 instance in AWS; in that case it’s not required to provide credentials, because the task already runs inside the AWS account. For example, if you are running an EMR task or using Kubernetes as the Spark cluster through AWS EKS.
In that case, just indicate the credentials provider as com.amazonaws.auth.InstanceProfileCredentialsProvider.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
spark.sparkContext().hadoopConfiguration()
        .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");
In a Nutshell
Basically, the process to migrate your data from a SQL database to an S3 bucket using Parquet files can be summarized in the following lines of code. Something that could sound complex, even a nightmare, turns out to be a simple and straightforward solution.
package com.orthanc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
    public static void main(String[] args) {
        // Initializing the Spark context
        var sparkConf = new SparkConf();
        var spark = SparkSession.builder()
                .config(sparkConf)
                .getOrCreate();

        spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
        spark.sparkContext().hadoopConfiguration()
                .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");

        // Reading from the database
        String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'";
        String dbUrl = "jdbc:postgresql://<database-connection>";

        var df = spark.read()
                .format("jdbc")
                .option("url", dbUrl)
                .option("query", sql)
                .option("user", "database-user")
                .option("password", "database-password")
                .option("driver", "org.postgresql.Driver")
                .load();

        // Writing to S3 in Parquet format, partitioned by subMonth
        df.write()
                .partitionBy("subMonth")
                .parquet("s3a://MyBucket/archive/my_table/");
    }
}
And that could be submitted with a command like:
./bin/spark-submit \
  --class com.orthanc.spark.DatabaseToS3TransferTask \
  --master spark://207.184.161.138:7077 \
  --deploy-mode client \
  --conf <key>=<value> \
  ... # other options
  my_generated_file.jar
The Spark cluster will be in charge of the whole process; with Spark it’s possible to distribute the work across different nodes using executors, assigning multiple CPU cores and memory to each, making the task an efficient workload.
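How much of the cluster the task gets can be requested at submit time. A hedged example (the values are illustrative, not tuned recommendations, and --num-executors applies when the master is YARN or Kubernetes):

# Illustrative resource settings for the migration task
./bin/spark-submit \
  --class com.orthanc.spark.DatabaseToS3TransferTask \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  --driver-memory 2g \
  my_generated_file.jar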