Source
```yaml
id: ingest-to-datalake-inline-python
namespace: company.team
variables:
  bucket: kestraio
  prefix: inbox
  database: default
tasks:
  - id: list_objects
    type: io.kestra.plugin.aws.s3.List
    prefix: "{{ vars.prefix }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    bucket: "{{ vars.bucket }}"
  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.list_objects.objects }}"
    then:
      - id: ingest_to_datalake
        type: io.kestra.plugin.scripts.python.Script
        env:
          AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          AWS_DEFAULT_REGION: "{{ secret('AWS_DEFAULT_REGION') }}"
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/aws:latest
        script: |
          import awswrangler as wr
          from kestra import Kestra
          # Iceberg table
          BUCKET_NAME = "{{ vars.bucket }}"
          DATABASE = "{{ vars.database }}"
          TABLE = "raw_fruits"
          # Iceberg table's location
          S3_PATH = f"s3://{BUCKET_NAME}/{TABLE}"
          S3_PATH_TMP = f"{S3_PATH}_tmp"
          # File to ingest
          PREFIX = "{{ vars.prefix }}"
          INGEST_S3_KEY_PATH = f"s3://{BUCKET_NAME}/{PREFIX}/"
          df = wr.s3.read_csv(INGEST_S3_KEY_PATH)
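          # Report the number of distinct record ids as a Kestra metric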
          nr_rows = df.id.nunique()
          print(f"Ingesting {nr_rows} rows")
          Kestra.counter("nr_rows", nr_rows, {"table": TABLE})
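          # Clean the data: exclude unwanted fruits and drop duplicate fruit names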
          df = df[~df["fruit"].isin(["Blueberry", "Banana"])]
          df = df.drop_duplicates(subset=["fruit"], ignore_index=True, keep="first")
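          # Drop and recreate the staging Iceberg table on every run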
          wr.catalog.delete_table_if_exists(database=DATABASE, table=TABLE)
          wr.athena.to_iceberg(
              df=df,
              database=DATABASE,
              table=TABLE,
              table_location=S3_PATH,
              temp_path=S3_PATH_TMP,
              partition_cols=["berry"],
              keep_files=False,
          )
          print(f"New data successfully ingested into {S3_PATH}")
      - id: merge_query
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          MERGE INTO fruits f USING raw_fruits r
              ON f.fruit = r.fruit
              WHEN MATCHED
                  THEN UPDATE
                      SET id = r.id, berry = r.berry, update_timestamp = current_timestamp
              WHEN NOT MATCHED
                  THEN INSERT (id, fruit, berry, update_timestamp)
                        VALUES(r.id, r.fruit, r.berry, current_timestamp);
      - id: optimize
        type: io.kestra.plugin.aws.athena.Query
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        database: "{{ vars.database }}"
        outputLocation: s3://{{ vars.bucket }}/query_results/
        query: |
          OPTIMIZE fruits REWRITE DATA USING BIN_PACK;
      - id: move_to_archive
        type: io.kestra.plugin.aws.cli.AwsCLI
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
        region: "{{ secret('AWS_DEFAULT_REGION') }}"
        commands:
          - aws s3 mv s3://{{ vars.bucket }}/{{ vars.prefix }}/ s3://{{ vars.bucket }}/archive/{{ vars.prefix }}/ --recursive
triggers:
  - id: hourly_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@hourly"
    disabled: true
```
About this blueprint
Tags: Python, CLI, AWS
This scheduled flow checks for new files in a given S3 bucket every hour (the Schedule trigger is included but disabled by default). If new files are detected, the flow will:
- Download the raw CSV files from S3
- Read those CSV files into a Pandas dataframe, clean the data, and ingest it into the S3 data lake managed by Apache Iceberg and AWS Glue (a standalone sketch of the cleaning step is shown below)
- Move already processed files to the archive folder in the same S3 bucket
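
For reference, here is a minimal, standalone sketch of the Pandas cleaning step performed inside the ingest_to_datalake task. The sample rows are made up for illustration and only assume the id, fruit, and berry columns that the flow above relies on:

```python
import pandas as pd

# Hypothetical sample data mirroring the columns the flow expects: id, fruit, berry
df = pd.DataFrame(
    {
        "id": [1, 2, 3, 4],
        "fruit": ["Apple", "Banana", "Strawberry", "Strawberry"],
        "berry": [False, False, True, True],
    }
)

# Exclude unwanted fruits, then drop duplicate fruit names (keeping the first occurrence)
df = df[~df["fruit"].isin(["Blueberry", "Banana"])]
df = df.drop_duplicates(subset=["fruit"], ignore_index=True, keep="first")

print(df)  # only Apple and a single Strawberry row remain
```

Running this locally is a quick way to validate the deduplication logic before enabling the schedule and deploying the flow.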
