Moving Data from a Database to AWS S3 with Apache Spark

Amazon S3 buckets are a good place for storing files over the long term. It’s even possible to store your database in a “queryable” format without copying the whole dump of the database. Apache Parquet is a columnar format that preserves your original data and its schema, includes optimizations to speed up queries, and is far more efficient than CSV or JSON, with support in many data processing systems such as Apache Spark.

Let’s look at a possible use case: the database grows with data every day, reaching a point where it is hard to keep scaling its size, and the old data, which might be needed occasionally, is rarely touched day to day; maybe it’s only there for a retention policy. Moving the oldest data to an “archive” structure that can still be queried when required might be a strategy to follow.

In that scenario, we could have a SQL database (Postgres, MySQL, Oracle, MS SQL Server, etc.), an AWS S3 bucket as the storage for the archived data, persisting it as Parquet files (queryable, distributed, partitioned), and Apache Spark to migrate the data out of the database. All of that with just a few lines of code.

Running a Task in Apache Spark

Apache Spark needs to be installed somewhere: it could be on our local machine using the standalone setup, or on a cluster (YARN, Mesos or Kubernetes). Using a cluster allows a single task to run distributed across different nodes.

If you want to know more about spark-submit and how to run a task, follow this article: https://spark.apache.org/docs/latest/submitting-applications.html

Creating the Spark Task class

Spark can run a jar file with all the classes required for the task. So, let’s create a new Java project and add a regular class with a main method.

public class DatabaseToS3TransferTask {
    public static void main(String[] args) {

    }
}

And add the Spark libraries to the project. Spark SQL already includes Spark Core as a transitive dependency, and it provides the APIs to interact with a SQL database from Spark, which is what we are going to use to find the records to migrate.

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.0</version>
    <scope>provided</scope>  <!-- Provided by the Spark installation -->
</dependency>

<dependency> <!-- Adding the JDBC driver for the used SQL engine -->
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.1</version>
</dependency>

Or with Gradle:

//Provided by the Spark installation
compileOnly 'org.apache.spark:spark-sql_2.13:3.5.0'
//Adding the JDBC driver for the used SQL engine
implementation 'org.postgresql:postgresql:42.7.1'

Reading from the SQL Database in Spark

Now, let’s add some logic to initialize the context of Spark in the code, and read data from the SQL database.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
	public static void main(String[] args){
      var sparkConf = new SparkConf();
      var spark = SparkSession.builder()
              .config(sparkConf)
              .getOrCreate();

      // created_at is a placeholder for the date column used to pick the records to archive
      String sql = "SELECT * FROM my_table mt WHERE mt.created_at > '2024-01-01'";
      String dbUrl = "jdbc:postgresql://<database-connection>";
      var df = spark.read()
          .format("jdbc")
          .option("url", dbUrl)
          .option("query", sql)
          .option("user", "database-user")
          .option("password", "database-password")
          .option("driver", "org.postgresql.Driver")
          .load();
      
    } 
}
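
A note on scale: the read above goes through a single JDBC connection. If the range to archive is large, Spark’s JDBC source also accepts the partitionColumn, lowerBound, upperBound and numPartitions options to split the read into several parallel queries. A minimal sketch, continuing from the example above and assuming my_table has a numeric id column (the bounds are made-up values):

// Parallel JDBC read: Spark issues numPartitions queries, each covering a
// slice of the id range between lowerBound and upperBound.
// Note: partitioned reads use "dbtable" instead of the "query" option.
var parallelDf = spark.read()
    .format("jdbc")
    .option("url", dbUrl)
    .option("dbtable", "my_table")
    .option("user", "database-user")
    .option("password", "database-password")
    .option("driver", "org.postgresql.Driver")
    .option("partitionColumn", "id")   // assumed numeric primary key
    .option("lowerBound", "1")
    .option("upperBound", "1000000")   // made-up bounds for illustration
    .option("numPartitions", "8")
    .load();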

Writing Parquet files with Spark

The next step is to write the retrieved data into S3 in Parquet format. Let’s say there is an AWS S3 bucket named MyBucket, and inside it a directory archive/ which contains another directory my_table for all the archived data of the my_table table.

df.write().parquet("s3a://MyBucket/archive/my_table/");
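
Since an archiving job like this will likely run more than once, it helps to be explicit about what happens when the target path already exists. A small sketch using Spark’s SaveMode; Append is an assumption that fits an incremental archive, while the default behaviour (ErrorIfExists) would fail on the second run:

import org.apache.spark.sql.SaveMode;

// Keep previously archived files and add the new batch on each run.
df.write()
    .mode(SaveMode.Append)
    .parquet("s3a://MyBucket/archive/my_table/");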

But let’s say it might be required to migrate the data monthly, so inside the location MyBucket/archive/my_table/ a folder per month is desired, containing only the data corresponding to that month. Spark provides “partition by” out of the box for any of the columns in the data; in this case the query returns a column named subMonth with the format YYYYMM, with values like 202401, 202312, 202311, 201902, etc.

df.write()
	.partitionBy("subMonth")
	.parquet("s3a://MyBucket/archive/my_table/");

That will generate folders with the following structure:

MyBucket/
	archive/
		my_table/
			subMonth=202401/
				a_file.parquet
			subMonth=202312/
				another_file.parquet
			subMonth=202311/
				the_file.parquet
.....
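
A nice consequence of this layout is partition pruning: when the archive is read back and filtered on the partition column, Spark only lists and reads the matching folders. A quick sketch of querying the archive, assuming the same bucket and column names as above:

// Spark discovers the subMonth partition column from the folder names.
var archived = spark.read()
    .parquet("s3a://MyBucket/archive/my_table/");

// This filter only touches the subMonth=202401 folder.
archived.where("subMonth = 202401").show();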

Accessing S3 with Spark

In the previous step a location in an S3 bucket is indicated; however, to make it work properly it’s required to provide the credentials to access that AWS account. Fortunately, with Apache Spark that is very easy: just add the Hadoop AWS dependency to the project and a few lines of code.

Maven:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.6</version>
</dependency>

Or with Gradle:

implementation 'org.apache.hadoop:hadoop-aws:3.3.6'

Then, set some Hadoop properties on the Spark context. Depending on how you connect to AWS, you can use different settings.

Access and Secret Key

When you have an ACCESS and a SECRET key, generated from the web console, you can use them this way.

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY");

Temporary token

Because ACCESS and SECRET keys are usually not shared in some companies, it’s required to obtain credentials through an authentication flow that generates a session token valid for a limited time; during that window you can authenticate with those temporary credentials. It’s required to indicate the SESSION_TOKEN and change the provider to org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider.

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.session.token", "SESSION_TOKEN");
spark.sparkContext.hadoopConfiguration
            .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");

Instance Profile

Your task might be running on an EC2 instance in AWS; in that case it’s not required to provide credentials explicitly because they come from the IAM role attached to the instance, for example when running an EMR task or using Kubernetes as the Spark cluster through AWS EKS.

In that case, just indicate the credentials provider as com.amazonaws.auth.InstanceProfileCredentialsProvider.

spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
spark.sparkContext().hadoopConfiguration()
        .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");

In a Nutshell

Basically, the process to migrate your data from a SQL database to an S3 bucket using Parquet files can be summarized in the following lines of code. Something that could sound complex, even like a nightmare, turns out to be a simple and straightforward solution.

package com.orthanc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
	public static void main(String[] args){
      
      //Initializing the Spark context
      var sparkConf = new SparkConf();
      var spark = SparkSession.builder()
              .config(sparkConf)
              .getOrCreate();

      //Configuring S3 access through the instance profile
      spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
      spark.sparkContext().hadoopConfiguration()
          .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");

      //Reading from the database (created_at is a placeholder for the date column)
      String sql = "SELECT * FROM my_table mt WHERE mt.created_at > '2024-01-01'";
      String dbUrl = "jdbc:postgresql://<database-connection>";
      var df = spark.read()
          .format("jdbc")
          .option("url", dbUrl)
          .option("query", sql)
          .option("user", "database-user")
          .option("password", "database-password")
          .option("driver", "org.postgresql.Driver")
          .load();

      //Writing to S3 in Parquet format, partitioned by subMonth
      df.write()
          .partitionBy("subMonth")
          .parquet("s3a://MyBucket/archive/my_table/");
    } 
}

And that could be submitted with a command like:

./bin/spark-submit \
  --class com.orthanc.spark.DatabaseToS3TransferTask \
  --master spark://207.184.161.138:7077 \
  --deploy-mode client \
  --conf <key>=<value> \
  ... # other options
  my_generated_file.jar

The Spark cluster will be in charge of the whole process; using Spark it’s possible to distribute the work across different nodes using executors, and to assign multiple CPU cores and memory, making the task an efficient workload.
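
How much parallelism and memory the job gets is also a matter of configuration. The following is only an illustrative sizing, set on the SparkConf (the same keys can be passed as --conf flags to spark-submit); the right numbers depend entirely on your cluster and data volume:

// Illustrative resource settings; tune to your cluster and data volume.
var tunedConf = new SparkConf()
    .set("spark.executor.instances", "4")   // number of executors (YARN/Kubernetes)
    .set("spark.executor.cores", "2")       // CPU cores per executor
    .set("spark.executor.memory", "4g");    // memory per executor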
