-
Notifications
You must be signed in to change notification settings - Fork 593
HDDS-14379. Implement basic Hadoop OM client proxy provider to read from followers #9641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
cc: @greenwich |
|
@ivandika3 , thanks for working on this! I am reviewing this. The change is quite big. Need some time. |
szetszwo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ivandika3 , thanks a lot for working on this and adding a lot of tests! Please see the comments inlined.
BTW, filed HDDS-14455 and HDDS-14470 for improving the current code.
| protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) { | ||
| if (omProxyInfo.proxy == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The synchronized is for omProxyInfo.proxy. We probably should do it in OMProxyInfo. Let me fix it in HDDS-14470.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok will wait until HDDS-14470 is merged.
| new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol)); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move it to wrappedProxy. Then it would only suppress the warning there but not the entire method.
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFollowerReadFailoverProxyProvider.java
@@ -103,7 +103,6 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(
new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol));
}
- @SuppressWarnings("unchecked")
public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol,
HadoopRpcOMFailoverProxyProvider<T> failoverProxy) throws IOException {
this.protocolClass = protocol;
@@ -119,6 +118,7 @@ public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T>
combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo);
}
combinedInfo.append(']');
+ @SuppressWarnings("unchecked")
T wrappedProxy = (T) Proxy.newProxyInstance(
FollowerReadInvocationHandler.class.getClassLoader(),
new Class<?>[] {protocol}, new FollowerReadInvocationHandler());There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| StringBuilder combinedInfo = new StringBuilder("["); | ||
| for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) { | ||
| if (i > 0) { | ||
| combinedInfo.append(','); | ||
| } | ||
| combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo); | ||
| } | ||
| combinedInfo.append(']'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may use "map/reduce".
final String combinedInfo = "[" + failoverProxy.getOMProxies().stream()
.map(a -> a.proxyInfo)
.reduce((a, b) -> a + ", " + b).orElse("") + "]";There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For simplicity, let support only OzoneManagerProtocolPB for now? Then, we can remove the followerReadEnabled and simplify the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Could you clarify on this?
Currently there are guards to ensure that follower read only supports OzoneManagerProtocolPB. One is during construction by disabling useFollowerRead if it's not OzoneManagerProtocolPB. Second is during FollowerReadInvocationHandler.
Do you mean to make HadoopRpcOMFollowerReadFailoverProxyProvider to be HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> implements FailoverProxy<OzoneManagerProtocolPB>? I believe this might mean that we need to replace T generic type parameter with OzoneManagerProtocolPB. This adds verbosity and reduces the generic benefit, not sure if this is a good tradeoff for removing userFollowerRead.
Please let me know what you think.
| private static OMRequest parseOMRequest(Object[] args) throws Throwable { | ||
| if (args == null || args.length < 2 || !(args[1] instanceof Message)) { | ||
| LOG.error("Request failed since OM request is null and cannot be parsed"); | ||
| // Throws a non-retriable exception to prevent retry and failover | ||
| // See the HddsUtils#shouldNotFailoverOnRpcException used in | ||
| // OMFailoverProxyProviderBase#shouldFailover | ||
| throw wrapInServiceException( | ||
| new RpcNoSuchProtocolException("OM request is null and cannot be parsed")); | ||
| } | ||
| final Message theRequest = (Message) args[1]; | ||
| return (OMRequest) theRequest; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's provide a more specific error message:
private static OMRequest parseOMRequest(Object[] args) throws ServiceException {
final String error = args == null ? "args == null"
: args.length < 2 ? "args.length == " + args.length + " < 2"
: !(args[1] instanceof OMRequest) ? "Non-OMRequest: " + args[1].getClass()
: null;
if (error != null) {
// Throws a non-retriable exception to prevent retry and failover
// See the HddsUtils#shouldNotFailoverOnRpcException used in
// OMFailoverProxyProviderBase#shouldFailover
throw wrapInServiceException(new RpcNoSuchProtocolException("Failed to parseOMRequest: " + error));
}
return (OMRequest) args[1];
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated this to handle null args[1]. I also changed this to use normal if else blocks since I personally find chained ternary operators hard to read.
| public Object invoke(Object proxy, final Method method, final Object[] args) | ||
| throws Throwable { | ||
| lastProxy = null; | ||
| if (method.getDeclaringClass() == Object.class) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check (method.getDeclaringClass() != OzoneManagerProtocolPB.class) instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, better. Updated, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted this due to failing test.
The actual Method#getDeclaringClass is org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface so all submitRequest will go to this block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additionally, I tried to change the proxy to be pass through if it's not OzoneManagerProtocolPB.class
From
return method.invoke(this, args);to
return method.invoke(proxy, args);However, it seems because org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos$OzoneManagerService$BlockingInterface is not concrete class, it throws exception when usineg Object#toString.
Caused by: java.lang.NoSuchMethodException: org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB.toString()
Not sure if there is a way to handle all cases.
| private static Throwable wrapInServiceException(Throwable e) { | ||
| if (e instanceof ServiceException) { | ||
| return e; | ||
| } | ||
| return new ServiceException(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throws ServiceException.
private static void throwServiceException(Throwable e) throws ServiceException {
throw e instanceof ServiceException ? (ServiceException) e : new ServiceException(e);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated. However, retVal needs to be initialized to null since the compiler cannot detect that throwServiceException always throws exception (since it might think that it simply returns). So I have initialized retVal to null.
| * | ||
| * @return parsed OM request. | ||
| */ | ||
| private static OMRequest parseOMRequest(Object[] args) throws Throwable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throws ServiceException instead of Throwable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| * Whether reading from follower is enabled. If this is false, all read | ||
| * requests will still go to OM leader. | ||
| */ | ||
| private volatile boolean followerReadEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename it to useFollowerRead to avoid confusion with the conf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| OMRequest omRequest = parseOMRequest(args); | ||
| if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) { | ||
| int failedCount = 0; | ||
| for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check followerReadEnabled: followerReadEnabled && i < failoverProxy.getOmNodesInOrder().size()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
ivandika3
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szetszwo Thanks for the review.
| protected synchronized ProxyInfo<T> createOMProxyIfNeeded(OMProxyInfo<T> omProxyInfo) { | ||
| if (omProxyInfo.proxy == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok will wait until HDDS-14470 is merged.
| * Whether reading from follower is enabled. If this is false, all read | ||
| * requests will still go to OM leader. | ||
| */ | ||
| private volatile boolean followerReadEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| new HadoopRpcOMFailoverProxyProvider<>(configuration, ugi, omServiceId, protocol)); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| StringBuilder combinedInfo = new StringBuilder("["); | ||
| for (int i = 0; i < failoverProxy.getOMProxies().size(); i++) { | ||
| if (i > 0) { | ||
| combinedInfo.append(','); | ||
| } | ||
| combinedInfo.append(failoverProxy.getOMProxies().get(i).proxyInfo); | ||
| } | ||
| combinedInfo.append(']'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| * | ||
| * @return parsed OM request. | ||
| */ | ||
| private static OMRequest parseOMRequest(Object[] args) throws Throwable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| private static OMRequest parseOMRequest(Object[] args) throws Throwable { | ||
| if (args == null || args.length < 2 || !(args[1] instanceof Message)) { | ||
| LOG.error("Request failed since OM request is null and cannot be parsed"); | ||
| // Throws a non-retriable exception to prevent retry and failover | ||
| // See the HddsUtils#shouldNotFailoverOnRpcException used in | ||
| // OMFailoverProxyProviderBase#shouldFailover | ||
| throw wrapInServiceException( | ||
| new RpcNoSuchProtocolException("OM request is null and cannot be parsed")); | ||
| } | ||
| final Message theRequest = (Message) args[1]; | ||
| return (OMRequest) theRequest; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| public Object invoke(Object proxy, final Method method, final Object[] args) | ||
| throws Throwable { | ||
| lastProxy = null; | ||
| if (method.getDeclaringClass() == Object.class) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, better. Updated, thanks.
| OMRequest omRequest = parseOMRequest(args); | ||
| if (followerReadEnabled && OmUtils.shouldSendToFollower(omRequest)) { | ||
| int failedCount = 0; | ||
| for (int i = 0; i < failoverProxy.getOmNodesInOrder().size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated.
| private static Throwable wrapInServiceException(Throwable e) { | ||
| if (e instanceof ServiceException) { | ||
| return e; | ||
| } | ||
| return new ServiceException(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated. However, retVal needs to be initialized to null since the compiler cannot detect that throwServiceException always throws exception (since it might think that it simply returns). So I have initialized retVal to null.
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| public HadoopRpcOMFollowerReadFailoverProxyProvider(String omServiceId, Class<T> protocol, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Could you clarify on this?
Currently there are guards to ensure that follower read only supports OzoneManagerProtocolPB. One is during construction by disabling useFollowerRead if it's not OzoneManagerProtocolPB. Second is during FollowerReadInvocationHandler.
Do you mean to make HadoopRpcOMFollowerReadFailoverProxyProvider to be HadoopRpcOMFollowerReadFailoverProxyProvider<OzoneManagerProtocolPB> implements FailoverProxy<OzoneManagerProtocolPB>? I believe this might mean that we need to replace T generic type parameter with OzoneManagerProtocolPB. This adds verbosity and reduces the generic benefit, not sure if this is a good tradeoff for removing userFollowerRead.
Please let me know what you think.
# Conflicts: # hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java # hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmTestUtil.java # hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithAllRunning.java # hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
This reverts commit d804991.
What changes were proposed in this pull request?
This task is to come up with the basic implementation of follower read client proxy as a baseline before further performance improvements. The idea is to simply pick an OM node in random (which can be a leader or follower) and use it to submit read requests. The read requests need to keep sending to that OM node unless the OM is down which triggers failover. Write requests should be sent to the OM leader directly.
Further improvements such as followers affinity or picking OM based on latency, applied index, etc will be implemented in follow up tasks. The main focus of this patch is to ensure that long-lived client (e.g. S3G Ozone clients) will stick to the OM follower once it picks it as the current proxy. In the previous leader proxy provider implementation, the client only read from followers until a new write request triggers OMNotLeaderException and the failover causes proxy to always be pointing to the leader.
The implementation is to introduce HadoopRpcOMFollowerReadProxyProvider which wraps
HadoopRpcOMFailoverProxyProvider. FollowerReadProxyProvider tracks a different currentOmNodeId from HadoopRpcOMFailoverProxyProvider. FollowerReadInvocationHandler will check whether the request is a read request (using OmUtils#isReadOnly) and if so forwards it to its current proxy. If it's a write request, the request if forwarded to HadoopRpcOMFailoverProxyProvider to be sent to the leader.
So the proxy hierarchy (each with its own InvocationHandler) is
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14379
How was this patch tested?
UT and IT.
Clean CI: https://github.com/ivandika3/ozone/actions/runs/21102966224