Автоматизация загрузки данных из множественных файлов
В этом примере покажем, как загрузить в Tengri данные, которые хранятся в виде множества файлов .parquet
в хранилище S3.
С помощью модуля Python boto3
выведем пути всех файлов .parquet
в конкретном хранилище:
import boto3
import boto3.s3.transfer
path = 'Stage/<lake_path>/DataLake/2049/'
s3_client = boto3.client('s3',
endpoint_url='***',
aws_access_key_id='***',
aws_secret_access_key='***',
region_name='us-east-1')
response = s3_client.list_buckets()
for bucket in response['Buckets']:
print(f'Bucket: {bucket["Name"]}')
response = s3_client.list_objects_v2(
Bucket=bucket["Name"],
Prefix='Stage')
for content in response.get('Contents', []):
file_path = str(content['Key'])
if path in file_path:
print(file_path)
Bucket: prostore
Stage/<lake_path>/DataLake/2049/1.parquet
Stage/<lake_path>/DataLake/2049/2.parquet
Stage/<lake_path>/DataLake/2049/3.parquet
Stage/<lake_path>/DataLake/2049/4.parquet
Stage/<lake_path>/DataLake/2049/5.parquet
Bucket: s3bench-asusnode21
Попробуем сделать запрос SQL к данным из одного из этих файлов и убедимся, что запрос обрабатывается, а данные отображаются в выводе:
SELECT *
FROM read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
LIMIT 5;
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| f1 | f2 | f3 | f4 | jd1 | jd2 | command_load_datetime | command_load_session_id |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 673, "age": 77, "name": "d1b7d724ef3a8451189fc47da62b9888", "email": "b32c9c5d3b1768e7955144103d72c593@example.com"} | false | 0001-01-02 06:44:06 | <memory at 0x7f0bb8523880> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195 |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 221, "age": 34, "name": "4de35c863a2df55018eca39f130c0738", "email": "3535ac8c3b04c33ac67ee70693707bdb@example.com"} | true | 0001-01-02 02:59:53 | <memory at 0x7f0bb85219c0> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195 |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 612, "age": 27, "name": "94a027942dffccb4abe3923914592ba5", "email": "10178786f4378e98f62be257c4215056@example.com"} | false | 0001-01-02 11:53:53 | <memory at 0x7f0bb8522c80> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195 |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 136, "age": 73, "name": "500522adeb682c1efcdce8af83610a27", "email": "de507ff06310d2b35ea65c40f26f6a94@example.com"} | false | 0001-01-02 00:56:43 | <memory at 0x7f0bb8521fc0> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195 |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
| {"id": 150, "age": 67, "name": "7d283d7c78b5692f6162d8b65baee1ed", "email": "df78cebca3c40d06e8b43ad8a7e98f56@example.com"} | true | 0001-01-02 13:46:41 | <memory at 0x7f0bb8521600> | 7000-1-1 | 7000-2-1 | 2025-06-23 08:05:23.636063 | 195 |
+-----------------------------------------------------------------------------------------------------------------------------+-------+---------------------+----------------------------+----------+----------+----------------------------+-------------------------+
Теперь нужно загрузить данные из всех файлов .parquet
в одну таблицу.
Для этого сначала инициализируем эту таблицу — создадим ее, но не будем пока записывать в нее данные, чтобы сделать это на следующем шаге циклом.
Чтобы инициализировать таблицу, выполним такую команду с подзапросом SELECT *
для одного из этих файлов и со значением параметра LIMIT
— 0
:
CREATE OR REPLACE TABLE raw.dyntest AS
SELECT *
FROM read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
LIMIT 0;
Done in 22.2 sec
+--------+
| status |
+--------+
| CREATE |
+--------+
Этот запрос создает таблицу с нужным нам набором столбцов и их типами, теперь в нее можно загружать данные в цикле.
Для этого в ячейке типа Python опишем цикл по именам всех файлов и в этом цикле с помощью функции tngri.sql
выполним итеративную загрузку данных из каждого файла .parquet
в инициализированную таблицу raw.dyntest
.
В каждой итерации будем выводить количество строк в этой таблице, которое должно прирастать.
import tngri
for i in range(1,6):
file_name = f"s3://prostore/Stage/<lake_path>/DataLake/2049/{i}.parquet"
insert_sql = f"INSERT INTO raw.dyntest SELECT * FROM read_parquet('{file_name}')"
print(insert_sql)
tngri.sql(insert_sql)
print(tngri.sql("SELECT count(*) FROM raw.dyntest"))
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/1.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 10000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/2.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 20000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/3.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 30000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/4.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 40000000 │
+----------+
insert into raw.dyntest select * from read_parquet('s3://prostore/Stage/<lake_path>/DataLake/2049/5.parquet')
shape: (1, 1)
+----------+
│ column_0 │
│ --- │
│ i64 │
+----------+
│ 50000000 │
+----------+
Теперь выведем количество строк в заполненной таким образом таблице:
SELECT count(*) AS row_count
FROM raw.dyntest;
+-----------+
| row_count |
+-----------+
| 50000000 |
+-----------+
Чтобы проверить, что запросы к этим данным на таком объеме (50 млн. строк) работают штатно, выведем распределение значений true
и false
в столбце f2
:
SELECT f2, count(*) AS f2_cnt
FROM raw.dyntest_ng
GROUP BY f2;
+-------+----------+
| f2 | f2_cnt |
+-------+----------+
| false | 24998577 |
+-------+----------+
| true | 25001423 |
+-------+----------+
В однм из столбцов таблицы записаны данные в виде структуры JSON, и один из ключей этой структуры — age
(возраст). Построим таблицу распределения значений возраста. Для этого используем функцию json_extract
.
SELECT
json_extract(f1, ['age'])[1]::INT AS age,
count(*) AS age_count
FROM raw.dyntest
GROUP BY 1 ORDER BY 1;
Done in 9.9 sec
+-----+-----------+
| age | age_count |
+-----+-----------+
| 0 | 500228 |
+-----+-----------+
| 1 | 499629 |
+-----+-----------+
| 2 | 500222 |
+-----+-----------+
| 3 | 500622 |
+-----+-----------+
| 4 | 500373 |
+-----+-----------+
| 5 | 500410 |
+-----+-----------+
| 6 | 499876 |
+-----+-----------+
| 7 | 499276 |
+-----+-----------+
| 8 | 500566 |
+-----+-----------+
| 9 | 499314 |
+-----+-----------+
| ... | ... |
+-----+-----------+
На основе этой таблицы построим график распределения значений по возрасту:
import matplotlib
plt = cell_output.plot(
title='Распределение по возрасту',
y='age_count',
ylabel='Количество',
x='age',
xlabel='Возраст',
)
plt.get_figure()
