Reading JSON from AWS S3 and Writing to Amazon RDS for PostgreSQL

0
12

AWS Prerequisites

  1. AWS Setup:
    • Amazon S3: Ensure the JSON file is uploaded to an S3 bucket.
    • Amazon RDS: Set up an Amazon RDS instance with PostgreSQL. Note the endpoint, database name, username, and password.
    • IAM Role: Attach the appropriate IAM role to the instance or AWS credentials to access the S3 bucket.
  2. PySpark Setup:
    • Install the AWS SDK for Python (boto3), and make sure you have the PostgreSQL JDBC driver (postgresql-<version>.jar).
  3. JSON File: Assume the JSON file is stored in S3:
    • Bucket: my-bucket
    • Key: data/data.json

Code Implementation

from pyspark.sql import SparkSession

# Step 1: Initialize SparkSession
spark = SparkSession.builder \
    .appName("S3 to RDS - JSON to PostgreSQL") \
    .config("spark.jars", "/path/to/postgresql-<version>.jar") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

# Step 2: Read JSON File from S3
s3_bucket = "s3a://my-bucket/data/data.json"  # Use 's3a' for Spark with S3
df = spark.read.json(s3_bucket)

# Step 3: Inspect the DataFrame
df.printSchema()
df.show()

# Step 4: RDS PostgreSQL Connection Properties
rds_url = "jdbc:postgresql://<rds-endpoint>:5432/<database>"  # Replace with your RDS endpoint and database
rds_properties = {
    "user": "<your_username>",       # Replace with your RDS username
    "password": "<your_password>",   # Replace with your RDS password
    "driver": "org.postgresql.Driver"
}

# Step 5: Write DataFrame to PostgreSQL Table on RDS
table_name = "public.user_data"
df.write.jdbc(url=rds_url, table=table_name, mode="overwrite", properties=rds_properties)

print("Data written successfully to Amazon RDS PostgreSQL!")

Detailed Changes for AWS

  1. Read JSON from S3:
    • Use the S3 URI format s3a://<bucket>/<key>. Ensure you configure the Spark session to use the S3A connector with AWS credentials.
  2. AWS Authentication:
    • Use the DefaultAWSCredentialsProviderChain for credentials. This can retrieve credentials from the following sources:
      • IAM roles on an EC2 instance or EMR cluster.
      • Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
      • AWS configuration files (~/.aws/credentials).
  3. Amazon RDS:
    • Update rds_url to include your RDS instance’s endpoint, database name, and port (default for PostgreSQL is 5432).
    • The security group associated with the RDS instance should allow inbound traffic from your Spark cluster or instance.
  4. JDBC Driver:
    • Provide the path to the PostgreSQL JDBC driver (postgresql-<version>.jar).

IAM Role Configuration

If running this on an EMR cluster or an EC2 instance, ensure the instance has an IAM role with the following permissions:

  • s3:GetObject for accessing the JSON file in S3.
  • Security group configuration to allow the instance to connect to the RDS instance.

Command to Execute the Script

If saved as s3_to_rds.py, execute it using:

spark-submit --jars /path/to/postgresql-<version>.jar s3_to_rds.py

Notes

  1. S3 Configuration:
    • Ensure the JSON file’s bucket and key are correct.
    • Set up appropriate permissions for S3 access.
  2. Amazon RDS Configuration:
    • Allow inbound traffic from your Spark cluster by adding the Spark instance’s security group to the RDS instance’s security group.
  3. Performance Optimization:
    • Use .repartition() or .coalesce() if needed to optimize the number of partitions when writing to PostgreSQL.

Running this workflow from AWS Glue involves leveraging Glue’s integration with S3, PySpark, and Amazon RDS. Glue simplifies the process by handling infrastructure and allowing you to focus on the ETL logic. Below is the step-by-step guide.


Steps to Read JSON from S3 and Write to RDS in AWS Glue

1. AWS Glue Setup

  1. Create an AWS Glue Job:
    • Go to the AWS Glue Console.
    • Create a new Glue job and choose the type as “Spark” with a Python shell (Glue supports PySpark).
    • Configure the IAM role with appropriate permissions for S3, RDS, and Glue.
  2. IAM Role Permissions: Ensure the IAM role assigned to Glue has:
    • s3:GetObject permission for reading from S3.
    • Access to the Amazon RDS instance (via security groups).
    • Glue permissions for logging and job execution.
  3. RDS Security Group:
    • Ensure the RDS instance’s security group allows inbound connections from Glue workers. Add Glue’s public IP range or private VPC as necessary.
  4. JDBC Connection:
    • Go to the Glue Console and create a “JDBC Connection” for the RDS instance. Provide the following details:
      • Connection type: PostgreSQL.
      • JDBC URL: jdbc:postgresql://<RDS-Endpoint>:5432/<database>.
      • Username and password for the RDS instance.
    • Test the connection to ensure it’s reachable.
  5. Upload the PostgreSQL JDBC Driver:
    • Download the PostgreSQL JDBC driver (e.g., postgresql-<version>.jar).
    • Upload it to an S3 bucket.
    • Add the path to the JAR file in the Glue job’s “Dependent JARs path”.

2. JSON File in S3

Ensure the JSON file is stored in S3:

  • Bucket: my-bucket
  • Key: data/data.json

3. Glue Job Script

Below is the Python script (PySpark) for the Glue job:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize GlueContext and Spark
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# S3 Input Path
s3_input_path = "s3://my-bucket/data/data.json"  # Update with your S3 bucket and file path

