Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ private static AbstractTable buildTable(Map<String, Object> properties) {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = typeFactory.builder();
addLeafFields(builder, typeFactory, properties, "");
// Virtual row ID column — always present in parquet files, computed by analytics backend.
// Only add if not already in the mapping.
if (!properties.containsKey("__row_id__")) {
builder.add("__row_id__", typeFactory.createTypeWithNullability(
typeFactory.createSqlType(SqlTypeName.BIGINT), true));
}
return builder.build();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.analytics.spi;

import org.apache.arrow.memory.BufferAllocator;
import org.opensearch.analytics.backend.EngineResultStream;
import org.opensearch.index.engine.exec.IndexReaderProvider;

import java.util.List;

/**
Expand Down Expand Up @@ -119,4 +123,23 @@ default void configureFilterDelegation(FilterDelegationHandle handle, BackendExe
* Called after {@link #configureFilterDelegation}. Pass {@code null} to clear.
*/
default void setDelegationThreadTracker(DelegationThreadTracker tracker) {}

/**
* QTF fetch phase: reads specific rows by global row ID.
* Row IDs are passed as a BigIntVector for zero-copy transfer to native.
*
* @param reader the index reader for the target shard
* @param rowIdVector Arrow BigIntVector containing global row IDs
* @param columns column names to read
* @param allocator Arrow buffer allocator for result import
* @return a result stream containing the requested rows
*/
default EngineResultStream fetchByRowIds(
IndexReaderProvider.Reader reader,
org.apache.arrow.vector.BigIntVector rowIdVector,
String[] columns,
BufferAllocator allocator
) {
throw new UnsupportedOperationException("fetchByRowIds not implemented for [" + name() + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,64 @@ private ShardScanExecutionContext buildContext(
return ctx;
}

/**
* QTF fetch phase: retrieves specific rows by global row ID.
*/
public org.opensearch.analytics.exec.action.FetchByRowIdsResponse executeFetchByRowIds(
org.opensearch.analytics.exec.action.FetchByRowIdsRequest request,
IndexShard shard,
AnalyticsShardTask task
) {
try {
IndexReaderProvider readerProvider = shard.getReaderProvider();
if (readerProvider == null) {
throw new IllegalStateException("No ReaderProvider on " + shard.shardId());
}
try (GatedCloseable<Reader> gatedReader = readerProvider.acquireReader()) {
long[] rowIds = request.getRowIds();
org.apache.arrow.vector.BigIntVector rowIdVector = new org.apache.arrow.vector.BigIntVector("__row_id__", allocator);
rowIdVector.allocateNew(rowIds.length);
for (int i = 0; i < rowIds.length; i++) {
rowIdVector.set(i, rowIds[i]);
}
rowIdVector.setValueCount(rowIds.length);

AnalyticsSearchBackendPlugin backend = backends.values().iterator().next();
EngineResultStream stream = backend.fetchByRowIds(gatedReader.get(), rowIdVector, request.getColumns(), allocator);

// Serialize stream to Arrow IPC bytes
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
org.apache.arrow.vector.ipc.WriteChannel channel =
new org.apache.arrow.vector.ipc.WriteChannel(java.nio.channels.Channels.newChannel(baos));
org.apache.arrow.vector.types.pojo.Schema schema = null;
int totalRows = 0;
java.util.Iterator<org.opensearch.analytics.backend.EngineResultBatch> it = stream.iterator();
while (it.hasNext()) {
org.opensearch.analytics.backend.EngineResultBatch batch = it.next();
org.apache.arrow.vector.VectorSchemaRoot root = batch.getArrowRoot();
try {
if (schema == null) {
schema = root.getSchema();
org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(channel, schema);
}
try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch rb =
new org.apache.arrow.vector.VectorUnloader(root).getRecordBatch()) {
org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(channel, rb);
}
totalRows += root.getRowCount();
} finally {
root.close();
}
}
if (schema != null) {
org.apache.arrow.vector.ipc.ArrowStreamWriter.writeEndOfStream(
channel, org.apache.arrow.vector.ipc.message.IpcOption.DEFAULT);
}
rowIdVector.close();
return new org.opensearch.analytics.exec.action.FetchByRowIdsResponse(baos.toByteArray(), totalRows);
}
} catch (Exception e) {
throw new RuntimeException("Failed to execute fetch-by-row-ids on " + shard.shardId(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.opensearch.analytics.exec;

import org.opensearch.analytics.backend.EngineResultBatch;
import org.opensearch.analytics.exec.action.FetchByRowIdsAction;
import org.opensearch.analytics.exec.action.FetchByRowIdsRequest;
import org.opensearch.analytics.exec.action.FetchByRowIdsResponse;
import org.opensearch.analytics.exec.action.FragmentExecutionAction;
import org.opensearch.analytics.exec.action.FragmentExecutionArrowResponse;
import org.opensearch.analytics.exec.action.FragmentExecutionRequest;
Expand Down Expand Up @@ -69,6 +72,7 @@ public AnalyticsSearchTransportService(
this.transportService = streamTransportService;
this.clusterService = clusterService;
registerStreamingFragmentHandler(this.transportService, searchService, indicesService);
registerFetchHandler(this.transportService, searchService, indicesService);
}

private static void registerStreamingFragmentHandler(
Expand Down Expand Up @@ -189,4 +193,67 @@ public void handleException(TransportException e) {
}
});
}

// ── QTF Fetch ────────────────────────────────────────────────────────────────

private static void registerFetchHandler(
StreamTransportService transportService,
AnalyticsSearchService searchService,
IndicesService indicesService
) {
transportService.registerRequestHandler(
FetchByRowIdsAction.NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
FetchByRowIdsRequest::new,
(request, channel, task) -> {
IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id());
FetchByRowIdsResponse response = searchService.executeFetchByRowIds(request, shard, (AnalyticsShardTask) task);
channel.sendResponse(response);
}
);
}

public void dispatchFetch(
FetchByRowIdsRequest request,
DiscoveryNode targetNode,
StreamingResponseListener<FetchByRowIdsResponse> listener,
Task parentTask
) {
try {
Transport.Connection connection = getConnection(null, targetNode.getId());
transportService.sendChildRequest(
connection,
FetchByRowIdsAction.NAME,
request,
parentTask,
TransportRequestOptions.EMPTY,
new TransportResponseHandler<FetchByRowIdsResponse>() {
@Override
public FetchByRowIdsResponse read(StreamInput in) throws IOException {
return new FetchByRowIdsResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(FetchByRowIdsResponse response) {
listener.onStreamResponse(response, true);
}

@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}
}
);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class QueryContext {
private final List<AnalyticsOperationListener> operationListeners;
private volatile BufferAllocator bufferAllocator;
private boolean closed; // guarded by `this`
private final List<org.opensearch.analytics.planner.dag.ShardExecutionTarget> resolvedShardTargets = new java.util.ArrayList<>();

public QueryContext(QueryDAG dag, Executor searchExecutor, AnalyticsQueryTask parentTask) {
this(dag, searchExecutor, parentTask, DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS, DEFAULT_PER_QUERY_MEMORY_LIMIT, List.of());
Expand Down Expand Up @@ -144,6 +145,10 @@ public void closeBufferAllocator() {
}
}

public List<org.opensearch.analytics.planner.dag.ShardExecutionTarget> getResolvedShardTargets() {
return resolvedShardTargets;
}

// ─── Test factories ────────────────────────────────────────────────

/** Creates a test context with a synchronous executor. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private PlanWalker createWalker(
opListener.onQueryFailure(queryId, e);
listener.onFailure(e);
});

return new PlanWalker(config, stageExecutionBuilder, wrapped);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.exec.action;

import org.opensearch.action.ActionType;

/**
* Transport action for QTF fetch phase: fetches specific rows by global row ID.
*/
public class FetchByRowIdsAction extends ActionType<FetchByRowIdsResponse> {

public static final String NAME = "indices:data/read/analytics/fetch_by_row_ids";
public static final FetchByRowIdsAction INSTANCE = new FetchByRowIdsAction();

private FetchByRowIdsAction() {
super(NAME, FetchByRowIdsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.exec.action;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.arrow.flight.transport.ArrowBatchResponse;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Streaming Arrow response for the QTF fetch phase.
* Carries a single Arrow batch from the data node back to the coordinator
* via the streaming transport — zero-copy, no IPC serialization.
*
* @opensearch.internal
*/
public class FetchByRowIdsArrowResponse extends ArrowBatchResponse {

public FetchByRowIdsArrowResponse(VectorSchemaRoot root) {
super(root);
}

public FetchByRowIdsArrowResponse(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.exec.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.analytics.exec.task.AnalyticsShardTask;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.Map;

/**
* Transport request for QTF fetch phase.
* Carries global row IDs and column names to the data node for targeted row retrieval.
*/
public class FetchByRowIdsRequest extends ActionRequest {

private final String queryId;
private final ShardId shardId;
private final long[] rowIds;
private final String[] columns;

public FetchByRowIdsRequest(String queryId, ShardId shardId, long[] rowIds, String[] columns) {
this.queryId = queryId;
this.shardId = shardId;
this.rowIds = rowIds;
this.columns = columns;
}

public FetchByRowIdsRequest(StreamInput in) throws IOException {
super(in);
this.queryId = in.readString();
this.shardId = new ShardId(in);
this.rowIds = in.readLongArray();
this.columns = in.readStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(queryId);
shardId.writeTo(out);
out.writeLongArray(rowIds);
out.writeStringArray(columns);
}

public String getQueryId() {
return queryId;
}

public ShardId getShardId() {
return shardId;
}

public long[] getRowIds() {
return rowIds;
}

public String[] getColumns() {
return columns;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new AnalyticsShardTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
public String getDescription() {
return "fetch_by_row_ids{query=" + queryId + ", shard=" + shardId + ", rows=" + rowIds.length + "}";
}
}
Loading
Loading