Обращение к хранилищу Iceberg напрямую с помощью Python
В этом примере мы покажем, как обращаться к хранилищу Iceberg напрямую с помощью Python. Используя этот способ, вы сможете читать и редактировать существующие таблицы, создавать новые таблицы, в том числе на основе данных из существующих таблиц, и совершать другие операции непосредственно в хранилище Iceberg.
Для начала в ячейке типа SQL создадим тестовую таблицу с одним столбцом и запишем в строки 1 миллион чисел начиная с 1
. Для этого используем функции unnest
и generate_series
.
CREATE OR REPLACE TABLE demo.iceberg_test (numbers BIGINT);
INSERT INTO demo.iceberg_test (numbers)
SELECT unnest(generate_series(1,1000000));
Done in 2.9 sec
+---------+
| Count |
+---------+
| 1000000 |
+---------+
Выведем получившуюся таблицу:
SELECT * FROM demo.iceberg_test
ORDER BY numbers
Done in 2.1 sec
+---------+
| numbers |
+---------+
| 1 |
+---------+
| 2 |
+---------+
| 3 |
+---------+
| 4 |
+---------+
| 5 |
+---------+
| ... |
+---------+
999+ rows
Предположим, что на основе этой таблицы нам нужно создать новую таблицу, в которой должны быть все столбцы из исходной таблицы и один новый столбец. В новый столбец мы должны записать данные, вычисленные с помощью описанной на Python функции, передавая ей в качестве аргументов значения из столбцов исходной таблицы.
В качестве такой тестовой функции опишем функцию check_odd
, которая возвращает строку odd
для нечетных чисел и строку even
— для четных.
Все это мы сделаем, обращаясь напрямую в Iceberg с помощью Python:
import tngri
import pyarrow
import pyiceberg
from contextlib import suppress
import pandas
# Задаем имя исходной таблицы
source_table_name = 'demo.iceberg_test'
# Задаем имя целевой таблицы
target_table_name = 'demo.iceberg_test_target'
# Задаем имя колонки в целевой таблице для записи новых данных
target_column = 'odd_or_even'
print(f'Source table: {source_table_name}')
print(f'Target table: {target_table_name}')
# Загружаем исходную таблицу
source_table = catalog.load_table(source_table_name)
# Создаем целевую таблицу, копируем схему из исходной
with suppress(Exception):
catalog.drop_table(target_table_name)
sink = catalog.create_table(target_table_name, source_table.schema())
# Добавляем колону для записи новых данных (обязательно указать тип данных)
with sink.update_schema() as tx:
tx.add_column(target_column, pyiceberg.schema.StringType())
# Тестовая функция для вычисления новых данных
def check_odd(num):
if num % 2:
return 'odd'
else:
return 'even'
# Делим исходную таблицу на батчи, чтобы не загружать в память целиком
table_batches = source_table.scan().to_arrow_batch_reader()
# Записываем новые данные по батчам
step = 0
for batch in table_batches:
batch: pyarrow.RecordBatch
step += 1
print(f'Step: {step}')
# Преобразуем батч в DataFrame
part_df = batch.to_pandas()
# Записываем новые данные в новую колонку DataFrame с тем же именем, что в таблице
part_df[target_column] = part_df['numbers'].apply(lambda num: check_odd(num))
# Выводим размер DataFrame с новыми данными
print(f'Result dataframe shape: {part_df.shape}')
# Преобразуем DataFrame обратно в таблицу
sink_part = pyarrow.Table.from_pandas(part_df)
# Аппендим данный батч в целевую таблицу
sink.append(sink_part)
# Выводим длину целевой таблицы через scan().count()
print(f'Result table length: {sink.scan().count()}')
# Выводим длину целевой таблицы через tngri.sql
print(tngri.sql(f'SELECT count(*) FROM {target_table_name}'))
Done in 11.9 sec
Source table: demo.iceberg_test
Target table: demo.iceberg_test_target
Step: 1
Result dataframe shape: (16960, 2)
Step: 2
Result dataframe shape: (122880, 2)
Step: 3
Result dataframe shape: (122880, 2)
Step: 4
Result dataframe shape: (122880, 2)
Step: 5
Result dataframe shape: (122880, 2)
Step: 6
Result dataframe shape: (122880, 2)
Step: 7
Result dataframe shape: (122880, 2)
Step: 8
Result dataframe shape: (122880, 2)
Step: 9
Result dataframe shape: (122880, 2)
Result table length: 1000000
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 1000000 │
+----------+
Проверим целевую таблицу. Для этого в ячейке типа SQL выведем ее первые пять строк, упорядочив строки по столбцу numbers
:
SELECT * FROM demo.iceberg_test_target
ORDER BY numbers
LIMIT 5
Done in 2.1 sec
+---------+-------------+
| numbers | odd_or_even |
+---------+-------------+
| 1 | odd |
+---------+-------------+
| 2 | even |
+---------+-------------+
| 3 | odd |
+---------+-------------+
| 4 | even |
+---------+-------------+
| 5 | odd |
+---------+-------------+