Автоматизация загрузки данных из множественных файлов

В этом примере покажем, как загрузить в Tengri данные, которые хранятся в виде множества файлов .parquet в хранилище S3.

С помощью модуля Python boto3 выведем пути всех файлов .parquet в конкретном хранилище:

import boto3
import boto3.s3.transfer

path = 'Stage/<lake_path>/DataLake/2049/'
s3_client = boto3.client('s3',
                        endpoint_url='***',
                        aws_access_key_id='***',
                        aws_secret_access_key='***',
                        region_name='us-east-1')

response = s3_client.list_buckets()

for bucket in response['Buckets']:
    print(f'Bucket: {bucket["Name"]}')
    response = s3_client.list_objects_v2(
    Bucket=bucket["Name"],
    Prefix='Stage')

    for content in response.get('Contents', []):
        file_path = str(content['Key'])
        if path in file_path:
            print(file_path)
Bucket: prostore
Stage/<lake_path>/DataLake/2049/1.parquet
Stage/<lake_path>/DataLake/2049/2.parquet
Stage/<lake_path>/DataLake/2049/3.parquet
Stage/<lake_path>/DataLake/2049/4.parquet
Stage/<lake_path>/DataLake/2049/5.parquet
Bucket: s3bench-asusnode21

Попробуем сделать запрос SQL к данным из одного из этих файлов и убедимся, что запрос обрабатывается, а данные отображаются в выводе:

SELECT *
    FROM read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
LIMIT 5;
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| f1                                                                                                                          | f2    | f3                  | f4                         | jd1      | jd2      | command_load_datetime      | command_load_session_id |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 673, "age": 77, "name": "d1b7d724ef3a8451189fc47da62b9888", "email": "b32c9c5d3b1768e7955144103d72c593@example.com"} | false | 0001-01-02 06:44:06 | <memory at 0x7f0bb8523880> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195                     |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 221, "age": 34, "name": "4de35c863a2df55018eca39f130c0738", "email": "3535ac8c3b04c33ac67ee70693707bdb@example.com"} | true  | 0001-01-02 02:59:53 | <memory at 0x7f0bb85219c0> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195                     |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 612, "age": 27, "name": "94a027942dffccb4abe3923914592ba5", "email": "10178786f4378e98f62be257c4215056@example.com"} | false | 0001-01-02 11:53:53 | <memory at 0x7f0bb8522c80> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195                     |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 136, "age": 73, "name": "500522adeb682c1efcdce8af83610a27", "email": "de507ff06310d2b35ea65c40f26f6a94@example.com"} | false | 0001-01-02 00:56:43 | <memory at 0x7f0bb8521fc0> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195                     |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 150, "age": 67, "name": "7d283d7c78b5692f6162d8b65baee1ed", "email": "df78cebca3c40d06e8b43ad8a7e98f56@example.com"} | true  | 0001-01-02 13:46:41 | <memory at 0x7f0bb8521600> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195                     |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+

Теперь нужно загрузить данные из всех файлов .parquet в одну таблицу.

Для этого сначала инициализируем эту таблицу — создадим ее, но не будем пока записывать в нее данные, чтобы сделать это на следующем шаге циклом.

Чтобы инициализировать таблицу, выполним такую команду с подзапросом SELECT * для одного из этих файлов и со значением параметра LIMIT — 0:

CREATE OR REPLACE TABLE raw.dyntest AS
    SELECT *
        FROM read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
    LIMIT 0;
Done in 22.2 sec

+--------+
| status |
+--------+
| CREATE |
+--------+

Этот запрос создает таблицу с нужным нам набором столбцов и их типами, теперь в нее можно загружать данные в цикле.

Для этого в ячейке типа Python опишем цикл по именам всех файлов и в этом цикле с помощью функции tngri.sql выполним итеративную загрузку данных из каждого файла .parquet в инициализированную таблицу raw.dyntest.

В каждой итерации будем выводить количество строк в этой таблице, которое должно прирастать.

import tngri

for i in range(1,6):
    file_name = f"s3://prostore/Stage/<lake_path>/DataLake/2049/{i}.parquet"
    insert_sql = f"INSERT INTO raw.dyntest SELECT * FROM read_parquet('{file_name}')"
    print(insert_sql)
    tngri.sql(insert_sql)

    print(tngri.sql("SELECT count(*) FROM raw.dyntest"))
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 10000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/2.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 20000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/3.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 30000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/4.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 40000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/5.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 50000000 │
+----------+

Теперь выведем количество строк в заполненной таким образом таблице:

SELECT count(*) AS row_count
    FROM raw.dyntest;
+-----------+
| row_count |
+-----------+
| 50000000  |
+-----------+

Чтобы проверить, что запросы к этим данным на таком объеме (50 млн. строк) работают штатно, выведем распределение значений true и false в столбце f2:

SELECT f2, count(*) AS f2_cnt
    FROM raw.dyntest_ng
    GROUP BY f2;
+-------+----------+
| f2    | f2_cnt   |
+-------+----------+
| false | 24998577 |
+-------+----------+
| true  | 25001423 |
+-------+----------+

В однм из столбцов таблицы записаны данные в виде структуры JSON, и один из ключей этой структуры — age (возраст). Построим таблицу распределения значений возраста. Для этого используем функцию json_extract.

SELECT
    json_extract(f1, ['age'])[1]::INT AS age,
    count(*) AS age_count
    FROM raw.dyntest
    GROUP BY 1 ORDER BY 1;
Done in 9.9 sec

+-----+-----------+
| age | age_count |
+-----+-----------+
| 0   | 500228    |
+-----+-----------+
| 1   | 499629    |
+-----+-----------+
| 2   | 500222    |
+-----+-----------+
| 3   | 500622    |
+-----+-----------+
| 4   | 500373    |
+-----+-----------+
| 5   | 500410    |
+-----+-----------+
| 6   | 499876    |
+-----+-----------+
| 7   | 499276    |
+-----+-----------+
| 8   | 500566    |
+-----+-----------+
| 9   | 499314    |
+-----+-----------+
| ... | ...       |
+-----+-----------+

На основе этой таблицы построим график распределения значений по возрасту:

import matplotlib

plt = cell_output.plot(
    title='Распределение по возрасту',
    y='age_count',
    ylabel='Количество',
    x='age',
    xlabel='Возраст',
)

plt.get_figure()
complex data upload result