-
Notifications
You must be signed in to change notification settings - Fork 48
Added update statement in JobOperator #1217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -81,6 +81,7 @@ case class JobOperator( | |||
|
|||
val statementExecutionManager = | |||
instantiateStatementExecutionManager(commandContext, resultIndex, osClient) | |||
statementExecutionManager.updateStatement(statement) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we inclose this in isWarmpoolEnabled
flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks for pointing this.
1ca7533
to
8905681
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add UT and IT to verify this change
I tried adding IT for non-warmpool jobs, but I am unable to test if statementExecutionManager.updateStatement(statement) is called for non-warmpool jobs since it is an intermittent state. I am getting a success state for the non-warm pool job, but it doesn't test the line that I added. It's not possible to write a JobOperator unit test without making code changes in JobOperator, as there are multiple methods which are initialised inside JobOperator, and they can't be mocked. Do let me know if you have any approach which I can take in writing UTs. PowerMockito can do this, but we haven't imported PowerMockito in the OpenSearch-Spark code. |
For UT, you can create JobOperator instance and override For IT, you can verify the statement change via getting it from the index itself |
This is the code change i made and I am getting NPE
Do let me know how I can resolve this. |
You need to mock the opensearch client call to return a fake doc. |
Even after mocking OSClient Method, I am getting same error.
|
Hi noCharger, Is there any reason that we didn't have any unit tests for JobOperator before this? Since I am creating the JobOperator unit test, I am facing multiple issues in mocking osClient. Since you have written the majority of JobOperator, did you face any such issues? If not, then why didn't we have any unit tests for job operator before this? |
You can refer to https://github.com/opensearch-project/opensearch-spark/blob/9aad67dc9e1f899f04509d3fe2dc709652ad4b92/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala and https://github.com/opensearch-project/opensearch-spark/blob/9aad67dc9e1f899f04509d3fe2dc709652ad4b92/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala |
Hi @noCharger , In the above example, sessionManager is passed a variable, and hence we are able to use osClient, which is passed in sessionManager, but in our case sessionManager is not passed, and hence we are unable to mock osClient. Reference : |
According to the stack trace, the invoke is from SingleStatementExecutionManager, which accepts osClient as an input. You can pass the mock. Line 19 in 25767b6
|
Even after entering the SingleStatementExecutionManager, I am getting the same error. The SingleStatementExecutionManager is created inside the JobOperator, and the osClient it uses is also instantiated within the JobOperator. This means that even if I mock osClient, the mock won’t be used, since mocking only works when the object is either passed into the function we want to test or injected into the class externally. |
This is because SingleStatementExecutionManager created is never used in your test case. If you check my previous comment, you need to override instantiateStatementExecutionManager #1217 (comment) |
Signed-off-by: Koustubh <thorkous@amazon.com>
Updated the test. |
mockQueryResultWriter | ||
} | ||
|
||
override def start(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why override start()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was no other way to mock the query result writer and mock data frame, as they are initialised inside jobOperator only, and hence I had to do this.
override def writeDataFrameToOpensearch( | ||
resultData: DataFrame, | ||
resultIndex: String, | ||
osClient: OSClient): Unit = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it necessary to override this instead of mock the results?
(jobOperator.writeDataFrameToOpensearch _)
.expects(*, *, *)
.returning(())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't mock jobOperator, because we are testing that, and hence that would not be possible. writeDataFrameToOpenSearch internal calls osClient.doesIndexExist(resultIndex). I tried adding mockOSClient and mocking this step, but it's not mocking this for me.
Signed-off-by: Koustubh <thorkous@amazon.com> Co-authored-by: Koustubh <thorkous@amazon.com> (cherry picked from commit 57dccb8) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Koustubh <thorkous@amazon.com> Co-authored-by: Koustubh <thorkous@amazon.com> (cherry picked from commit 57dccb8) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
Added update statement in JobOperator. For warmpool jobs, we are persisting query status in WarmpoolJob.scala line 70 but we are not persisting query status for non-warmpool jobs.
Related Issues
List any issues this PR will resolve, e.g. Resolves [...].
Check List
--signoff
backport 0.x
label if it is a stable change which won't break existing featureBy submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.