11 changes: 10 additions & 1 deletion tsl/src/nodes/vector_agg/grouping_policy_hash.c
@@ -15,6 +15,7 @@
#include <access/tupdesc.h>
#include <executor/tuptable.h>
#include <nodes/pg_list.h>
#include <utils/memutils.h>

#include "grouping_policy.h"

@@ -96,7 +97,13 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr
break;
}

policy->hashing.key_body_mctx = policy->agg_extra_mctx;
policy->hashing.key_body_mctx =
#if (PG17_LT)
AllocSetContextCreate(
#else
BumpContextCreate(
#endif
CurrentMemoryContext, "hashing keys", ALLOCSET_DEFAULT_SIZES);

policy->hashing.init(&policy->hashing, policy);

@@ -114,6 +121,8 @@ gp_hash_reset(GroupingPolicy *obj)

policy->hashing.reset(&policy->hashing);

MemoryContextReset(policy->hashing.key_body_mctx);

policy->stat_input_valid_rows = 0;
policy->stat_input_total_rows = 0;
policy->stat_bulk_filtered_rows = 0;
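For context, a minimal sketch of the version-gated context creation used in this hunk, assuming this codebase's PG17_LT compatibility macro; everything else is the standard PostgreSQL memutils API. A bump context (available only from PostgreSQL 17 on) has no per-chunk headers and no pfree() support, which suits key copies that are only appended and then freed in bulk by MemoryContextReset() in gp_hash_reset().

/*
 * Illustrative sketch, not the PR's code verbatim: create the memory context
 * that owns the hash key bodies, using a bump context where available.
 */
#include <postgres.h>
#include <utils/memutils.h>

static MemoryContext
create_key_body_mctx(MemoryContext parent)
{
#if PG17_LT
	/* BumpContextCreate() only exists starting with PostgreSQL 17. */
	return AllocSetContextCreate(parent, "hashing keys", ALLOCSET_DEFAULT_SIZES);
#else
	/* No chunk headers, no pfree(): cheaper for append-only key storage. */
	return BumpContextCreate(parent, "hashing keys", ALLOCSET_DEFAULT_SIZES);
#endif
}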
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
@@ -21,7 +21,7 @@ typedef struct BatchHashingParams
int num_grouping_columns;
const CompressedColumnValues *grouping_column_values;

GroupingPolicyHash *policy;
GroupingPolicyHash *restrict policy;
HashingStrategy *restrict hashing;

uint32 *restrict result_key_indexes;
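As an aside, a hedged illustration (names invented, not from this PR) of what the restrict qualifier added above buys: it promises the compiler that the qualified pointer is the only way the pointed-to object is accessed in this scope, so values loaded through it can be kept in registers across stores through other pointers.

#include <stdint.h>

/* Illustration only: with both pointers restrict-qualified, the compiler may
 * assume *count is never aliased by values[] and keep it in a register for
 * the whole loop instead of reloading it after every store. */
static uint32_t
count_nonzero(const uint32_t *restrict values, uint32_t *restrict count, int n)
{
	for (int i = 0; i < n; i++)
		*count += (values[i] != 0);
	return *count;
}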
3 changes: 1 addition & 2 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
@@ -15,9 +15,8 @@
* allocations in the hot loop that fills the hash table.
*/
void
hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows)
hash_strategy_output_key_alloc(HashingStrategy *hashing, uint16 nrows)
{
HashingStrategy *hashing = &policy->hashing;
const uint32 num_possible_keys = hashing->last_used_key_index + 1 + nrows;

if (num_possible_keys > hashing->num_allocated_output_keys)
15 changes: 14 additions & 1 deletion tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
@@ -76,7 +76,7 @@ FUNCTION_NAME(hash_strategy_prepare_for_batch)(GroupingPolicyHash *policy,
{
uint16 nrows = 0;
vector_slot_get_qual_result(vector_slot, &nrows);
hash_strategy_output_key_alloc(policy, nrows);
hash_strategy_output_key_alloc(&policy->hashing, nrows);
FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, vector_slot);
}

@@ -169,6 +169,10 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
}
}

/*
* In some special cases we call a more efficient specialization of the grouping
* function.
*/
static void
FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
int end_row)
@@ -177,6 +181,15 @@ FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_s

BatchHashingParams params = build_batch_hashing_params(policy, vector_slot);

#ifdef USE_DICT_HASHING
if (params.hashing->use_key_index_for_dict)
{
Assert(params.single_grouping_column.decompression_type == DT_ArrowTextDict);
single_text_offsets_translate(params, start_row, end_row);
return;
}
#endif

FUNCTION_NAME(fill_offsets_impl)(params, start_row, end_row);
}

266 changes: 266 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_single_text.c
@@ -127,9 +127,275 @@ single_text_emit_key(GroupingPolicyHash *policy, uint32 current_key,
return hash_strategy_output_key_single_emit(policy, current_key, aggregated_slot);
}

/*
* We use a special batch preparation function that can hash the dictionary-
* encoded column through its dictionary when that is cheaper than hashing
* the individual rows.
*/

#define USE_DICT_HASHING

static void single_text_fill_offsets_impl(BatchHashingParams params, int start_row, int end_row);

static void
single_text_key_hashing_prepare_for_batch(GroupingPolicyHash *policy, TupleTableSlot *vector_slot)
{
/*
* Determine whether we're going to use the dictionary for hashing.
*/
HashingStrategy *hashing = &policy->hashing;
hashing->use_key_index_for_dict = false;

BatchHashingParams params = build_batch_hashing_params(policy, vector_slot);
if (params.single_grouping_column.decompression_type != DT_ArrowTextDict)
{
return;
}

uint16 batch_rows;
const uint64 *row_filter = vector_slot_get_qual_result(vector_slot, &batch_rows);

const int dict_rows = params.single_grouping_column.arrow->dictionary->length;
if (dict_rows > arrow_num_valid(row_filter, batch_rows))
{
return;
}

/*
* Remember which aggregate states already existed and which we will have
* to initialize. State index zero is invalid.
*/
const uint32 last_initialized_key_index = params.hashing->last_used_key_index;
Assert(last_initialized_key_index <= policy->num_allocated_per_key_agg_states);

/*
* Allocate the array that stores the aggregate state index for each
* dictionary entry. We don't need the indexes from the previous batch that
* are currently stored there, so we don't have to use repalloc.
*/
if (dict_rows > hashing->num_key_index_for_dict)
{
if (hashing->key_index_for_dict != NULL)
{
pfree(hashing->key_index_for_dict);
}

const int new_size = Min(GLOBAL_MAX_ROWS_PER_COMPRESSION,
Max(dict_rows, hashing->num_key_index_for_dict * 2 + 1));
hashing->num_key_index_for_dict = new_size;

hashing->key_index_for_dict =
palloc(sizeof(hashing->key_index_for_dict[0]) * hashing->num_key_index_for_dict);
}

/*
* We shouldn't add the dictionary entries that are not used by any matching
* rows. Translate the batch filter bitmap to a filter bitmap for dictionary
* rows, using an array of bools as an intermediate to avoid complicated bit
* operations.
*/
if (row_filter != NULL)
{
/*
* We'll use the key index storage as a temporary to avoid reallocations.
*/
StaticAssertDecl(sizeof(bool) <= sizeof(*hashing->key_index_for_dict),
"unexpected bool size");
bool *restrict dict_row_passes = (bool *) hashing->key_index_for_dict;
Assert(sizeof(*dict_row_passes) <= sizeof(*hashing->key_index_for_dict));
memset(dict_row_passes, 0, sizeof(*dict_row_passes) * dict_rows);

/*
* Determine for every dictionary entry whether any batch row that references
* it passes the filter, and store the result in the bool array.
*/
int outer;
for (outer = 0; outer < batch_rows / 64; outer++)
{
#define INNER_LOOP(INNER_MAX) \
const uint64 word = row_filter[outer]; \
for (int inner = 0; inner < (INNER_MAX); inner++) \
{ \
const int16 index = \
((int16 *) params.single_grouping_column.buffers[3])[outer * 64 + inner]; \
dict_row_passes[index] = dict_row_passes[index] || (word & (1ull << inner)); \
}

INNER_LOOP(64)
}

if (batch_rows % 64)
{
INNER_LOOP(batch_rows % 64)
}
#undef INNER_LOOP

/*
* Convert the bool array into the bitmap.
*/
uint64 *restrict dict_filter_bitmap = policy->tmp_filter;
const size_t dict_filter_bitmap_words = (dict_rows + 63) / 64;
memset(dict_filter_bitmap, 0, sizeof(*dict_filter_bitmap) * dict_filter_bitmap_words);

for (outer = 0; outer < dict_rows / 64; outer++)
{
#define INNER_LOOP(INNER_MAX) \
uint64 word = 0; \
for (int inner = 0; inner < (INNER_MAX); inner++) \
{ \
word |= (dict_row_passes[outer * 64 + inner] ? 1ull : 0ull) << inner; \
} \
dict_filter_bitmap[outer] = word;

INNER_LOOP(64)
}
if (dict_rows % 64)
{
INNER_LOOP(dict_rows % 64)
}
#undef INNER_LOOP

params.batch_filter = dict_filter_bitmap;
}
else
{
params.batch_filter = NULL;
}

/*
* The dictionary contains no null entries, so we will be adding the null
* key separately. Determine if we have any null rows that also pass the
* batch filter.
*/
bool have_null_key = false;
if (params.single_grouping_column.arrow->null_count > 0)
{
if (row_filter != NULL)
{
Assert(params.single_grouping_column.buffers[0] != NULL);
const size_t batch_words = (batch_rows + 63) / 64;
for (size_t i = 0; i < batch_words; i++)
{
const uint64 filtered_null_word =
row_filter[i] & (~((uint64 *) params.single_grouping_column.buffers[0])[i]);
have_null_key = (filtered_null_word != 0) || have_null_key;
}
}
else
{
Assert(params.single_grouping_column.buffers[0] != NULL);
have_null_key = true;
}
}

/*
* The dictionary doesn't store nulls, so add the null key separately if we
* have one. Note that we have to respect NULLS FIRST/LAST. Here we add the
* null key before all other keys in case of NULLS FIRST.
*/
const bool nulls_first = !arrow_row_is_valid(params.single_grouping_column.buffers[0], 0);
if (have_null_key && nulls_first && hashing->null_key_index == 0)
{
hashing->null_key_index = ++params.hashing->last_used_key_index;
hashing->output_keys[hashing->null_key_index] = PointerGetDatum(NULL);
}

/*
* Build key indexes for the dictionary entries as for normal non-nullable
* text values.
*/
Assert(params.single_grouping_column.decompression_type == DT_ArrowTextDict);
Assert(dict_rows <= hashing->num_key_index_for_dict);
memset(hashing->key_index_for_dict, 0, sizeof(*hashing->key_index_for_dict) * dict_rows);

params.single_grouping_column.decompression_type = DT_ArrowText;
params.single_grouping_column.buffers[0] = NULL;
params.result_key_indexes = hashing->key_index_for_dict;

single_text_fill_offsets_impl(params, 0, dict_rows);

/*
* The dictionary doesn't store nulls, so add the null key separately if we
* have one. Note that we have to respect NULLS FIRST/LAST. Here we add the
* null key after all other keys in case of NULLS LAST.
*/
if (have_null_key && !nulls_first && hashing->null_key_index == 0)
{
hashing->null_key_index = ++params.hashing->last_used_key_index;
hashing->output_keys[hashing->null_key_index] = PointerGetDatum(NULL);
}

/*
* Allocate and initialize the aggregate states for the new keys, if we added any.
*/
if (params.hashing->last_used_key_index > last_initialized_key_index)
{
const uint64 new_aggstate_rows = policy->num_allocated_per_key_agg_states * 2 + 1;
const int num_fns = policy->num_agg_defs;
for (int i = 0; i < num_fns; i++)
{
const VectorAggDef *agg_def = &policy->agg_defs[i];
if (params.hashing->last_used_key_index >= policy->num_allocated_per_key_agg_states)
{
policy->per_agg_per_key_states[i] =
repalloc(policy->per_agg_per_key_states[i],
new_aggstate_rows * agg_def->func.state_bytes);
}

/*
* Initialize the aggregate function states for the newly added keys.
*/
void *first_uninitialized_state =
agg_def->func.state_bytes * (last_initialized_key_index + 1) +
(char *) policy->per_agg_per_key_states[i];
agg_def->func.agg_init(first_uninitialized_state,
params.hashing->last_used_key_index -
last_initialized_key_index);
}

/*
* Record the newly allocated number of rows in case we had to reallocate.
*/
if (params.hashing->last_used_key_index >= policy->num_allocated_per_key_agg_states)
{
Assert(new_aggstate_rows > policy->num_allocated_per_key_agg_states);
policy->num_allocated_per_key_agg_states = new_aggstate_rows;
}
}

hashing->use_key_index_for_dict = true;

DEBUG_PRINT("computed the dict offsets\n");
}

static void
single_text_offsets_translate(BatchHashingParams params, int start_row, int end_row)
{
HashingStrategy *hashing = params.hashing;
Assert(hashing->use_key_index_for_dict);

uint32 *restrict indexes_for_rows = params.result_key_indexes;
uint32 *restrict indexes_for_dict = hashing->key_index_for_dict;

for (int row = start_row; row < end_row; row++)
{
const bool row_valid = arrow_row_is_valid(params.single_grouping_column.buffers[0], row);
const int16 dict_index = ((int16 *) params.single_grouping_column.buffers[3])[row];

if (row_valid)
{
indexes_for_rows[row] = indexes_for_dict[dict_index];
}
else
{
indexes_for_rows[row] = hashing->null_key_index;
}

Assert(indexes_for_rows[row] != 0 || !arrow_row_is_valid(params.batch_filter, row));
}
}

#undef APPLY_FOR_SPECIALIZATIONS
#undef APPLY_FOR_VALIDITY
#undef APPLY_FOR_BATCH_FILTER

#include "hash_strategy_impl.c"
11 changes: 10 additions & 1 deletion tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
@@ -75,8 +75,17 @@ typedef struct HashingStrategy
*/
uint8 *tmp_key_storage;
uint64 num_tmp_key_storage_bytes;

/*
* For a single text key that uses dictionary encoding, in some cases we first
* calculate the key indexes for the dictionary entries, and then translate
* them to the actual batch rows.
*/
uint32 *restrict key_index_for_dict;
int64 num_key_index_for_dict;
bool use_key_index_for_dict;
} HashingStrategy;

void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows);
void hash_strategy_output_key_alloc(HashingStrategy *hashing, uint16 nrows);
void hash_strategy_output_key_single_emit(GroupingPolicyHash *policy, uint32 current_key,
TupleTableSlot *aggregated_slot);
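To make the new fields concrete, a hedged worked example with invented values (standalone C, not the PR's code): a 4-row batch whose text column has the dictionary {"red", "green"}, per-row dictionary indexes {0, 1, 1, 0}, and a null in the third row. After the dictionary pass assigns key indexes, key_index_for_dict is {1, 2} and null_key_index is 3, so the per-row translation yields key indexes {1, 2, 3, 1}.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	/* Invented example data: 4 rows, dictionary {"red", "green"}. */
	const int16_t row_dict_indexes[4] = { 0, 1, 1, 0 };
	const bool row_is_null[4] = { false, false, true, false };

	/* Key indexes assigned to the dictionary entries; 0 means "no key yet". */
	const uint32_t key_index_for_dict[2] = { 1, 2 };
	const uint32_t null_key_index = 3;

	uint32_t row_key_indexes[4];
	for (int row = 0; row < 4; row++)
	{
		row_key_indexes[row] = row_is_null[row] ? null_key_index :
			key_index_for_dict[row_dict_indexes[row]];
	}

	for (int row = 0; row < 4; row++)
		printf("%u ", (unsigned) row_key_indexes[row]); /* prints: 1 2 3 1 */
	printf("\n");
	return 0;
}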