@@ -2004,6 +2004,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable,
20042004 /* Store the hash value in the HashJoinTuple header. */
20052005 hashTuple -> hashvalue = hashvalue ;
20062006 memcpy (HJTUPLE_MINTUPLE (hashTuple ), tuple , tuple -> t_len );
2007+ HeapTupleHeaderClearMatch (HJTUPLE_MINTUPLE (hashTuple ));
20072008
20082009 /* Push it onto the front of the bucket's list */
20092010 ExecParallelHashPushTuple (& hashtable -> buckets .shared [bucketno ],
@@ -2388,6 +2389,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
23882389 hjstate -> hj_CurTuple = NULL ;
23892390}
23902391
/*
 * ExecParallelPrepHashTableForUnmatched
 *		elect one process to perform the parallel unmatched scan
 *
 * Decide if this process is allowed to run the unmatched scan.  If so, the
 * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
 * Otherwise the batch is detached and false is returned, in which case the
 * caller must not emit any unmatched tuples for this batch.
 */
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;

	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING);

	/*
	 * It would not be deadlock-free to wait on the batch barrier, because it
	 * is in PHJ_BATCH_PROBING phase, and thus processes attached to it have
	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
	 * only one process can continue to the next phase, and all others detach
	 * from this batch.  They can still do any work on other batches, if there
	 * are any.
	 */
	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
	{
		/* This process considers the batch to be done. */
		hashtable->batches[hashtable->curbatch].done = true;

		/* Make sure any temporary files are closed. */
		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

		/*
		 * Track largest batch we've seen, which would normally happen in
		 * ExecHashTableDetachBatch().
		 */
		hashtable->spacePeak =
			Max(hashtable->spacePeak,
				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
		hashtable->curbatch = -1;
		return false;
	}

	/* Now we are alone with this batch. */
	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
	Assert(BarrierParticipants(&batch->batch_barrier) == 1);

	/*
	 * Has another process decided to give up early and command all processes
	 * to skip the unmatched scan?
	 */
	if (batch->skip_unmatched)
	{
		/* Obey: mark the batch done and detach without scanning. */
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
		return false;
	}

	/* Now prepare the process local state, just as for non-parallel join. */
	ExecPrepHashTableForUnmatched(hjstate);

	return true;
}
2454+
23912455/*
23922456 * ExecScanHashTableForUnmatched
23932457 * scan the hash table for unmatched inner tuples
@@ -2462,6 +2526,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
24622526 return false;
24632527}
24642528
2529+ /*
2530+ * ExecParallelScanHashTableForUnmatched
2531+ * scan the hash table for unmatched inner tuples, in parallel join
2532+ *
2533+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
2534+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
2535+ * for the latter.
2536+ */
2537+ bool
2538+ ExecParallelScanHashTableForUnmatched (HashJoinState * hjstate ,
2539+ ExprContext * econtext )
2540+ {
2541+ HashJoinTable hashtable = hjstate -> hj_HashTable ;
2542+ HashJoinTuple hashTuple = hjstate -> hj_CurTuple ;
2543+
2544+ for (;;)
2545+ {
2546+ /*
2547+ * hj_CurTuple is the address of the tuple last returned from the
2548+ * current bucket, or NULL if it's time to start scanning a new
2549+ * bucket.
2550+ */
2551+ if (hashTuple != NULL )
2552+ hashTuple = ExecParallelHashNextTuple (hashtable , hashTuple );
2553+ else if (hjstate -> hj_CurBucketNo < hashtable -> nbuckets )
2554+ hashTuple = ExecParallelHashFirstTuple (hashtable ,
2555+ hjstate -> hj_CurBucketNo ++ );
2556+ else
2557+ break ; /* finished all buckets */
2558+
2559+ while (hashTuple != NULL )
2560+ {
2561+ if (!HeapTupleHeaderHasMatch (HJTUPLE_MINTUPLE (hashTuple )))
2562+ {
2563+ TupleTableSlot * inntuple ;
2564+
2565+ /* insert hashtable's tuple into exec slot */
2566+ inntuple = ExecStoreMinimalTuple (HJTUPLE_MINTUPLE (hashTuple ),
2567+ hjstate -> hj_HashTupleSlot ,
2568+ false); /* do not pfree */
2569+ econtext -> ecxt_innertuple = inntuple ;
2570+
2571+ /*
2572+ * Reset temp memory each time; although this function doesn't
2573+ * do any qual eval, the caller will, so let's keep it
2574+ * parallel to ExecScanHashBucket.
2575+ */
2576+ ResetExprContext (econtext );
2577+
2578+ hjstate -> hj_CurTuple = hashTuple ;
2579+ return true;
2580+ }
2581+
2582+ hashTuple = ExecParallelHashNextTuple (hashtable , hashTuple );
2583+ }
2584+
2585+ /* allow this loop to be cancellable */
2586+ CHECK_FOR_INTERRUPTS ();
2587+ }
2588+
2589+ /*
2590+ * no more unmatched tuples
2591+ */
2592+ return false;
2593+ }
2594+
24652595/*
24662596 * ExecHashTableReset
24672597 *
@@ -3793,6 +3923,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
37933923 accessor -> shared = shared ;
37943924 accessor -> preallocated = 0 ;
37953925 accessor -> done = false;
3926+ accessor -> outer_eof = false;
37963927 accessor -> inner_tuples =
37973928 sts_attach (ParallelHashJoinBatchInner (shared ),
37983929 hashtable -> hjstate -> worker_id ,
@@ -3838,25 +3969,63 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
38383969 {
38393970 int curbatch = hashtable -> curbatch ;
38403971 ParallelHashJoinBatch * batch = hashtable -> batches [curbatch ].shared ;
3972+ bool attached = true;
38413973
38423974 /* Make sure any temporary files are closed. */
38433975 sts_end_parallel_scan (hashtable -> batches [curbatch ].inner_tuples );
38443976 sts_end_parallel_scan (hashtable -> batches [curbatch ].outer_tuples );
38453977
3846- /* Detach from the batch we were last working on. */
3978+ /* After attaching we always get at least to PHJ_BATCH_PROBING. */
3979+ Assert (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_PROBING ||
3980+ BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_SCAN );
3981+
3982+ /*
3983+ * If we're abandoning the PHJ_BATCH_PROBING phase early without having
3984+ * reached the end of it, it means the plan doesn't want any more
3985+ * tuples, and it is happy to abandon any tuples buffered in this
3986+ * process's subplans. For correctness, we can't allow any process to
3987+ * execute the PHJ_BATCH_SCAN phase, because we will never have the
3988+ * complete set of match bits. Therefore we skip emitting unmatched
3989+ * tuples in all backends (if this is a full/right join), as if those
3990+ * tuples were all due to be emitted by this process and it has
3991+ * abandoned them too.
3992+ */
38473993 /*
38483994 * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware)
38493995 * If phs_lasj_has_null is true, that means we have found null when building hash table,
38503996 * there were no batches to detach.
38513997 */
3852- if (!hashtable -> parallel_state -> phs_lasj_has_null && BarrierArriveAndDetach (& batch -> batch_barrier ))
3998+ if (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_PROBING &&
3999+ !hashtable -> parallel_state -> phs_lasj_has_null && /* CBDB_PARALLEL */
4000+ !hashtable -> batches [curbatch ].outer_eof )
4001+ {
4002+ /*
4003+ * This flag may be written to by multiple backends during
4004+ * PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN
4005+ * phase so requires no extra locking.
4006+ */
4007+ batch -> skip_unmatched = true;
4008+ }
4009+
4010+ /*
4011+ * Even if we aren't doing a full/right outer join, we'll step through
4012+ * the PHJ_BATCH_SCAN phase just to maintain the invariant that
4013+ * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
4014+ */
4015+ if (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_PROBING &&
4016+ !hashtable -> parallel_state -> phs_lasj_has_null /* CBDB_PARALLEL */ )
4017+ attached = BarrierArriveAndDetachExceptLast (& batch -> batch_barrier );
4018+ if (attached && !hashtable -> parallel_state -> phs_lasj_has_null /* CBDB_PARALLEL */ &&
4019+ BarrierArriveAndDetach (& batch -> batch_barrier ))
38534020 {
38544021 /*
3855- * Technically we shouldn't access the barrier because we're no
3856- * longer attached, but since there is no way it's moving after
3857- * this point it seems safe to make the following assertion.
4022+ * We are no longer attached to the batch barrier, but we're the
4023+ * process that was chosen to free resources and it's safe to
4024+ * assert the current phase. The ParallelHashJoinBatch can't go
4025+ * away underneath us while we are attached to the build barrier,
4026+ * making this access safe.
38584027 */
3859- Assert (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_DONE );
4028+ Assert (BarrierPhase (& batch -> batch_barrier ) == PHJ_BATCH_FREE );
38604029
38614030 /* Free shared chunks and buckets. */
38624031 while (DsaPointerIsValid (batch -> chunks ))
0 commit comments