Is there a way to insert new record for iceberg table using pyiceberg api?

2 min read 05-10-2024


Inserting Records into Iceberg Tables with the PyIceberg API

PyIceberg, the Python library for Apache Iceberg, lets you read and write Iceberg tables, including inserting new records, without a JVM. This article walks through adding data to an existing Iceberg table using Python.

The Challenge

Imagine you have a structured dataset stored in an Iceberg table, and you need to add new rows to it. How do you add these records without disrupting the existing data, while maintaining the integrity of the table?

Setting the Stage

Let's assume you have an Iceberg table named my_table stored in your data lake. This table has the following schema:

{
  "type": "struct",
  "fields": [
    {"id": 1, "name": "id", "required": true, "type": "int"},
    {"id": 2, "name": "name", "required": true, "type": "string"},
    {"id": 3, "name": "age", "required": true, "type": "int"}
  ]
}

You'll be using the PyIceberg library to interact with this table. Here's how to get started:

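from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

# The schema, shown for reference: a loaded table already carries its own schema
schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="age", field_type=IntegerType(), required=True),
)

# Tables are loaded through a catalog, which resolves the storage location
# (for example s3://your-bucket/my_table). "default" is an assumed catalog
# name configured in ~/.pyiceberg.yaml or through environment variables.
catalog = load_catalog("default")

# Load the existing table by identifier ("my_namespace" is an assumed namespace)
table = catalog.load_table("my_namespace.my_table")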

Adding New Records

Now, let's define a list of dictionaries containing the new records you want to insert:

new_records = [
    {"id": 4, "name": "Alice", "age": 28},
    {"id": 5, "name": "Bob", "age": 32}
]
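Because every field in this schema is required, it can help to check a batch before trying to write it. The following is a minimal sketch in plain Python, not part of PyIceberg; validate_records, EXPECTED_TYPES, and the error messages are hypothetical names chosen for illustration, and the range check assumes the integer columns are Iceberg's 32-bit int type.

```python
# Expected Python type for each column in the article's schema; all are required.
EXPECTED_TYPES = {"id": int, "name": str, "age": int}

# Assumption: the integer columns are Iceberg's 32-bit signed int type.
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def validate_records(records):
    """Return a list of human-readable problems; an empty list means the batch looks OK."""
    problems = []
    for index, record in enumerate(records):
        missing = EXPECTED_TYPES.keys() - record.keys()
        extra = record.keys() - EXPECTED_TYPES.keys()
        if missing:
            problems.append(f"record {index}: missing fields {sorted(missing)}")
        if extra:
            problems.append(f"record {index}: unexpected fields {sorted(extra)}")
        for name, expected in EXPECTED_TYPES.items():
            if name not in record:
                continue
            value = record[name]
            # bool is a subclass of int in Python, so reject it explicitly.
            if value is None or isinstance(value, bool) or not isinstance(value, expected):
                problems.append(f"record {index}: {name!r} must be a non-null {expected.__name__}")
            elif expected is int and not INT32_MIN <= value <= INT32_MAX:
                problems.append(f"record {index}: {name!r} is outside the 32-bit integer range")
    return problems


new_records = [
    {"id": 4, "name": "Alice", "age": 28},
    {"id": 5, "name": "Bob", "age": 32},
]

problems = validate_records(new_records)
if problems:
    raise ValueError("; ".join(problems))
```

Returning a list of problems rather than raising on the first one makes it easier to report everything wrong with a batch in a single pass.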

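To insert these records, convert them to a PyArrow table and pass it to the append method of the Table object (available in PyIceberg 0.6.0 and later). The append method does not accept a list of dictionaries directly, and the Arrow schema must match the Iceberg schema, including 32-bit integers and required (non-nullable) fields:

import pyarrow as pa

# Arrow schema matching the table: 32-bit ints and strings, all required
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.string(), nullable=False),
    pa.field("age", pa.int32(), nullable=False),
])

# Convert the records to an Arrow table and commit them as a new snapshot
table.append(pa.Table.from_pylist(new_records, schema=arrow_schema))

This method takes the following arguments:

  • df: A PyArrow table containing the new rows. Its schema must be compatible with the Iceberg table's schema.
  • snapshot_properties: (Optional) A dictionary of custom properties to record in the new snapshot's summary.

You do not pass a partition specification or a snapshot ID. On a partitioned table, releases that support partitioned writes derive the partition values from the table's own partition spec, and every append is committed as a new snapshot on top of the current one.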
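One way to confirm that an append landed is to compare row counts before and after the write. The helper below is a sketch under assumptions: append_and_count is a hypothetical name, it assumes table is a loaded PyIceberg table whose append method accepts a PyArrow table, that arrow_table is the PyArrow table being written, and that no other writer commits to the table in between. It relies on table.scan().to_arrow(), which reads the current snapshot into memory, so it is only suitable for small tables or test setups.

```python
def append_and_count(table, arrow_table):
    """Append arrow_table to an Iceberg table and return (rows_before, rows_after)."""
    # table.scan().to_arrow() materializes the current snapshot as a PyArrow table.
    rows_before = table.scan().to_arrow().num_rows

    # Commit the new rows as a new snapshot.
    table.append(arrow_table)

    rows_after = table.scan().to_arrow().num_rows
    expected = rows_before + arrow_table.num_rows
    if rows_after != expected:
        # A mismatch may also mean a concurrent writer committed in between.
        raise RuntimeError(f"expected {expected} rows after append, found {rows_after}")
    return rows_before, rows_after
```

With the objects from this article, a call would look like append_and_count(table, pa.Table.from_pylist(new_records, schema=arrow_schema)), where arrow_schema is an Arrow schema matching the table.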

Explanation

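The append operation adds the new records to the Iceberg table without altering the existing data. Iceberg data files are immutable, so the new rows are never added to an existing file: they are written to one or more new data files, and a new snapshot that references both the existing files and the new ones is committed to the table metadata. Readers keep seeing the previous snapshot until the commit succeeds. Because each append creates at least one new file, batching records into fewer, larger appends generally keeps storage and query performance healthier than many tiny appends.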
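The idea of immutable files plus snapshots can be sketched with a toy model. This is a conceptual illustration only, not how PyIceberg or the Iceberg format is implemented; ToyTable, Snapshot, the file naming scheme, and the sample row for "Carol" are all made up for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    data_files: tuple  # names of every data file visible in this snapshot


@dataclass
class ToyTable:
    """Conceptual model only; real Iceberg metadata is far richer."""

    files: dict = field(default_factory=dict)  # file name -> immutable rows
    snapshots: list = field(default_factory=list)

    def append(self, rows):
        # New rows always go into a brand-new file; old files are never rewritten.
        file_name = f"data-{len(self.files):05d}.parquet"
        self.files[file_name] = tuple(rows)
        parent = self.snapshots[-1] if self.snapshots else None
        snapshot = Snapshot(
            snapshot_id=len(self.snapshots) + 1,
            parent_id=parent.snapshot_id if parent else None,
            data_files=(parent.data_files if parent else ()) + (file_name,),
        )
        self.snapshots.append(snapshot)
        return snapshot

    def scan(self, snapshot_id=None):
        # Read the latest snapshot by default, or an older one for time travel.
        if not self.snapshots:
            return []
        if snapshot_id is None:
            snapshot = self.snapshots[-1]
        else:
            snapshot = self.snapshots[snapshot_id - 1]
        return [row for name in snapshot.data_files for row in self.files[name]]


toy = ToyTable()
first = toy.append([{"id": 1, "name": "Carol", "age": 41}])
second = toy.append([
    {"id": 4, "name": "Alice", "age": 28},
    {"id": 5, "name": "Bob", "age": 32},
])
```

Scanning the latest snapshot returns all three rows, while scanning snapshot 1 still returns only the first row, which is the property that makes rollback and time travel possible.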

Key Benefits of PyIceberg

  • Schema Evolution: PyIceberg enables adding new columns, modifying existing columns, and even removing columns from the table without impacting existing data.
  • Data Versioning: Iceberg maintains a history of all data modifications, making it easy to track changes and roll back to previous versions.
  • Partitioning: You can partition your data based on specific columns to improve query performance.
  • File Format Agnostic: The Iceberg specification supports data files in Avro, Parquet, and ORC; PyIceberg itself writes new data as Parquet.

Conclusion

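The PyIceberg API simplifies the process of adding new records to your Iceberg tables: load the table through a catalog, convert the records to a PyArrow table, and call append. Each append is committed as a new snapshot, so existing data files stay untouched and earlier versions of the table remain available. For more advanced data management and query operations, explore the documentation available on the PyIceberg website.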
