Moviendo datos de la base de datos a AWS S3 con Apache Spark

Advertisements

Los buckets de Amazon S3 son un buen lugar para almacenar archivos y se pueden utilizar para almacenar archivos a largo plazo. Incluso, es posible almacenar su base de datos en un formato “consultable”, sin copiar todo el volcado de la base de datos. Apache Parquet es un formato de columnas que permite almacenar los datos originales, preservar el esquema, optimizaciones para acelerar las consultas y es un formato de archivo mucho más eficiente que CSV o JSON, con soporte en muchos sistemas de soporte de datos como Apache Spark.

Usemos un posible caso de uso: la base de datos crece con datos todos los días, llegando a un punto en el que es difícil escalar el tamaño de la base de datos y los datos antiguos pueden usarse ocasionalmente, pero en la práctica rara vez se usan día a día. , tal vez esté ahí solo por una política de retención. Mover los datos más antiguos a una estructura de “archivo”, que permita consultarlos cuando sea necesario, podría ser una estrategia a seguir.

En ese escenario, podríamos tener una base de datos SQL (Postgres, MySQL, Oracle, MS Sql Server, etc.), el almacenamiento de los datos archivados en un AWS S3 Bucket, datos persistentes en archivos Parquet (permiten ser consultables, distribuidos, particionados) y usando Apache Spark para migrar los datos de la base de datos. Todo eso con sólo unas pocas líneas de código.

Running a Task in Apache Spark

Apache Spark debe instalarse en algún lugar, podría hacerlo en nuestro local ejecutando la configuración independiente o en un clúster (Yarn, Mesos o Kubernetes). El uso de un clúster permitirá tener diferentes nodos ejecutando una tarea simple en modo distribuido.

Si desea obtener más información sobre el envío de Spark y cómo ejecutar una tarea, siga este artículo: https://spark.apache.org/docs/latest/submitting-applications.html

Creando una clase Task de Spark

Spark puede ejecutar un archivo jar, con todas las clases necesarias para la tarea. Entonces, creemos un nuevo proyecto Java. Es hora de crear una clase, una clase normal con un método principal.

public class DatabaseToS3TransferTask {
	public static void main(String[] args){
      
    }
}

Y agregue las bibliotecas Spark al proyecto. Spark SQL ya incluye el core de Spark como una dependencia transitiva, ayuda a interactuar con una base de datos SQL de Spark, que se utilizará para encontrar los registros a migrar.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.0</version>
    <scope>provided</scope>  <!-- Provided by the Spark installation -->
</dependency>

<dependency> <!-- Adding the JDBC driver for the used SQL engine -->
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.1</version>
</dependency>
//Provided by the Spark installation
compileOnly 'org.apache.spark:spark-sql_2.13:3.5.0'
//Adding the JDBC driver for the used SQL engine 
implementation 'org.postgresql:postgresql:42.7.1'

Leyendo la base de datos con Spark

Ahora, agreguemos algo de lógica para inicializar el contexto de Spark en el código y leer datos de la base de datos SQL.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
	public static void main(String[] args){
      var sparkConf = new SparkConf()
      var spark = SparkSession.builder
              .config(sparkConf)
              .getOrCreate();
      
      String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'";
      String dbUrl = "jdbc:postgresql://<database-connection?";
      var df = spark.read
          .format("jdbc")
          .option("url", dbUrl)
          .option("query", sql)
          .option("user", "database-user")
          .option("password", "database-password")
          .option("driver", "org.postgresql.Driver")
          .load();
      
    } 
}

Escribiendo archivos Parquet con Spark

El siguiente paso es escribir los datos encontrados en S3 en formato parquet. Digamos que hay un bucket de AWS S3 con el nombre MyBucket, y en él hay un directorio archive/ que contiene otro directorio my_table para todos los datos de archivo de la tabla my_table.

df.write.parquet("s3a://MyBucket/archive/my_table/");

Pero, digamos, podría ser necesario migrar los datos “mensualmente”, por lo que en la ubicación MyBucket/archive/my_table/ se desea una carpeta para cada mes, que contenga solo los datos correspondientes a ese mes. Spark proporciona “partición de datos” para usar con algunas de nuestras columnas en los datos; en este caso, las consultas devuelven un nombre de columna subMonth con el formato YYYYMM, con valores como 202401, 202312, 202311201902, etc.

