Accessing Iceberg storage directly using Python

In this example, we will show you how to access the Iceberg repository directly using Python. Using this method, you will be able to read and edit existing tables, create new tables, including those based on data from existing tables, and perform other operations directly in Iceberg.

First of all, in a cell of the SQL type, we will create a test table with one column and write 1 million numbers starting from 1 into the rows. To do this, we will use the unnest and generate_series functions.

CREATE OR REPLACE TABLE demo.iceberg_test (numbers BIGINT);

INSERT INTO demo.iceberg_test (numbers)
    SELECT unnest(generate_series(1,1000000));
Done in 2.9 sec

+---------+
| Count   |
+---------+
| 1000000 |
+---------+

Let’s display the resulting table:

SELECT * FROM demo.iceberg_test
    ORDER BY numbers
Done in 2.1 sec

+---------+
| numbers |
+---------+
| 1       |
+---------+
| 2       |
+---------+
| 3       |
+---------+
| 4       |
+---------+
| 5       |
+---------+
| ...     |
+---------+
999+ rows

Suppose we need to create a new table based on this table, which should contain all columns from the original table and one new column. In the new column we should write the data calculated using the function described at Python, passing the values from the columns of the original table as arguments.

As such a test function, let’s describe the check_odd function, which returns the odd string for odd numbers and the even string for even numbers.

We will do all this by accessing Iceberg directly using Python:

import tngri
import pyarrow
import pyiceberg
from contextlib import suppress
import pandas


# Задаем имя исходной таблицы
source_table_name = 'demo.iceberg_test'

# Задаем имя целевой таблицы
target_table_name = 'demo.iceberg_test_target'

# Задаем имя колонки в целевой таблице для записи новых данных
target_column = 'odd_or_even'

print(f'Source table: {source_table_name}')
print(f'Target table: {target_table_name}')

# Загружаем исходную таблицу
source_table = catalog.load_table(source_table_name)

# Создаем целевую таблицу, копируем схему из исходной
with suppress(Exception):
    catalog.drop_table(target_table_name)
sink = catalog.create_table(target_table_name, source_table.schema())

# Добавляем колону для записи новых данных (обязательно указать тип данных) (1)
with sink.update_schema() as tx:
    tx.add_column(target_column, pyiceberg.schema.StringType())

# Тестовая функция для вычисления новых данных
def check_odd(num):
    if num % 2:
        return 'odd'
    else:
        return 'even'

# Делим исходную таблицу на батчи, чтобы не загружать в память целиком
table_batches = source_table.scan().to_arrow_batch_reader()

# Записываем новые данные по батчам
step = 0
for batch in table_batches:
    batch: pyarrow.RecordBatch

    step += 1
    print(f'Step: {step}')

    # Преобразуем батч в DataFrame
    part_df = batch.to_pandas()

    # Записываем новые данные в новую колонку DataFrame с тем же именем, что в таблице
    part_df[target_column] = part_df['numbers'].apply(lambda num: check_odd(num))

    # Выводим размер DataFrame с новыми данными
    print(f'Result dataframe shape: {part_df.shape}')

    # Преобразуем DataFrame обратно в таблицу
    sink_part = pyarrow.Table.from_pandas(part_df)

    # Аппендим данный батч в целевую таблицу
    sink.append(sink_part)

# Выводим длину целевой таблицы через scan().count()
print(f'Result table length: {sink.scan().count()}')
# Выводим длину целевой таблицы через tngri.sql
print(tngri.sql(f'SELECT count(*) FROM {target_table_name}')) (2)
1 Learn more about data types in pyiceberg here
2 Detailed description of the function: tngri.sql
Done in 11.9 sec

Source table: demo.iceberg_test
Target table: demo.iceberg_test_target
Step: 1
Result dataframe shape: (16960, 2)
Step: 2
Result dataframe shape: (122880, 2)
Step: 3
Result dataframe shape: (122880, 2)
Step: 4
Result dataframe shape: (122880, 2)
Step: 5
Result dataframe shape: (122880, 2)
Step: 6
Result dataframe shape: (122880, 2)
Step: 7
Result dataframe shape: (122880, 2)
Step: 8
Result dataframe shape: (122880, 2)
Step: 9
Result dataframe shape: (122880, 2)
Result table length: 1000000
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 1000000  │
+----------+

Let’s check the target table. To do this, in a cell of type SQL we will display its first five rows, ordering the rows by the numbers column:

SELECT * FROM demo.iceberg_test_target
    ORDER BY numbers
    LIMIT 5
Done in 2.1 sec

+---------+-------------+
| numbers | odd_or_even |
+---------+-------------+
| 1 | odd |
+---------+-------------+
| 2 | even |
+---------+-------------+
| 3 | odd |
+---------+-------------+
| 4 | even |
+---------+-------------+
| 5 | odd |
+---------+-------------+