Los buckets de Amazon S3 son un buen lugar para almacenar archivos y se pueden utilizar para almacenar archivos a largo plazo. Incluso, es posible almacenar su base de datos en un formato “consultable”, sin copiar todo el volcado de la base de datos. Apache Parquet es un formato de columnas que permite almacenar los datos originales, preservar el esquema, optimizaciones para acelerar las consultas y es un formato de archivo mucho más eficiente que CSV o JSON, con soporte en muchos sistemas de soporte de datos como Apache Spark.
Usemos un posible caso de uso: la base de datos crece con datos todos los días, llegando a un punto en el que es difícil escalar el tamaño de la base de datos y los datos antiguos pueden usarse ocasionalmente, pero en la práctica rara vez se usan día a día. , tal vez esté ahí solo por una política de retención. Mover los datos más antiguos a una estructura de “archivo”, que permita consultarlos cuando sea necesario, podría ser una estrategia a seguir.
En ese escenario, podríamos tener una base de datos SQL (Postgres, MySQL, Oracle, MS Sql Server, etc.), el almacenamiento de los datos archivados en un AWS S3 Bucket, datos persistentes en archivos Parquet (permiten ser consultables, distribuidos, particionados) y usando Apache Spark para migrar los datos de la base de datos. Todo eso con sólo unas pocas líneas de código.
Running a Task in Apache Spark
Apache Spark debe instalarse en algún lugar, podría hacerlo en nuestro local ejecutando la configuración independiente o en un clúster (Yarn, Mesos o Kubernetes). El uso de un clúster permitirá tener diferentes nodos ejecutando una tarea simple en modo distribuido.
Si desea obtener más información sobre el envío de Spark y cómo ejecutar una tarea, siga este artículo: https://spark.apache.org/docs/latest/submitting-applications.html
Creando una clase Task de Spark
Spark puede ejecutar un archivo jar
, con todas las clases necesarias para la tarea. Entonces, creemos un nuevo proyecto Java. Es hora de crear una clase, una clase normal con un método principal.
public class DatabaseToS3TransferTask { public static void main(String[] args){ } }
Y agregue las bibliotecas Spark al proyecto. Spark SQL ya incluye el core de Spark como una dependencia transitiva, ayuda a interactuar con una base de datos SQL de Spark, que se utilizará para encontrar los registros a migrar.
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.13</artifactId> <version>3.5.0</version> <scope>provided</scope> <!-- Provided by the Spark installation --> </dependency> <dependency> <!-- Adding the JDBC driver for the used SQL engine --> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.7.1</version> </dependency>
//Provided by the Spark installation compileOnly 'org.apache.spark:spark-sql_2.13:3.5.0' //Adding the JDBC driver for the used SQL engine implementation 'org.postgresql:postgresql:42.7.1'
Leyendo la base de datos con Spark
Ahora, agreguemos algo de lógica para inicializar el contexto de Spark en el código y leer datos de la base de datos SQL.
import org.apache.spark.SparkConf; import org.apache.spark.sql.*; public class DatabaseToS3TransferTask { public static void main(String[] args){ var sparkConf = new SparkConf() var spark = SparkSession.builder .config(sparkConf) .getOrCreate(); String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'"; String dbUrl = "jdbc:postgresql://<database-connection?"; var df = spark.read .format("jdbc") .option("url", dbUrl) .option("query", sql) .option("user", "database-user") .option("password", "database-password") .option("driver", "org.postgresql.Driver") .load(); } }
Escribiendo archivos Parquet con Spark
El siguiente paso es escribir los datos encontrados en S3 en formato parquet. Digamos que hay un bucket de AWS S3 con el nombre MyBucket
, y en él hay un directorio archive/
que contiene otro directorio my_table
para todos los datos de archivo de la tabla my_table
.
df.write.parquet("s3a://MyBucket/archive/my_table/");
Pero, digamos, podría ser necesario migrar los datos “mensualmente”, por lo que en la ubicación MyBucket/archive/my_table/
se desea una carpeta para cada mes, que contenga solo los datos correspondientes a ese mes. Spark proporciona “partición de datos” para usar con algunas de nuestras columnas en los datos; en este caso, las consultas devuelven un nombre de columna subMonth
con el formato YYYYMM
, con valores como 202401
, 202312
, 202311
… 201902
, etc.
df.write .partitionBy("subMonth") .parquet("s3a://MyBucket/archive/my_table/");
Eso generará carpetas con la siguiente estructura:
MyBucket/ archive/ my_table/ 202401/ a_file.parquet 202312/ another_file.parquet 202311/ the_file.parquet .....
Accediendo a S3 con Spark
En el paso anterior se indica una ubicación en un bucket S3, sin embargo para que funcione correctamente es necesario proporcionar las credenciales para acceder a esa cuenta de AWS. Con suerte, con Apache Spark eso es muy fácil y es posible agregando una dependencia de Hadoop AWS en el proyecto y agregando algunas líneas de código.
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aws</artifactId> <version>3.3.6</version> </dependency>
implementation 'org.apache.hadoop:hadoop-aws:3.3.6'
Luego, usando algunas configuraciones de Hadoop en Spark Context. Dependiendo de la forma que tengas de conectarte, podrás utilizar diferentes configuraciones.
Access Key y Secret Key
Cuando se tiene un ACCESS
y SECRET
keys, generadas desde la consola web de AWS, se pueden usar de esta manera.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com") spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY"); spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY");
Token Temporal
Debido a que las ACCESS
y SECRET
keys no son usualmente compartidas en muchas compañias, es necesario obtener credenciales a través de alguna autenticación que genera un token de sesión durante algunos minutos, y durante ese tiempo puede autenticarse generando un token temporal. Se requiere indicar el SESSION_TOKEN
y modificar el proveedor a apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com") spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "ACCESS_KEY"); spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "SECRET_KEY"); spark.sparkContext.hadoopConfiguration.set("fs.s3a.session.token", "SESSION_TOKEN"); spark.sparkContext.hadoopConfiguration .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
Instance Profile
Es posible que la tarea se esté ejecutando como parte de una instancia EC2 en AWS; en ese caso, no es necesario proporcionar credenciales porque ya está en la cuenta de AWS. Por ejemplo, si está ejecutando una tarea de EMR o utilizando Kubernetes como Spark Cluster a través de AWS EKS.
En este caso, solo se debe indicar el “credentials provider” como com.amazonaws.auth.InstanceProfileCredentialsProvider
.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com") spark.sparkContext.hadoopConfiguration .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider");
En resumen
Básicamente, ese proceso para migrar sus datos desde una base de datos SQL a un bucket S3 utilizando archivos Parquet podría resumirse en las siguientes líneas de código. Algo que podría parecer en principio (y utilizando otras tecnologías) complejo y una pesadilla, es realmente una solución simple y directa.
package com.orthanc.spark; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; public class DatabaseToS3TransferTask { public static void main(String[] args){ //Initializing Spark Context var sparkConf = new SparkConf() var spark = SparkSession.builder .config(sparkConf) .getOrCreate(); spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com"); spark.sparkContext.hadoopConfiguration .set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider"); //Reading from database String sql = "SELECT * from my_table mt WHERE mt.table > '2024-01-01'"; String dbUrl = "jdbc:postgresql://<database-connection?"; var df = spark.read .format("jdbc") .option("url", dbUrl) .option("query", sql) .option("user", "database-user") .option("password", "database-password") .option("driver", "org.postgresql.Driver") .load(); //Writing on S3 partitioning by Submonth in Parquet format df.write .partitionBy("subMonth") .parquet("s3a://MyBucket/archive/my_table/"); } }
Y eso podría enviarse con un comando como:
./bin/spark-submit --class com.orthanc.spark.DatabaseToS3TransferTask --master spark://207.184.161.138:7077 --deploy-mode client --conf <key>=<value> ... # other options my_generated_file.jar
El cluster de Spark se encargará de hacer todo el proceso, usando Spark es posible distribuirlo en diferentes nodos usando ejecutores, asignar múltiples núcleos de CPU, memoria. Hacer de esa tarea una carga de trabajo eficiente.
Referencias
- https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
- https://sparkbyexamples.com/spark/spark-read-write-parquet-file-from-amazon-s3/
- https://www.javacodegeeks.com/2024/01/apache-spark-unleashing-big-data-power.html
- https://stackoverflow.com/questions/54223242/aws-access-s3-from-spark-using-iam-role