Commit f6648f8
Infer table columns from parquet uri
This PR lets us infer table columns from a Parquet file. It can also populate the table if the user requests it.

The syntax to infer the table columns:

```sql
CREATE TABLE test () WITH (definition_from = 's3://...')
```

The syntax to infer + populate the table:

```sql
CREATE TABLE test () WITH (load_from = 's3://...')
```

Closes #49.
1 parent 3aff704 commit f6648f8
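
As a quick illustration of the two options described in the commit message (the bucket path and table names here are illustrative, not from the commit):

```sql
-- infer column definitions only; the new table starts out empty
CREATE TABLE trips () WITH (definition_from = 's3://mybucket/trips.parquet');

-- infer column definitions and load the rows in one step
CREATE TABLE trips_loaded () WITH (load_from = 's3://mybucket/trips.parquet');
```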

20 files changed: +1374 -279 lines

README.md

Lines changed: 18 additions & 1 deletion
@@ -19,6 +19,7 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
 - [Installation From Source](#installation-from-source)
 - [Usage](#usage)
 - [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
+- [Create Postgres tables from Parquet files](#create-postgres-tables-from-parquet-files)
 - [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
 - [COPY FROM/TO Parquet program stream TO/FROM Postgres tables](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
 - [Inspect Parquet schema](#inspect-parquet-schema)
@@ -68,7 +69,7 @@ psql> "CREATE EXTENSION pg_parquet;"
 ## Usage
 There are mainly 3 things that you can do with `pg_parquet`:
 1. You can export Postgres tables/queries to Parquet files, stdin/stdout or a program's stream,
-2. You can ingest data from Parquet files to Postgres tables,
+2. You can ingest data from Parquet files to Postgres tables with type coercion and schema inference,
 3. You can inspect the schema and metadata of Parquet files.
 
 ### COPY from/to Parquet files to/from Postgres tables
@@ -110,6 +111,22 @@ COPY product_example FROM '/tmp/product_example.parquet';
 SELECT * FROM product_example;
 ```
 
+### Create Postgres tables from Parquet files
+You can use the `CREATE TABLE () WITH (definition_from = <uri>)` command to **infer** the columns of Postgres tables.
+You can even infer + populate the table via `CREATE TABLE () WITH (load_from = <uri>)`.
+
+```sql
+-- create table with inferred columns and populated rows
+CREATE TABLE product_inferred_example () WITH (load_from = '/tmp/product_example.parquet');
+
+-- show table
+SELECT * FROM product_inferred_example;
+```
+
+> [!NOTE]
+> If the inferred column is of composite type, a new type is created, named `parquet_structs.struct_<hash>`. The hash is determined by
+> the field types and names of the composite type.
+
 ### COPY from/to Parquet stdin/stdout to/from Postgres tables
 
 You can use the `COPY` command to read and write Parquet streams from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
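
The README example above only exercises `load_from`; for completeness, a minimal sketch of the `definition_from` variant (table name is illustrative, the path reuses the README's example file):

```sql
-- create table with inferred columns only; no rows are loaded
CREATE TABLE product_definition_example () WITH (definition_from = '/tmp/product_example.parquet');

-- the table stays empty until you COPY data into it
COPY product_definition_example FROM '/tmp/product_example.parquet';
```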

sql/bootstrap.sql

Lines changed: 3 additions & 0 deletions
@@ -14,3 +14,6 @@ CREATE SCHEMA parquet;
 REVOKE ALL ON SCHEMA parquet FROM public;
 GRANT USAGE ON SCHEMA parquet TO public;
 GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA parquet TO public;
+
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;

sql/pg_parquet--0.5.1--0.5.2.sql

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;

sql/pg_parquet.sql

Lines changed: 3 additions & 0 deletions
@@ -104,3 +104,6 @@ CREATE FUNCTION parquet."column_stats"("uri" TEXT) RETURNS TABLE (
 ) STRICT
 LANGUAGE c
 AS 'MODULE_PATHNAME', 'column_stats_wrapper';
+
+-- error if the schema already exists
+CREATE SCHEMA parquet_structs;
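
The extension now pre-creates the `parquet_structs` schema, which holds the composite types generated during inference. A hedged sketch of how you might observe this, assuming a hypothetical parquet file with a struct column:

```sql
-- hypothetical: /tmp/events.parquet contains a struct column "payload"
CREATE TABLE events () WITH (definition_from = '/tmp/events.parquet');

-- inferred composite types land in the parquet_structs schema as struct_<hash>
SELECT t.typname
FROM pg_type t
JOIN pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = 'parquet_structs';
```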

src/arrow_parquet.rs

Lines changed: 1 addition & 1 deletion
@@ -7,5 +7,5 @@ pub(crate) mod parquet_reader;
 pub(crate) mod parquet_version;
 pub(crate) mod parquet_writer;
 pub(crate) mod pg_to_arrow;
-pub(crate) mod schema_parser;
+pub(crate) mod schema;
 pub(crate) mod uri_utils;

src/arrow_parquet/parquet_reader.rs

Lines changed: 8 additions & 6 deletions
@@ -23,9 +23,14 @@ use crate::{
     arrow_parquet::{
         arrow_to_pg::{context::collect_arrow_to_pg_attribute_contexts, to_pg_datum},
         field_ids::FieldIds,
-        schema_parser::{
-            error_if_copy_from_match_by_position_with_generated_columns,
-            parquet_schema_string_from_attributes,
+        schema::{
+            coerce_schema::{
+                ensure_file_schema_match_tupledesc_schema,
+                error_if_copy_from_match_by_position_with_generated_columns,
+            },
+            parse_schema::{
+                parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
+            },
         },
     },
     parquet_udfs::list::list_uri,
@@ -37,9 +42,6 @@ use crate::{
 use super::{
     arrow_to_pg::context::ArrowToPgAttributeContext,
     match_by::MatchBy,
-    schema_parser::{
-        ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
-    },
     uri_utils::{parquet_reader_from_uri, ParsedUriInfo},
 };

src/arrow_parquet/parquet_writer.rs

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ use crate::{
     compression::PgParquetCompressionWithLevel,
     field_ids::validate_field_ids,
     pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
-    schema_parser::{
+    schema::parse_schema::{
         parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
     },
     uri_utils::parquet_writer_from_uri,

src/arrow_parquet/schema.rs

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+pub(crate) mod coerce_schema;
+pub(crate) mod infer_schema;
+pub(crate) mod parse_schema;

src/arrow_parquet/schema/coerce_schema.rs

Lines changed: 270 additions & 0 deletions

@@ -0,0 +1,270 @@
use std::sync::Arc;

use arrow_cast::can_cast_types;
use arrow_schema::{DataType, FieldRef, Schema};
use pgrx::{
    ereport,
    pg_sys::{
        can_coerce_type,
        CoercionContext::{self, COERCION_EXPLICIT},
        FormData_pg_attribute, InvalidOid, Oid, BOOLOID, BYTEAOID, DATEOID, FLOAT4OID, FLOAT8OID,
        INT2OID, INT4OID, INT8OID, JSONOID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID,
        TIMESTAMPTZOID, UUIDOID,
    },
    PgLogLevel, PgSqlErrorCode, PgTupleDesc,
};

use crate::{
    arrow_parquet::match_by::MatchBy,
    pgrx_utils::{
        array_element_typoid, collect_attributes_for, domain_array_base_elem_type,
        is_generated_attribute, tuple_desc, CollectAttributesFor,
    },
    type_compat::pg_arrow_type_conversions::make_numeric_typmod,
};

// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
// to cast to for each field.
pub(crate) fn ensure_file_schema_match_tupledesc_schema(
    file_schema: Arc<Schema>,
    tupledesc_schema: Arc<Schema>,
    attributes: &[FormData_pg_attribute],
    match_by: MatchBy,
) -> Vec<Option<DataType>> {
    let mut cast_to_types = Vec::new();

    if match_by == MatchBy::Position
        && tupledesc_schema.fields().len() != file_schema.fields().len()
    {
        panic!(
            "column count mismatch between table and parquet file. \
             parquet file has {} columns, but table has {} columns",
            file_schema.fields().len(),
            tupledesc_schema.fields().len()
        );
    }

    for (tupledesc_schema_field, attribute) in
        tupledesc_schema.fields().iter().zip(attributes.iter())
    {
        let field_name = tupledesc_schema_field.name();

        let file_schema_field = match match_by {
            MatchBy::Position => file_schema.field(attribute.attnum as usize - 1),

            MatchBy::Name => {
                let file_schema_field = file_schema.column_with_name(field_name);

                if file_schema_field.is_none() {
                    panic!("column \"{}\" is not found in parquet file", field_name);
                }

                let (_, file_schema_field) = file_schema_field.unwrap();

                file_schema_field
            }
        };

        let file_schema_field = Arc::new(file_schema_field.clone());

        let from_type = file_schema_field.data_type();
        let to_type = tupledesc_schema_field.data_type();

        // no cast needed
        if from_type == to_type {
            cast_to_types.push(None);
            continue;
        }

        if !is_coercible(
            &file_schema_field,
            tupledesc_schema_field,
            attribute.atttypid,
            attribute.atttypmod,
        ) {
            panic!(
                "type mismatch for column \"{}\" between table and parquet file.\n\n\
                 table has \"{}\"\n\nparquet file has \"{}\"",
                field_name, to_type, from_type
            );
        }

        pgrx::debug2!(
            "column \"{}\" is being cast from \"{}\" to \"{}\"",
            field_name,
            from_type,
            to_type
        );

        cast_to_types.push(Some(to_type.clone()));
    }

    cast_to_types
}

// is_coercible first checks if "from_type" can be cast to "to_type" by arrow-cast.
// Then, it checks if the cast is meaningful at Postgres by seeing if there is
// an explicit coercion from "from_typoid" to "to_typoid".
//
// Additionally, we need to be careful about struct rules for the cast:
// Arrow supports casting struct fields by field position instead of field name,
// which is not the intended behavior for pg_parquet. Hence, we make sure the field names
// match for structs.
fn is_coercible(
    from_field: &FieldRef,
    to_field: &FieldRef,
    to_typoid: Oid,
    to_typmod: i32,
) -> bool {
    match (from_field.data_type(), to_field.data_type()) {
        (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
            if from_fields.len() != to_fields.len() {
                return false;
            }

            let tupledesc = tuple_desc(to_typoid, to_typmod);

            let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);

            for (from_field, (to_field, to_attribute)) in from_fields
                .iter()
                .zip(to_fields.iter().zip(attributes.iter()))
            {
                if from_field.name() != to_field.name() {
                    return false;
                }

                if !is_coercible(
                    from_field,
                    to_field,
                    to_attribute.type_oid().value(),
                    to_attribute.type_mod(),
                ) {
                    return false;
                }
            }

            true
        }
        (DataType::List(from_field), DataType::List(to_field))
        | (DataType::FixedSizeList(from_field, _), DataType::List(to_field))
        | (DataType::LargeList(from_field), DataType::List(to_field)) => {
            let element_oid = array_element_typoid(to_typoid);
            let element_typmod = to_typmod;

            is_coercible(from_field, to_field, element_oid, element_typmod)
        }
        (DataType::Map(from_entries_field, _), DataType::Map(to_entries_field, _)) => {
            // entries field cannot be null
            if from_entries_field.is_nullable() {
                return false;
            }

            let (entries_typoid, entries_typmod) = domain_array_base_elem_type(to_typoid);

            is_coercible(
                from_entries_field,
                to_entries_field,
                entries_typoid,
                entries_typmod,
            )
        }
        _ => {
            // check if arrow-cast can cast the types
            if !can_cast_types(from_field.data_type(), to_field.data_type()) {
                return false;
            }

            let (from_typoid, _) = pg_type_for_arrow_primitive_field(from_field);

            // pg_parquet could not recognize that arrow type
            if from_typoid == InvalidOid {
                return false;
            }

            // check if coercion is meaningful at Postgres (it has a coercion path)
            can_pg_coerce_types(from_typoid, to_typoid, COERCION_EXPLICIT)
        }
    }
}

// pg_type_for_arrow_primitive_field returns Postgres type for given
// primitive arrow field. It returns InvalidOid if the arrow field's type is not recognized.
pub(crate) fn pg_type_for_arrow_primitive_field(field: &FieldRef) -> (Oid, i32) {
    match field.data_type() {
        DataType::Float32 | DataType::Float16 => (FLOAT4OID, -1),
        DataType::Float64 => (FLOAT8OID, -1),
        DataType::Int16 | DataType::UInt16 | DataType::Int8 | DataType::UInt8 => (INT2OID, -1),
        DataType::UInt32 => (OIDOID, -1),
        DataType::Int32 => (INT4OID, -1),
        DataType::Int64 | DataType::UInt64 => (INT8OID, -1),
        DataType::Decimal128(precision, scale) => (
            NUMERICOID,
            make_numeric_typmod(*precision as _, *scale as _),
        ),
        DataType::Boolean => (BOOLOID, -1),
        DataType::Date32 => (DATEOID, -1),
        DataType::Time64(_) => (TIMEOID, -1),
        DataType::Timestamp(_, None) => (TIMESTAMPOID, -1),
        DataType::Timestamp(_, Some(_)) => (TIMESTAMPTZOID, -1),
        DataType::Utf8 | DataType::LargeUtf8 if field.extension_type_name().is_none() => {
            (TEXTOID, -1)
        }
        DataType::Utf8 | DataType::LargeUtf8
            if field
                .try_extension_type::<arrow_schema::extension::Json>()
                .is_ok() =>
        {
            (JSONOID, -1)
        }
        DataType::Binary | DataType::LargeBinary => (BYTEAOID, -1),
        DataType::FixedSizeBinary(16)
            if field
                .try_extension_type::<arrow_schema::extension::Uuid>()
                .is_ok() =>
        {
            (UUIDOID, -1)
        }
        _ => (InvalidOid, -1),
    }
}

fn can_pg_coerce_types(from_typoid: Oid, to_typoid: Oid, ccontext: CoercionContext::Type) -> bool {
    let n_args = 1;
    let input_typeids = [from_typoid];
    let target_typeids = [to_typoid];

    unsafe {
        can_coerce_type(
            n_args,
            input_typeids.as_ptr(),
            target_typeids.as_ptr(),
            ccontext,
        )
    }
}

pub(crate) fn error_if_copy_from_match_by_position_with_generated_columns(
    tupledesc: &PgTupleDesc,
    match_by: MatchBy,
) {
    // match_by 'name' can handle generated columns
    if let MatchBy::Name = match_by {
        return;
    }

    let attributes = collect_attributes_for(CollectAttributesFor::Other, tupledesc);

    for attribute in attributes {
        if is_generated_attribute(&attribute) {
            ereport!(
                PgLogLevel::ERROR,
                PgSqlErrorCode::ERRCODE_FEATURE_NOT_SUPPORTED,
                "COPY FROM parquet with generated columns is not supported",
                "Try COPY FROM parquet WITH (match_by 'name'). \
                 It works only if the column names match with parquet file's.",
            );
        }
    }
}
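
To make the coercion rules above concrete, a hedged SQL sketch (the parquet file and table names are hypothetical): an int32 parquet column can be copied into a `bigint` column because arrow-cast can cast Int32 to Int64 and Postgres has an explicit coercion path from `int4` to `int8`, while a table with a generated column must use `match_by 'name'`, as the error hint in the code suggests.

```sql
-- hypothetical file: /tmp/ints.parquet contains a single int32 column "id"

-- allowed: arrow casts Int32 -> Int64, and int4 -> int8 is an explicit Postgres coercion
CREATE TABLE ints (id bigint);
COPY ints FROM '/tmp/ints.parquet';

-- generated columns are rejected when matching by position (the default),
-- so match columns by name instead
CREATE TABLE ints_gen (id bigint, id_doubled bigint GENERATED ALWAYS AS (id * 2) STORED);
COPY ints_gen FROM '/tmp/ints.parquet' WITH (match_by 'name');
```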
