Green Juices

Migrating On-Premise Hadoop into GCP BigQuery


Data processing has become an essential part of modern applications and businesses, as organizations are continually trying to find ways to derive valuable insights from their data. One such requirement is the ability to migrate data from on-premise storage solutions, such as Hadoop, to cloud-based data warehouses like Google BigQuery. This not only improves the scalability and availability of data, but also allows businesses to leverage cutting-edge cloud technologies for advanced analytics, machine learning, and other data-driven tasks.


In this blog post, we will walk you through an end-to-end pipeline for migrating data from an on-premise Hadoop cluster to Google BigQuery using Google Cloud Storage (GCS), Google Dataflow, and Apache Airflow. The pipeline will perform data cleansing, validation, deduplication, and standardization tasks using PySpark. Additionally, we will incorporate data quality checks to ensure the data adheres to certain quality criteria before loading it into BigQuery.


Our pipeline will consist of the following steps:

  1. Set up a Google Cloud Storage (GCS) bucket for temporarily storing Hadoop data.

  2. Export Hadoop data to GCS using DistCp or Google Cloud Storage Connector for Hadoop.

  3. Create a Dataflow pipeline using the Apache Beam Python SDK and PySpark to read data from GCS, apply data transformations, and run quality checks.

  4. Load the transformed data into BigQuery.

  5. Orchestrate and schedule the pipeline using Cloud Composer and Apache Airflow.

By the end of this post, you will have a clear understanding of how to build a fully automated data migration pipeline that not only handles data transformations but also ensures data quality.


Here's an example of the directory structure for this pipeline:


my_data_pipeline/
|-- dags/
|   |-- hadoop_to_bigquery.py
|-- dataflow/
|   |-- dataflow_script.py
|-- scripts/
|   |-- export_hadoop_to_gcs.sh
|-- README.md
  • my_data_pipeline/ is the root directory of your project.

  • dags/ contains your Airflow DAGs, including hadoop_to_bigquery.py which defines the data pipeline DAG.

  • dataflow/ contains your Dataflow pipeline code (dataflow_script.py), which performs data cleansing, validation, deduplication, and standardization tasks.

  • scripts/ contains any helper scripts, such as export_hadoop_to_gcs.sh, which you may use to export Hadoop data to GCS if you are not using the Google Cloud Storage Connector for Hadoop.

  • README.md contains any relevant documentation about your project.


The export_hadoop_to_gcs.sh script will contain the commands required to transfer data from your Hadoop cluster to the Google Cloud Storage (GCS) bucket using either Hadoop DistCp or Google Cloud Storage Connector for Hadoop.


Here's an example of the script for both options:


1. Using Hadoop DistCp:

#!/bin/bash

# Set your Hadoop and GCS paths
HADOOP_SRC_PATH="hdfs://your_hadoop_cluster:8020/path/to/your/hadoop/data"
GCS_DEST_PATH="gs://your-gcs-bucket/path/to/your/gcs/data"

# Set the location of your Hadoop installation
HADOOP_HOME="/path/to/your/hadoop"

# Run the DistCp command
${HADOOP_HOME}/bin/hadoop distcp \
    -D fs.gs.project.id=your_project_id \
    -D fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
    -D google.cloud.auth.service.account.enable=true \
    -D google.cloud.auth.service.account.json.keyfile="/path/to/your/keyfile.json" \
    ${HADOOP_SRC_PATH} ${GCS_DEST_PATH}

Replace your_hadoop_cluster, your-gcs-bucket, your_project_id, and the paths with appropriate values.


2. Using Google Cloud Storage Connector for Hadoop:

#!/bin/bash

# Set your Hadoop and GCS paths
HADOOP_SRC_PATH="hdfs://your_hadoop_cluster:8020/path/to/your/hadoop/data"
GCS_DEST_PATH="gs://your-gcs-bucket/path/to/your/gcs/data"

# Set the location of your Hadoop installation
HADOOP_HOME="/path/to/your/hadoop"

# Run the Hadoop command
${HADOOP_HOME}/bin/hadoop fs \
    -D fs.gs.project.id=your_project_id \
    -D fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
    -D google.cloud.auth.service.account.enable=true \
    -D google.cloud.auth.service.account.json.keyfile="/path/to/your/keyfile.json" \
    -cp ${HADOOP_SRC_PATH} ${GCS_DEST_PATH}

Replace your_hadoop_cluster, your-gcs-bucket, your_project_id, and the paths with appropriate values.
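
If you would rather drive the copy from Python than from a shell script, the same command line can be assembled as an argument list. The sketch below is only an illustration: `build_copy_command` and `run_copy` are hypothetical helper names, the example paths are placeholders, and the `-D` properties are simply the ones used in the two scripts above. By default it only prints the command instead of running it.

```python
import shlex
import subprocess


def build_copy_command(hadoop_home, src, dest, project_id, keyfile, use_distcp=True):
    """Return the hadoop command from the scripts above as an argument list.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    # The -D properties are placed before the sub-command's own arguments.
    properties = [
        "-D", f"fs.gs.project.id={project_id}",
        "-D", "fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "-D", "google.cloud.auth.service.account.enable=true",
        "-D", f"google.cloud.auth.service.account.json.keyfile={keyfile}",
    ]
    hadoop = f"{hadoop_home}/bin/hadoop"
    if use_distcp:
        return [hadoop, "distcp", *properties, src, dest]
    return [hadoop, "fs", *properties, "-cp", src, dest]


def run_copy(command, dry_run=True):
    """Print the command, and execute it only when dry_run is False."""
    print(shlex.join(command))
    if not dry_run:
        # check=True raises CalledProcessError if hadoop exits with a non-zero status.
        subprocess.run(command, check=True)


if __name__ == "__main__":
    cmd = build_copy_command(
        hadoop_home="/path/to/your/hadoop",
        src="hdfs://your_hadoop_cluster:8020/path/to/your/hadoop/data",
        dest="gs://your-gcs-bucket/path/to/your/gcs/data",
        project_id="your_project_id",
        keyfile="/path/to/your/keyfile.json",
    )
    run_copy(cmd)  # dry run: prints the command without executing it
```

Passing the command as a list avoids shell quoting problems when a path contains spaces, and switching between the two options becomes a single `use_distcp` flag.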


After writing the script, make sure it is executable by running chmod +x export_hadoop_to_gcs.sh. Update the bash_command in your Airflow DAG BashOperator to reference the location of your export_hadoop_to_gcs.sh script:


export_hadoop_to_gcs = BashOperator(
    task_id='export_hadoop_to_gcs',
    # The trailing space stops Airflow from treating the .sh path as a Jinja template file.
    bash_command='/path/to/your/export_hadoop_to_gcs.sh ',
    dag=dag,
)

Replace /path/to/your with the appropriate path to the script.



Both Hadoop DistCp and the Google Cloud Storage Connector for Hadoop are viable options for migrating data from Hadoop to GCS, but they serve slightly different purposes and have some advantages and disadvantages. Your choice will depend on your specific use case and requirements.


Hadoop DistCp:


Advantages:

  • `DistCp` (short for Distributed Copy) is a native Hadoop tool specifically designed for large-scale data transfers and is highly optimized for parallelism and fault tolerance.

  • It can handle very large datasets and efficiently distribute copy tasks across your Hadoop cluster.

  • `DistCp` supports a variety of file systems, including HDFS, S3, and GCS, making it more versatile for copying data between different storage systems.

Disadvantages:

  • `DistCp` is a standalone tool and requires manual configuration for the source and destination file systems, such as setting up the GCS connector for Hadoop.

  • If you use `DistCp` frequently, you may need to monitor and manage its resource usage to prevent it from impacting other Hadoop cluster workloads.

Google Cloud Storage Connector for Hadoop:


Advantages:

  • The GCS connector allows you to use GCS as if it were another Hadoop-compatible file system, making it easier to integrate with existing Hadoop workflows and tools.

  • Once configured, you can use Hadoop's standard fs commands (e.g., hadoop fs -cp) to copy data between Hadoop and GCS, simplifying the migration process.

  • The GCS connector provides better integration with other Google Cloud services, such as Dataflow and BigQuery.

Disadvantages:

  • Copying with hadoop fs -cp runs in a single client process rather than as a distributed job, so it is not as well suited as DistCp to very large datasets.

  • The connector must be installed and configured within your Hadoop environment. DistCp needs the same connector to write to gs:// paths, so this setup cost applies to both options.


Below is the dataflow_script.py file with data quality checks using PySpark functions. We'll add a new transformation step to calculate some data quality metrics (e.g., minimum, maximum, and average age) and then filter the dataset based on those metrics.


import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class JsonToDict(beam.DoFn):
    """Parse one JSON string into a Python dictionary."""

    def process(self, element):
        return [json.loads(element)]


def process_data(json_lines):
    """Clean, validate, deduplicate, and standardize a batch of JSON records with PySpark."""
    # Import PySpark here so the import also resolves on the workers, and use the
    # F alias so Spark's min/max do not shadow Python's built-in min/max.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    if not json_lines:
        return []

    spark = SparkSession.builder.master('local[*]').appName('DataProcessor').getOrCreate()
    # spark.read.json expects a path or an RDD of JSON strings, not a single string.
    df = spark.read.json(spark.sparkContext.parallelize(json_lines))

    # Example transformations:
    # 1. Data cleansing: Remove rows that contain null values
    df = df.na.drop()

    # 2. Data validation: Filter rows based on a specific condition
    df = df.filter(F.col('age') > 18)

    # 3. Data deduplication: Drop duplicate rows
    df = df.dropDuplicates()

    # 4. Data standardization: Convert all names to lowercase
    df = df.withColumn('name', F.lower(F.col('name')))

    # Data quality checks
    # Calculate data quality metrics
    metrics = df.agg(
        F.min('age').alias('min_age'),
        F.max('age').alias('max_age'),
        F.mean('age').alias('avg_age'),
        F.stddev('age').alias('stddev_age'),
    ).collect()[0]

    # Filter the dataset based on data quality metrics
    # Example: Keep only rows with age within 1 standard deviation from the mean
    if metrics['avg_age'] is not None and metrics['stddev_age'] is not None:
        min_age = max(0, metrics['avg_age'] - metrics['stddev_age'])
        max_age = metrics['avg_age'] + metrics['stddev_age']
        df = df.filter((F.col('age') >= min_age) & (F.col('age') <= max_age))

    return df.toJSON().collect()


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://your-gcs-bucket/input-data/*',
        help='Input file(s) to read from GCS.')
    parser.add_argument(
        '--table',
        dest='table',
        default='your_project_id:your_dataset.your_table',
        help='BigQuery table to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | 'ReadFromGCS' >> beam.io.ReadFromText(known_args.input)

        # Transform the data using PySpark. The lines are gathered into one list first
        # so the metrics are computed over the whole dataset rather than per record.
        # Note that this places the full dataset on a single worker.
        transformed_data = (
            lines
            | 'CollectLines' >> beam.combiners.ToList()
            | 'ProcessData' >> beam.FlatMap(process_data)
        )

        # Convert JSON to dictionary
        dict_data = transformed_data | 'JsonToDict' >> beam.ParDo(JsonToDict())

        # Write the results to BigQuery
        schema = 'name:STRING, age:INTEGER'
        dict_data | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            known_args.table,
            schema=schema,
            # WRITE_APPEND is an assumption; choose the disposition that suits your table.
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)


if __name__ == '__main__':
    run()
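
The cleansing and filtering logic is easier to reason about on a handful of records, without Spark or Beam in the way. The following is a minimal plain-Python sketch of the same four steps plus the "within 1 standard deviation" filter. `clean_records` is a hypothetical helper and not part of the pipeline; it assumes every record carries `name` and `age` fields and uses the sample standard deviation.

```python
import json
import statistics


def clean_records(json_lines):
    """Apply the four transformations and the 1-standard-deviation filter in plain Python."""
    records = [json.loads(line) for line in json_lines]

    # 1. Cleansing: drop records in which any field is null
    records = [r for r in records if all(v is not None for v in r.values())]

    # 2. Validation: keep only ages above 18
    records = [r for r in records if r["age"] > 18]

    # 3. Deduplication: keep the first occurrence of each identical record
    seen = set()
    unique = []
    for r in records:
        key = tuple(sorted(r.items()))
        if key not in seen:
            seen.add(key)
            unique.append(r)

    # 4. Standardization: lowercase the name
    unique = [{**r, "name": r["name"].lower()} for r in unique]

    # Quality filter: a sample standard deviation needs at least two rows
    if len(unique) < 2:
        return unique
    ages = [r["age"] for r in unique]
    mean_age = statistics.mean(ages)
    std_age = statistics.stdev(ages)
    low, high = max(0, mean_age - std_age), mean_age + std_age
    return [r for r in unique if low <= r["age"] <= high]


if __name__ == "__main__":
    sample = [
        '{"name": "Alice", "age": 30}',
        '{"name": "BOB", "age": 17}',
        '{"name": null, "age": 40}',
        '{"name": "Alice", "age": 30}',
        '{"name": "Carol", "age": 45}',
        '{"name": "Dave", "age": 90}',
    ]
    for record in clean_records(sample):
        print(record)
```

With this sample, the null-name record, the under-18 record, and the duplicate are removed first. The remaining ages are 30, 45, and 90, so the mean is 55 and the sample standard deviation is about 31.2, which leaves only the records for alice and carol.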

Then, create a new Airflow DAG file (e.g., data_pipeline_dag.py) with the following content:

from datetime import datetime, timedelta
from airflow import DAG
# On provider versions that no longer ship this operator, use
# BeamRunPythonPipelineOperator from apache-airflow-providers-apache-beam instead.
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

# Replace these variables with your own values
project_id = 'your_project_id'
gcs_bucket = 'your-gcs-bucket'
input_path = f'gs://{gcs_bucket}/input-data/*'
temp_location = f'gs://{gcs_bucket}/temp_location'
region = 'your_region'
bigquery_table = f'{project_id}:your_dataset.your_table'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1),
}

dag = DAG(
    'data_pipeline_dag',
    default_args=default_args,
    description='DAG for running data pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

dataflow_task = DataflowCreatePythonJobOperator(
    task_id='dataflow_pipeline',
    py_file='path/to/your/dataflow_script.py',
    py_options=[],
    dataflow_default_options={
        'project': project_id,
        'input': input_path,
        'table': bigquery_table,
        'runner': 'DataflowRunner',
        'temp_location': temp_location,
        'region': region,
    },
    dag=dag,
)

Replace the variables (project_id, gcs_bucket, region, your_dataset, and your_table) with your own values, and specify the path to your dataflow_script.py file.

Make sure to place the data_pipeline_dag.py file in your Airflow DAGs folder. Airflow will pick up the DAG definition and display it in the web UI. The DAG is scheduled to run daily.
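
It can help to see how the entries in the operator's options dictionary reach `dataflow_script.py`. Conceptually, each key/value pair becomes a `--key=value` command-line flag; the script's `argparse` parser picks out `--input` and `--table`, and everything else is handed to Beam as pipeline options. The sketch below imitates that hand-off with the standard library only. `options_to_args` and `parse_script_args` are hypothetical helpers written for illustration and do not reproduce Airflow's actual implementation.

```python
import argparse


def options_to_args(options):
    """Turn {'key': 'value'} into ['--key=value', ...] (illustrative helper)."""
    return [f"--{key}={value}" for key, value in options.items()]


def parse_script_args(argv):
    """Same parser shape as dataflow_script.py: known args versus pipeline args."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input")
    parser.add_argument("--table", dest="table")
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    options = {
        "project": "your_project_id",
        "input": "gs://your-gcs-bucket/input-data/*",
        "table": "your_project_id:your_dataset.your_table",
        "temp_location": "gs://your-gcs-bucket/temp_location",
        "region": "your_region",
    }
    known_args, pipeline_args = parse_script_args(options_to_args(options))
    print("input:", known_args.input)
    print("table:", known_args.table)
    print("passed on to Beam:", pipeline_args)
```

This is why `input` and `table` can sit in the same dictionary as `project` and `region`: the script separates them again with `parse_known_args`.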


`pydeequ` is a Python API for Deequ, which is a library built on top of Apache Spark for defining "unit tests for data." Here's a sample `pydeequ_check.py` file demonstrating how to use PyDeequ for data quality checks in your pipeline:

from pyspark.sql import SparkSession
import pydeequ

from pydeequ.checks import *
from pydeequ.verification import *

def data_quality_check(input_df):

    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("data quality check") \
        .master("local[*]") \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .getOrCreate()

    # Define the data quality checks using PyDeequ
    check = Check(spark, CheckLevel.Warning, "Data Quality Check")

    result = VerificationSuite(spark) \
        .onData(input_df) \
        .addCheck(
            check.hasSize(lambda x: x >= 0) \
            .hasMin("column1", lambda x: x > 0) \
            .hasMax("column2", lambda x: x < 100) \
            .isContainedIn("column3", ["value1", "value2", "value3"]) \
            .isUnique("column4") \
            .isNonNegative("column5") \
            .hasUniqueness(["column6"], lambda x: x > 0.99) \
            .hasCompleteness("column7", lambda x: x > 0.95)) \
        .run()

    # Handle the verification results
    if result.status == "Success":
        print("Data quality check passed.")
    else:
        print(f"Data quality check failed with the following results: {result}")

    # Stop the Spark session
    spark.stop()

    return result

In this example, you need to replace the `column1`, `column2`, etc., with your actual column names and adjust the checks according to your specific requirements. The `data_quality_check` function takes a PySpark DataFrame `input_df` as an input, performs the specified data quality checks, and returns the results. You can then use this function in your Dataflow pipeline for data quality validation.
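
To make thresholds such as `x > 0.95` and `x > 0.99` more concrete, here is a plain-Python sketch of what the completeness and uniqueness metrics measure, as we understand Deequ's definitions: completeness is the fraction of non-null values in a column, and uniqueness is the fraction of values that occur exactly once. The function names are hypothetical, the code works on plain lists rather than Spark DataFrames, and treating an empty column as scoring 1.0 is an assumption of this sketch.

```python
from collections import Counter


def completeness(values):
    """Fraction of entries that are not None."""
    if not values:
        return 1.0  # assumption: an empty column counts as complete
    return sum(v is not None for v in values) / len(values)


def uniqueness(values):
    """Fraction of entries whose value occurs exactly once in the column."""
    if not values:
        return 1.0  # assumption: an empty column counts as unique
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)


if __name__ == "__main__":
    emails = ["a@example.com", None, "c@example.com", "d@example.com"]
    ids = [1, 2, 3, 3]
    print("completeness:", completeness(emails))
    print("uniqueness:", uniqueness(ids))
```

In this sample one of four emails is missing, so completeness is 0.75 and a `hasCompleteness` check with `x > 0.95` would fail. The id 3 appears twice, so only two of the four ids occur exactly once and uniqueness is 0.5.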


README


Hadoop to BigQuery Data Migration Pipeline

This README file contains detailed information on how to set up and use the data migration pipeline for transferring data from an on-premise Hadoop cluster to Google BigQuery using Google Cloud Storage (GCS), Google Dataflow, and Apache Airflow. The pipeline will perform data cleansing, validation, deduplication, and standardization tasks using PySpark. Additionally, it will incorporate data quality checks to ensure the data adheres to certain quality criteria before loading it into BigQuery.

Prerequisites

Before you start, make sure you have the following:

  • Google Cloud Platform (GCP) account with billing enabled

  • A Hadoop cluster with data to be migrated

  • Python 3.x installed

  • Apache Airflow installed (if using Airflow for orchestration)

  • Google Cloud SDK installed and configured

Pipeline Overview

The data migration pipeline consists of the following steps:

  1. Set up a Google Cloud Storage (GCS) bucket for temporarily storing Hadoop data.

  2. Export Hadoop data to GCS using DistCp or Google Cloud Storage Connector for Hadoop.

  3. Create a Dataflow pipeline using the Apache Beam Python SDK and PySpark to read data from GCS, apply data transformations, and run quality checks.

  4. Load the transformed data into BigQuery.

  5. Orchestrate and schedule the pipeline using Cloud Composer and Apache Airflow.

Step-by-step Guide

Step 1: Set up a Google Cloud Storage (GCS) bucket

Create a GCS bucket to store your Hadoop data temporarily before processing and loading it into BigQuery. To do this, follow the instructions in the official Google Cloud documentation:

  • https://cloud.google.com/storage/docs/creating-buckets
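
If you prefer to create the bucket from code, a small idempotent helper is one option. The sketch below is an assumption-laden illustration: `ensure_bucket` is a hypothetical name, the default location "US" is a placeholder, and the `client` argument is expected to behave like `google.cloud.storage.Client`, whose `lookup_bucket` and `create_bucket` methods it calls.

```python
def ensure_bucket(client, bucket_name, location="US"):
    """Return the bucket, creating it first if it does not exist yet.

    `client` is expected to behave like google.cloud.storage.Client.
    """
    # lookup_bucket returns None when the bucket does not exist.
    bucket = client.lookup_bucket(bucket_name)
    if bucket is None:
        bucket = client.create_bucket(bucket_name, location=location)
    return bucket


# Example usage (requires `pip install google-cloud-storage` and configured credentials):
#
#     from google.cloud import storage
#     ensure_bucket(storage.Client(project="your_project_id"), "your-gcs-bucket")
```

Because the helper looks the bucket up first, rerunning the setup does not fail when the bucket already exists.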

Step 2: Export Hadoop data to GCS

To transfer your on-premise Hadoop data to the GCS bucket, use tools like Hadoop DistCp or Google Cloud Storage Connector for Hadoop. You can set up a cron job or use a scheduler like Apache Airflow to automate this step daily. For more information on using these tools, refer to:

  • https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage-distcp

  • https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage

Step 3: Create a Dataflow pipeline using Apache Beam Python SDK and PySpark

Develop a Dataflow pipeline using the Apache Beam Python SDK to read data from the GCS bucket, perform data cleansing, validation, deduplication, and standardization tasks. You can use PySpark libraries and functions within your Apache Beam pipeline for these transformations. Additionally, incorporate data quality checks using PySpark functions and libraries.

A sample `dataflow_script.py` file can be found in the dataflow directory.

Step 4: Load the transformed data into BigQuery

In your Dataflow pipeline, after completing the data transformations and quality checks, use the BigQuery I/O connector to load the processed data into BigQuery.

Step 5: Orchestrate and schedule the pipeline using Cloud Composer and Apache Airflow

Set up a Cloud Composer environment and use Apache Airflow to create a DAG (Directed Acyclic Graph) for orchestration and scheduling. The DAG should automate the following tasks:

a. Export Hadoop data to GCS (if not using a separate scheduler like cron).

b. Trigger the Dataflow pipeline for data processing.

c. Monitor the Dataflow pipeline and handle any errors or failures.

d. Optionally, perform any post-processing tasks like notifications or data validation.

A sample `hadoop_to_bigquery_dag.py` file can be found in the `airflow/dags` directory.

Directory Structure

Here is the directory structure for the data migration pipeline:

data_migration_pipeline/
│
├── airflow/
│   └── dags/
│       └── hadoop_to_bigquery_dag.py
│
├── dataflow/
│   ├── data_quality_checks/
│   │   └── pydeequ_check.py
│   └── dataflow_script.py
│
├── export_scripts/
│   └── export_hadoop_to_gcs.sh
│
└── README.md

How to Run

  1. Follow the steps in the step-by-step guide above to set up the GCS bucket, export Hadoop data to GCS, and create the Dataflow pipeline.

  2. Modify the `airflow/dags/hadoop_to_bigquery_dag.py` file to include the appropriate GCP project ID, GCS bucket name, and any other necessary configurations. Save the file.

  3. Copy the `hadoop_to_bigquery_dag.py` file to your Apache Airflow DAGs folder. For example:

cp hadoop_to_bigquery_dag.py ~/airflow/dags/

Troubleshooting

If you encounter any issues during the pipeline setup or execution, refer to the official documentation for each component:

  • https://cloud.google.com/storage/docs

  • https://cloud.google.com/dataflow/docs

  • https://beam.apache.org/documentation/sdks/python/

  • https://spark.apache.org/docs/latest/api/python/

  • https://airflow.apache.org/docs/

You can also check the logs in the Airflow UI for any task failures or errors.


Conclusion:


In conclusion, we've demonstrated how to build a robust, automated data migration pipeline that migrates data from an on-premise Hadoop cluster to Google BigQuery. By leveraging Google Cloud Storage, Google Dataflow, and Apache Airflow, we've created a pipeline that can efficiently handle data transformations, quality checks, and loading data into a cloud-based data warehouse.


This pipeline can serve as a foundation for more complex data processing tasks, as well as for integrating additional data sources or performing more advanced data quality checks. With the scalability and flexibility offered by Google Cloud Platform and its ecosystem of tools, you can adapt and extend this pipeline to meet your organization's specific needs.


By investing time and effort into building a robust data migration pipeline, you'll unlock the full potential of your data and enable your organization to make data-driven decisions, gain valuable insights, and stay ahead in the rapidly evolving world of data and analytics.
