airflow operator to download a file from URL and push to S3?



Download and Upload: Streamlining File Transfers with Airflow

In the world of data pipelines, efficiently moving files between different systems is crucial. Often, you'll need to fetch data from a remote source, like a website or API, and store it securely in a cloud storage solution like Amazon S3. This is where Airflow shines. Its flexible architecture and diverse operators enable us to build robust, reliable workflows for seamless data transfers.

Scenario: Let's imagine you need to download a daily CSV report from a website and upload it to your S3 bucket for further analysis. This article will guide you through building an Airflow DAG to automate this process.

Original Code:

from datetime import datetime

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

REPORT_URL = 'https://www.example.com/daily_report.csv'
LOCAL_PATH = '/tmp/daily_report.csv'
BUCKET_NAME = 'your-s3-bucket-name'


def download_report():
    """Fetch the daily CSV report and write it to a local temporary file."""
    response = requests.get(
        REPORT_URL,
        params={'param1': 'value1'},
        headers={'Authorization': 'your-token'},
        timeout=60,
    )
    response.raise_for_status()  # fail the task on HTTP errors
    with open(LOCAL_PATH, 'wb') as f:
        f.write(response.content)


with DAG(
    dag_id='download_and_upload_to_s3',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Create the target bucket if it does not already exist.
    create_bucket = S3CreateBucketOperator(
        task_id='create_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id='aws_default'
    )

    # Download the report to local disk on the worker.
    download_file = PythonOperator(
        task_id='download_file',
        python_callable=download_report
    )

    # Upload the local file to S3, stamping the object key with the execution date.
    upload_to_s3 = LocalFilesystemToS3Operator(
        task_id='upload_to_s3',
        filename=LOCAL_PATH,
        dest_bucket=BUCKET_NAME,
        dest_key='reports/daily_report_{{ ds }}.csv',
        replace=True,
        aws_conn_id='aws_default'
    )

    create_bucket >> download_file >> upload_to_s3

Explanation:

  1. DAG Definition: The DAG gets a descriptive dag_id, a start date, a daily schedule, and catchup=False so Airflow only runs the current interval instead of backfilling past dates.

  2. Create S3 Bucket: The S3CreateBucketOperator creates the bucket if it doesn't exist. Ensure your AWS credentials are configured in the aws_default connection.

  3. Download File: A PythonOperator runs download_report, which uses the requests library to fetch the file from the URL with whatever headers or query parameters the source requires, and writes it to a temporary path on the worker.

  4. Upload to S3: The LocalFilesystemToS3Operator picks up the downloaded file and uploads it to your S3 bucket. The dest_key parameter specifies the object's path within the bucket, and the ds template variable dynamically adds the execution date to the filename.

  5. Task Dependencies: The >> operator defines task dependencies, ensuring the bucket is created first, followed by the download and finally the upload.
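
Recent releases of the Amazon provider package (apache-airflow-providers-amazon 8.2 and later) also ship a dedicated transfer operator, HttpToS3Operator, that collapses the download and upload into a single task. The sketch below is a minimal, untested variant of the same workflow using it; it assumes that provider version is installed and that the http_default connection's host is set to https://www.example.com, since the operator resolves its endpoint relative to the connection.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

with DAG(
    dag_id='http_to_s3_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Fetch the report over HTTP and upload it to S3 in a single task.
    fetch_report = HttpToS3Operator(
        task_id='fetch_report',
        http_conn_id='http_default',  # connection host: https://www.example.com
        endpoint='/daily_report.csv',
        headers={'Authorization': 'your-token'},
        s3_bucket='your-s3-bucket-name',
        s3_key='reports/daily_report_{{ ds }}.csv',
        replace=True,
        aws_conn_id='aws_default'
    )

This variant avoids staging the file on local disk, which is convenient when workers are ephemeral.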

Insights:

  • Flexibility: Because the download step is plain requests code, it can accommodate different HTTP methods, authentication schemes, custom headers, and query parameters, making it adaptable to different APIs and websites.

  • Error Handling: Downloads fail from time to time; give the download task retries and a retry_delay, and attach an on_failure_callback (or on_retry_callback) to alert you when things go wrong, as shown in the sketch after this list.

  • Data Validation: After uploading to S3, you might want to add a validation step that checks the object actually landed and looks sane before downstream consumers rely on it (also illustrated below).

  • Logging: Airflow captures task output in the task instance logs; check them in the UI to monitor execution progress and diagnose download or upload issues.
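
To make the error-handling and validation points concrete, here is a minimal, hypothetical hardening of the DAG above: the download task gains retries and a failure callback, and a follow-up task uses S3Hook to confirm the uploaded object exists and is non-empty. The notify_failure and validate_upload functions are illustrative placeholders, not part of the original example.

from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def notify_failure(context):
    # Illustrative placeholder: wire this up to Slack, email, PagerDuty, etc.
    print(f"Task {context['task_instance'].task_id} failed for {context['ds']}")


def validate_upload(ds, **_):
    # Confirm the object landed in S3 and is not empty.
    hook = S3Hook(aws_conn_id='aws_default')
    key = f'reports/daily_report_{ds}.csv'
    if not hook.check_for_key(key, bucket_name='your-s3-bucket-name'):
        raise ValueError(f'Expected object {key} is missing from the bucket')
    if hook.get_key(key, bucket_name='your-s3-bucket-name').content_length == 0:
        raise ValueError(f'Object {key} is empty')


# Inside the `with DAG(...)` block, extend the tasks like so:
download_file = PythonOperator(
    task_id='download_file',
    python_callable=download_report,
    retries=3,
    retry_delay=timedelta(minutes=5),
    on_failure_callback=notify_failure
)

validate_s3_object = PythonOperator(
    task_id='validate_s3_object',
    python_callable=validate_upload
)

create_bucket >> download_file >> upload_to_s3 >> validate_s3_object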

Conclusion:

Airflow simplifies file transfers by providing specialized operators for downloading, uploading, and managing files across various systems. This article showcased a basic example, but its versatility allows you to build complex workflows that cater to your specific data needs. Remember to experiment, enhance with error handling, and adapt the code to fit your particular use case. By leveraging Airflow, you can ensure reliable and efficient data movement throughout your pipeline, fostering better data insights and analysis.