# Step 1: Read JSON from S3 into a DataFrame
df = spark.read.json(s3_input_path)

# Step 2: Inspect the DataFrame
df.printSchema()
df.show()

# Step 3: Write DataFrame to Amazon RDS (PostgreSQL)
rds_jdbc_url = "jdbc:postgresql://<rds-endpoint>:5432/<database>"  # Update with your RDS endpoint and database
rds_properties = {
    "user": "<your_username>",       # Replace with your RDS username
    "password": "<your_password>",   # Replace with your RDS password
    "driver": "org.postgresql.Driver"
}

# Write the data to RDS table
table_name = "public.user_data"
df.write.jdbc(
    url=rds_jdbc_url,
    table=table_name,
    mode="overwrite",  # Use "append" if you want to add to existing data
    properties=rds_properties
)

# Finalize job
job.commit()

4. Configure the Glue Job

  1. Script Location:
    • Upload the script to an S3 bucket, and provide the S3 path in the Glue job configuration.
  2. Dependent JARs Path:
    • Add the S3 path to the PostgreSQL JDBC driver in the “Python library path / JAR files” field.
  3. Arguments:
    • Add the following arguments to the Glue job: --conf spark.executor.extraClassPath=/path/to/postgresql-<version>.jar --conf spark.driver.extraClassPath=/path/to/postgresql-<version>.jar
  4. Worker Type and Count:
    • Choose worker types and counts based on your data size and processing needs. A common configuration is G.1X with 2–5 workers for small-to-medium datasets.
  5. Timeout:
    • Set an appropriate timeout for the Glue job.

5. Execute the Glue Job

  • Start the job from the Glue console or trigger it using AWS Glue workflows or event-based triggers.

AWS-Specific Considerations

  1. RDS Networking:
    • Ensure the Glue job can access RDS via a public/private subnet in the same VPC.
    • If using private subnets, set up an AWS Glue Connection to use the VPC for accessing RDS.
  2. Data Volume:
    • For large datasets, partition the data in S3 and use .repartition() in Spark for optimized writing to RDS.
  3. Glue Data Catalog:
    • Optionally, you can catalog the JSON file as a table in Glue and use glueContext.create_dynamic_frame.from_catalog() instead of spark.read.json().
  4. Monitoring:
    • Use CloudWatch Logs to monitor job progress and troubleshoot issues.

The schema of the target table in PostgreSQL can be determined in one of the following ways:


1. Schema Inference from the DataFrame

  • PySpark automatically infers the schema of the JSON file when reading it into a DataFrame using spark.read.json().

Example:

Given the following JSON file:

[
    {"id": 1, "name": "Alice", "age": 25, "city": "New York"},
    {"id": 2, "name": "Bob", "age": 30, "city": "Los Angeles"}
]

PySpark will infer the schema as:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)

When using the write.jdbc() method, PySpark will map the inferred schema to the target table in PostgreSQL.


2. Existing PostgreSQL Table Schema

If the table already exists in PostgreSQL, PySpark will try to match the DataFrame’s schema to the target table’s schema.

  • Column Matching: The DataFrame’s column names and data types must align with the target table’s schema.
  • If there’s a mismatch (e.g., missing columns, extra columns, or incompatible data types), the job will fail unless the table is created beforehand or adjusted.

3. Automatically Creating the Table

If the table does not exist in PostgreSQL and the write.jdbc() method is used, Spark will attempt to create the table using the schema of the DataFrame.

  • This requires the user provided in the JDBC connection to have the necessary permissions to create tables in PostgreSQL.
  • The created table will have columns based on the DataFrame schema, and PostgreSQL will map Spark data types to equivalent PostgreSQL types. For example:
    • stringVARCHAR
    • longBIGINT
    • doubleDOUBLE PRECISION

Example of Table Creation

For the schema above, PySpark will create the following table in PostgreSQL:

CREATE TABLE user_data (
    id BIGINT,
    name VARCHAR,
    age BIGINT,
    city VARCHAR
);

4. Providing the Schema Explicitly

You can explicitly define the schema for the DataFrame before writing it to PostgreSQL. This is useful if:

  • The JSON file does not contain all the columns in the target table.
  • You want to enforce strict typing for the DataFrame schema.

Example of Explicit Schema Definition:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

df = spark.read.schema(schema).json("s3://my-bucket/data/data.json")

5. Handling Schema Mismatches

To ensure the schema matches the PostgreSQL table:

  1. Ensure the DataFrame columns align with the table schema:
    • Use select() to reorder columns or drop extra columns: df = df.select("id", "name", "age", "city")
  2. Cast Data Types Explicitly:
    • Use cast() to adjust data types if needed: from pyspark.sql.functions import col df = df.withColumn("id", col("id").cast("int"))

6. Schema Validation

Before writing to the table, validate the DataFrame schema against the target table schema using PostgreSQL queries.

Example of Query to Check Table Schema in PostgreSQL:

SELECT column_name, data_type 
FROM information_schema.columns 
WHERE table_name = 'user_data';

Compare the result with the DataFrame schema using df.printSchema().


Summary of Steps

  1. Ensure the JSON file structure aligns with the PostgreSQL table schema.
  2. If the table doesn’t exist:
    • Let Spark create the table automatically, or
    • Create it manually in PostgreSQL using CREATE TABLE.
  3. Validate schemas by inspecting the DataFrame (df.printSchema()) and the table structure.
  4. Explicitly define or adjust the schema in PySpark, if necessary.