Advertisements
df.write
	.partitionBy("subMonth")
	.parquet("s3a://MyBucket/archive/my_table/");

Eso generará carpetas con la siguiente estructura:

MyBucket/
	archive/
		my_table/
			202401/
				a_file.parquet
			202312/
				another_file.parquet
			202311/
				the_file.parquet
.....

Accediendo a S3 con Spark

En el paso anterior se indica una ubicación en un bucket S3, sin embargo para que funcione correctamente es necesario proporcionar las credenciales para acceder a esa cuenta de AWS. Con suerte, con Apache Spark eso es muy fácil y es posible agregando una dependencia de Hadoop AWS en el proyecto y agregando algunas líneas de código.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.6</version>
</dependency>
implementation 'org.apache.hadoop:hadoop-aws:3.3.6'

Luego, usando algunas configuraciones de Hadoop en Spark Context. Dependiendo de la forma que tengas de conectarte, podrás utilizar diferentes configuraciones.

Access Key y Secret Key

Cuando se tiene un ACCESS y SECRET keys, generadas desde la consola web de AWS, se pueden usar de esta manera.

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY");

Token Temporal

Debido a que las ACCESS y SECRET keys no son usualmente compartidas en muchas compañias, es necesario obtener credenciales a través de alguna autenticación que genera un token de sesión durante algunos minutos, y durante ese tiempo puede autenticarse generando un token temporal. Se requiere indicar el SESSION_TOKEN y modificar el proveedor a apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY");
spark.sparkContext.hadoopConfiguration.set("fs.s3a.session.token", "SESSION_TOKEN");
spark.sparkContext.hadoopConfiguration
            .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");

Instance Profile

Es posible que la tarea se esté ejecutando como parte de una instancia EC2 en AWS; en ese caso, no es necesario proporcionar credenciales porque ya está en la cuenta de AWS. Por ejemplo, si está ejecutando una tarea de EMR o utilizando Kubernetes como Spark Cluster a través de AWS EKS.

En este caso, solo se debe indicar el “credentials provider” como com.amazonaws.auth.InstanceProfileCredentialsProvider .

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
spark.sparkContext.hadoopConfiguration
          .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");

En resumen

Básicamente, ese proceso para migrar sus datos desde una base de datos SQL a un bucket S3 utilizando archivos Parquet podría resumirse en las siguientes líneas de código. Algo que podría parecer en principio (y utilizando otras tecnologías) complejo y una pesadilla, es realmente una solución simple y directa.

package com.orthanc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class DatabaseToS3TransferTask {
	public static void main(String[] args){
      
      //Initializing Spark Context
      var sparkConf = new SparkConf()
      var spark = SparkSession.builder
              .config(sparkConf)
              .getOrCreate();
      
      spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com");
      spark.sparkContext.hadoopConfiguration
          .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");
      
      //Reading from database
      String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'";
      String dbUrl = "jdbc:postgresql://<database-connection?";
      var df = spark.read
          .format("jdbc")
          .option("url", dbUrl)
          .option("query", sql)
          .option("user", "database-user")
          .option("password", "database-password")
          .option("driver", "org.postgresql.Driver")
          .load();
      
      //Writing on S3 partitioning by Submonth in Parquet format
      df.write
		.partitionBy("subMonth")
		.parquet("s3a://MyBucket/archive/my_table/");
    } 
}

Y eso podría enviarse con un comando como:

./bin/spark-submit 
  --class com.orthanc.spark.DatabaseToS3TransferTask 
  --master spark://207.184.161.138:7077 
  --deploy-mode client 
  --conf <key>=<value> 
  ... # other options
  my_generated_file.jar

El cluster de Spark se encargará de hacer todo el proceso, usando Spark es posible distribuirlo en diferentes nodos usando ejecutores, asignar múltiples núcleos de CPU, memoria. Hacer de esa tarea una carga de trabajo eficiente.

Referencias

Advertisements

Leave a Reply

Your email address will not be published. Required fields are marked *