diff --git a/docs/en_US/images/query_tool_server_cursor_execute_menu.png b/docs/en_US/images/query_tool_server_cursor_execute_menu.png new file mode 100644 index 00000000000..94f5f8c6990 Binary files /dev/null and b/docs/en_US/images/query_tool_server_cursor_execute_menu.png differ diff --git a/docs/en_US/preferences.rst b/docs/en_US/preferences.rst index 64d73c26784..038953430ae 100644 --- a/docs/en_US/preferences.rst +++ b/docs/en_US/preferences.rst @@ -552,6 +552,10 @@ Use the fields on the *Options* panel to manage editor preferences. will warn upon clicking the *Execute Query* button in the query tool. The warning will appear only if *Underline query at cursor?* is set to *False*. +* When the *Use server cursor?* switch is set to *True*, the dataset will be fetched + using a server-side cursor after the query is executed. + + .. image:: images/preferences_sql_results_grid.png :alt: Preferences sql results grid section :align: center diff --git a/docs/en_US/query_tool.rst b/docs/en_US/query_tool.rst index afe6d94e14c..fba69a8cc8e 100644 --- a/docs/en_US/query_tool.rst +++ b/docs/en_US/query_tool.rst @@ -558,3 +558,31 @@ To execute a macro, simply select the appropriate shortcut keys, or select it fr .. image:: images/query_output_data.png :alt: Query Tool Macros Execution :align: center + + +Server Side Cursor +****************** + +Server-side cursors allow partial retrieval of large datasets, making them particularly useful when working with +very large result sets. However, they may offer lower performance in typical, everyday usage scenarios. + +To enable server-side cursors: + +* Go to Preferences > Query Tool > Options and set "Use server cursor?" to True. +* Alternatively, you can enable it on a per-session basis via the Query Tool’s Execute menu. + +.. image:: images/query_tool_server_cursor_execute_menu.png + :alt: Query Tool Server Cursor + :align: center + + +Limitations: + +1. Transaction Requirement: Server-side cursors work only in transaction mode. +If enabled pgAdmin will automatically ensure queries run within a transaction. + +2. Limited Use Case: Use server-side cursors only when fetching large datasets. + +3. Pagination Limitation: In the Result Grid, the First and Last page buttons will be disabled, +as server-side cursors do not return a total row count. Consequently, the total number of rows +will not be displayed after execution. diff --git a/web/pgadmin/static/js/components/PgTree/FileTreeItem/index.tsx b/web/pgadmin/static/js/components/PgTree/FileTreeItem/index.tsx index 54741504654..ea7b2fcc487 100644 --- a/web/pgadmin/static/js/components/PgTree/FileTreeItem/index.tsx +++ b/web/pgadmin/static/js/components/PgTree/FileTreeItem/index.tsx @@ -134,6 +134,7 @@ export class FileTreeItem extends React.Component => { + this.props.changeDirectoryCount(FileOrDir.parent); if(FileOrDir._loaded !== true) { this.events.dispatch(FileTreeXEvent.onTreeEvents, window.event, 'added', FileOrDir); diff --git a/web/pgadmin/static/js/helpers/ObjectExplorerToolbar.jsx b/web/pgadmin/static/js/helpers/ObjectExplorerToolbar.jsx index aca503f8150..fe931951748 100644 --- a/web/pgadmin/static/js/helpers/ObjectExplorerToolbar.jsx +++ b/web/pgadmin/static/js/helpers/ObjectExplorerToolbar.jsx @@ -73,7 +73,7 @@ export default function ObjectExplorerToolbar() { } menuItem={menus['query_tool']} shortcut={browserPref?.sub_menu_query_tool} /> - } menuItem={menus['view_all_rows_context'] ?? + } menuItem={menus['view_all_rows_context'] ?? 
{label :gettext('All Rows')}} shortcut={browserPref?.sub_menu_view_data} /> } menuItem={menus['view_filtered_rows_context'] ?? { label : gettext('Filtered Rows...')}} /> diff --git a/web/pgadmin/tools/sqleditor/__init__.py b/web/pgadmin/tools/sqleditor/__init__.py index 25cdf624b44..cd4be02d321 100644 --- a/web/pgadmin/tools/sqleditor/__init__.py +++ b/web/pgadmin/tools/sqleditor/__init__.py @@ -146,7 +146,8 @@ def get_exposed_url_endpoints(self): 'sqleditor.get_new_connection_user', 'sqleditor._check_server_connection_status', 'sqleditor.get_new_connection_role', - 'sqleditor.connect_server' + 'sqleditor.connect_server', + 'sqleditor.server_cursor', ] def on_logout(self): @@ -203,9 +204,15 @@ def initialize_viewdata(trans_id, cmd_type, obj_type, sgid, sid, did, obj_id): """ if request.data: - filter_sql = json.loads(request.data) + _data = json.loads(request.data) else: - filter_sql = request.args or request.form + _data = request.args or request.form + + filter_sql = _data['filter_sql'] if 'filter_sql' in _data else None + server_cursor = _data['server_cursor'] if\ + 'server_cursor' in _data and ( + _data['server_cursor'] == 'true' or _data['server_cursor'] is True + ) else False # Create asynchronous connection using random connection id. conn_id = str(secrets.choice(range(1, 9999999))) @@ -242,8 +249,9 @@ def initialize_viewdata(trans_id, cmd_type, obj_type, sgid, sid, did, obj_id): command_obj = ObjectRegistry.get_object( obj_type, conn_id=conn_id, sgid=sgid, sid=sid, did=did, obj_id=obj_id, cmd_type=cmd_type, - sql_filter=filter_sql + sql_filter=filter_sql, server_cursor=server_cursor ) + except ObjectGone: raise except Exception as e: @@ -354,6 +362,8 @@ def panel(trans_id): if 'database_name' in params: params['database_name'] = ( underscore_escape(params['database_name'])) + params['server_cursor'] = params[ + 'server_cursor'] if 'server_cursor' in params else False return render_template( "sqleditor/index.html", @@ -485,6 +495,8 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, dbname=None, **kwargs): kwargs['auto_commit'] = pref.preference('auto_commit').get() if kwargs.get('auto_rollback', None) is None: kwargs['auto_rollback'] = pref.preference('auto_rollback').get() + if kwargs.get('server_cursor', None) is None: + kwargs['server_cursor'] = pref.preference('server_cursor').get() try: conn = manager.connection(conn_id=conn_id, @@ -544,6 +556,7 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, dbname=None, **kwargs): # Set the value of auto commit and auto rollback specified in Preferences command_obj.set_auto_commit(kwargs['auto_commit']) command_obj.set_auto_rollback(kwargs['auto_rollback']) + command_obj.set_server_cursor(kwargs['server_cursor']) # Set the value of database name, that will be used later command_obj.dbname = dbname if dbname else None @@ -909,8 +922,15 @@ def start_view_data(trans_id): update_session_grid_transaction(trans_id, session_obj) + if trans_obj.server_cursor: + conn.release_async_cursor() + conn.execute_void("BEGIN;") + # Execute sql asynchronously - status, result = conn.execute_async(sql) + status, result = conn.execute_async( + sql, + server_cursor=trans_obj.server_cursor) + else: status = False result = error_msg @@ -947,6 +967,7 @@ def start_query_tool(trans_id): ) connect = 'connect' in request.args and request.args['connect'] == '1' + is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect) if is_error: return make_json_response(success=0, errormsg=errmsg, @@ -1209,6 +1230,7 @@ def poll(trans_id): 'transaction_status': 
transaction_status, 'data_obj': data_obj, 'pagination': pagination, + 'server_cursor': trans_obj.server_cursor, } ) @@ -1837,27 +1859,17 @@ def check_and_upgrade_to_qt(trans_id, connect): 'conn_id': data.conn_id } is_error, errmsg, _, _ = _init_sqleditor( - trans_id, connect, data.sgid, data.sid, data.did, **kwargs) + trans_id, connect, data.sgid, data.sid, data.did, + **kwargs) return is_error, errmsg -@blueprint.route( - '/auto_commit/', - methods=["PUT", "POST"], endpoint='auto_commit' -) -@pga_login_required -def set_auto_commit(trans_id): - """ - This method is used to set the value for auto commit . - - Args: - trans_id: unique transaction id - """ +def set_pref_options(trans_id, operation): if request.data: - auto_commit = json.loads(request.data) + _data = json.loads(request.data) else: - auto_commit = request.args or request.form + _data = request.args or request.form connect = 'connect' in request.args and request.args['connect'] == '1' @@ -1876,13 +1888,18 @@ def set_auto_commit(trans_id): info='DATAGRID_TRANSACTION_REQUIRED', status=404) - if status and conn is not None and \ - trans_obj is not None and session_obj is not None: + if (status and conn is not None and + trans_obj is not None and session_obj is not None): res = None - # Call the set_auto_commit method of transaction object - trans_obj.set_auto_commit(auto_commit) + if operation == 'auto_commit': + # Call the set_auto_commit method of transaction object + trans_obj.set_auto_commit(_data) + elif operation == 'auto_rollback': + trans_obj.set_auto_rollback(_data) + elif operation == 'server_cursor': + trans_obj.set_server_cursor(_data) # As we changed the transaction object we need to # restore it and update the session variable. @@ -1896,56 +1913,48 @@ def set_auto_commit(trans_id): @blueprint.route( - '/auto_rollback/', - methods=["PUT", "POST"], endpoint='auto_rollback' + '/auto_commit/', + methods=["PUT", "POST"], endpoint='auto_commit' ) @pga_login_required -def set_auto_rollback(trans_id): +def set_auto_commit(trans_id): """ This method is used to set the value for auto commit . Args: trans_id: unique transaction id """ - if request.data: - auto_rollback = json.loads(request.data) - else: - auto_rollback = request.args or request.form - - connect = 'connect' in request.args and request.args['connect'] == '1' - - is_error, errmsg = check_and_upgrade_to_qt(trans_id, connect) - if is_error: - return make_json_response(success=0, errormsg=errmsg, - info=ERROR_MSG_FAIL_TO_PROMOTE_QT, - status=404) - - # Check the transaction and connection status - status, error_msg, conn, trans_obj, session_obj = \ - check_transaction_status(trans_id) + return set_pref_options(trans_id, 'auto_commit') - if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND: - return make_json_response(success=0, errormsg=error_msg, - info='DATAGRID_TRANSACTION_REQUIRED', - status=404) - if status and conn is not None and \ - trans_obj is not None and session_obj is not None: +@blueprint.route( + '/auto_rollback/', + methods=["PUT", "POST"], endpoint='auto_rollback' +) +@pga_login_required +def set_auto_rollback(trans_id): + """ + This method is used to set the value for auto rollback . - res = None + Args: + trans_id: unique transaction id + """ + return set_pref_options(trans_id, 'auto_rollback') - # Call the set_auto_rollback method of transaction object - trans_obj.set_auto_rollback(auto_rollback) - # As we changed the transaction object we need to - # restore it and update the session variable. 
- session_obj['command_obj'] = pickle.dumps(trans_obj, -1) - update_session_grid_transaction(trans_id, session_obj) - else: - status = False - res = error_msg +@blueprint.route( + '/server_cursor/', + methods=["PUT", "POST"], endpoint='server_cursor' +) +@pga_login_required +def set_server_cursor(trans_id): + """ + This method is used to set the value for server cursor. - return make_json_response(data={'status': status, 'result': res}) + Args: + trans_id: unique transaction id + """ + return set_pref_options(trans_id, 'server_cursor') @blueprint.route( @@ -2181,12 +2190,18 @@ def start_query_download_tool(trans_id): if not sql: sql = trans_obj.get_sql(sync_conn) if sql and query_commited: + if trans_obj.server_cursor: + sync_conn.release_async_cursor() + sync_conn.execute_void("BEGIN;") # Re-execute the query to ensure the latest data is included - sync_conn.execute_async(sql) + sync_conn.execute_async(sql, server_cursor=trans_obj.server_cursor) # This returns generator of records. status, gen, conn_obj = \ sync_conn.execute_on_server_as_csv(records=10) + if trans_obj.server_cursor and query_commited: + sync_conn.execute_void("COMMIT;") + if not status: return make_json_response( data={ diff --git a/web/pgadmin/tools/sqleditor/command.py b/web/pgadmin/tools/sqleditor/command.py index 02965b2c628..08d875752b7 100644 --- a/web/pgadmin/tools/sqleditor/command.py +++ b/web/pgadmin/tools/sqleditor/command.py @@ -365,6 +365,8 @@ def __init__(self, **kwargs): self.limit = 100 self.thread_native_id = None + self.server_cursor = kwargs['server_cursor'] if\ + 'server_cursor' in kwargs else None def get_primary_keys(self, *args, **kwargs): return None, None @@ -425,6 +427,9 @@ def get_thread_native_id(self): def set_thread_native_id(self, thread_native_id): self.thread_native_id = thread_native_id + def set_server_cursor(self, server_cursor): + self.server_cursor = server_cursor + class TableCommand(GridCommand): """ @@ -816,6 +821,7 @@ def __init__(self, **kwargs): self.table_has_oids = False self.columns_types = None self.thread_native_id = None + self.server_cursor = False def get_sql(self, default_conn=None): return None @@ -917,6 +923,9 @@ def set_auto_rollback(self, auto_rollback): def set_auto_commit(self, auto_commit): self.auto_commit = auto_commit + def set_server_cursor(self, server_cursor): + self.server_cursor = server_cursor + def __set_updatable_results_attrs(self, sql_path, table_oid, conn): # Set template path for sql scripts and the table object id diff --git a/web/pgadmin/tools/sqleditor/static/js/SQLEditorModule.js b/web/pgadmin/tools/sqleditor/static/js/SQLEditorModule.js index 609cc545c15..c58ca6f9cb6 100644 --- a/web/pgadmin/tools/sqleditor/static/js/SQLEditorModule.js +++ b/web/pgadmin/tools/sqleditor/static/js/SQLEditorModule.js @@ -125,7 +125,7 @@ export default class SQLEditor { priority: 101, label: gettext('All Rows'), permission: AllPermissionTypes.TOOLS_QUERY_TOOL, - }, { + },{ name: 'view_first_100_rows_context_' + supportedNode, node: supportedNode, module: this, diff --git a/web/pgadmin/tools/sqleditor/static/js/components/QueryToolComponent.jsx b/web/pgadmin/tools/sqleditor/static/js/components/QueryToolComponent.jsx index dcfa293fae1..47e42f95836 100644 --- a/web/pgadmin/tools/sqleditor/static/js/components/QueryToolComponent.jsx +++ b/web/pgadmin/tools/sqleditor/static/js/components/QueryToolComponent.jsx @@ -130,12 +130,14 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN connected_once: false, connection_status: null, 
connection_status_msg: '', + server_cursor: preferencesStore.getPreferencesForModule('sqleditor').server_cursor === true, params: { ...params, title: _.unescape(params.title), is_query_tool: params.is_query_tool == 'true', node_name: retrieveNodeName(selectedNodeInfo), - dbname: _.unescape(params.database_name) || getDatabaseLabel(selectedNodeInfo) + dbname: _.unescape(params.database_name) || getDatabaseLabel(selectedNodeInfo), + server_cursor: preferencesStore.getPreferencesForModule('sqleditor').server_cursor === true, }, connection_list: [{ sgid: params.sgid, @@ -318,7 +320,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN setQtStatePartial({ editor_disabled: false }); }; - const initializeQueryTool = (password, explainObject=null, macroSQL='', executeCursor=false, reexecute=false)=>{ + const initializeQueryTool = (password, explainObject=null, macroSQL='', executeCursor=false, executeServerCursor=false, reexecute=false)=>{ let selectedConn = _.find(qtState.connection_list, (c)=>c.is_selected); let baseUrl = ''; if(qtState.params.is_query_tool) { @@ -336,12 +338,14 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN ...qtState.params, }); } + eventBus.current.fireEvent(QUERY_TOOL_EVENTS.SERVER_CURSOR, executeServerCursor); api.post(baseUrl, qtState.params.is_query_tool ? { user: selectedConn.user, role: selectedConn.role, password: password, dbname: selectedConn.database_name - } : qtState.params.sql_filter) + } : {sql_filter: qtState.params.sql_filter, + server_cursor: qtState.params.server_cursor}) .then(()=>{ setQtStatePartial({ connected: true, @@ -350,7 +354,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN }); //this condition works if user is in View/Edit Data or user does not saved server or tunnel password and disconnected the server and executing the query if(!qtState.params.is_query_tool || reexecute) { - eventBus.current.fireEvent(QUERY_TOOL_EVENTS.TRIGGER_EXECUTION, explainObject, macroSQL, executeCursor); + eventBus.current.fireEvent(QUERY_TOOL_EVENTS.TRIGGER_EXECUTION, explainObject, macroSQL, executeCursor, executeServerCursor); let msg = `${selectedConn['server_name']}/${selectedConn['database_name']} - Database connected`; pgAdmin.Browser.notifier.success(_.escape(msg)); } @@ -856,6 +860,7 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN api: api, modal: modal, params: qtState.params, + server_cursor: qtState.server_cursor, preferences: qtState.preferences, mainContainerRef: containerRef, editor_disabled: qtState.editor_disabled, @@ -892,7 +897,11 @@ export default function QueryToolComponent({params, pgWindow, pgAdmin, selectedN }; }); }, - }), [qtState.params, qtState.preferences, containerRef.current, qtState.editor_disabled, qtState.eol, qtState.current_file]); + updateServerCursor: (state) => { + setQtStatePartial(state); + }, + }), [qtState.params, qtState.preferences, containerRef.current, qtState.editor_disabled, qtState.eol, qtState.current_file, qtState.server_cursor]); + const queryToolConnContextValue = React.useMemo(()=>({ connected: qtState.connected, @@ -952,6 +961,7 @@ QueryToolComponent.propTypes = { bgcolor: PropTypes.string, fgcolor: PropTypes.string, is_query_tool: PropTypes.oneOfType([PropTypes.bool, PropTypes.string]).isRequired, + server_cursor: PropTypes.oneOfType([PropTypes.bool, PropTypes.string]), user: PropTypes.string, role: PropTypes.string, server_name: PropTypes.string, diff --git 
a/web/pgadmin/tools/sqleditor/static/js/components/sections/MainToolBar.jsx b/web/pgadmin/tools/sqleditor/static/js/components/sections/MainToolBar.jsx index da0e64ad50a..5d54969e495 100644 --- a/web/pgadmin/tools/sqleditor/static/js/components/sections/MainToolBar.jsx +++ b/web/pgadmin/tools/sqleditor/static/js/components/sections/MainToolBar.jsx @@ -46,7 +46,7 @@ const StyledBox = styled(Box)(({theme}) => ({ ...theme.mixins.panelBorder.bottom, })); -function autoCommitRollback(type, api, transId, value) { +function changeQueryExecutionSettings(type, api, transId, value) { let url = url_for(`sqleditor.${type}`, { 'trans_id': transId, }); @@ -123,8 +123,11 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT const checkMenuClick = useCallback((e)=>{ setCheckedMenuItems((prev)=>{ let newVal = !prev[e.value]; - if(e.value === 'auto_commit' || e.value === 'auto_rollback') { - autoCommitRollback(e.value, queryToolCtx.api, queryToolCtx.params.trans_id, newVal) + if (e.value === 'server_cursor') { + queryToolCtx.updateServerCursor({server_cursor: newVal}); + } + if(e.value === 'auto_commit' || e.value === 'auto_rollback' || e.value === 'server_cursor') { + changeQueryExecutionSettings(e.value, queryToolCtx.api, queryToolCtx.params.trans_id, newVal) .catch ((error)=>{ newVal = prev[e.value]; eventBus.fireEvent(QUERY_TOOL_EVENTS.HANDLE_API_ERROR, error, { @@ -264,8 +267,8 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT }; useEffect(()=>{ if(isInTxn()) { - setDisableButton('commit', false); - setDisableButton('rollback', false); + setDisableButton('commit', queryToolCtx.params.server_cursor && !queryToolCtx.params.is_query_tool ?true:false); + setDisableButton('rollback', queryToolCtx.params.server_cursor && !queryToolCtx.params.is_query_tool ?true:false); setDisableButton('execute-options', true); } else { setDisableButton('commit', true); @@ -338,6 +341,7 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT explain_settings: queryToolPref.explain_settings, explain_wal: queryToolPref.explain_wal, open_in_new_tab: queryToolPref.open_in_new_tab, + server_cursor: queryToolPref.server_cursor, }); } } @@ -625,6 +629,8 @@ export function MainToolBar({containerRef, onFilterClick, onManageMacros, onAddT onClick={checkMenuClick}>{gettext('Auto commit?')} {gettext('Auto rollback on error?')} + {gettext('Use server cursor?')} { const from = (pageNo-1) * pagination.page_size + 1; const to = from + pagination.page_size - 1; - eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, from, to); + eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, from, to, serverCursor); clearSelection(); }; @@ -205,16 +205,16 @@ function PaginationInputs({pagination, totalRowCount, clearSelection}) { /> : {gettext('Showing rows: %s to %s', inputs.from, inputs.to)}} - {editPageRange && eventBus.fireEvent(QUERY_TOOL_EVENTS.FETCH_WINDOW, inputs.from, inputs.to)} disabled={errorInputs.from || errorInputs.to} icon={} />} - setEditPageRange((prev)=>!prev)} icon={editPageRange ? : } - /> + />}
{gettext('Page No:')} @@ -228,15 +228,16 @@ function PaginationInputs({pagination, totalRowCount, clearSelection}) { value={inputs.pageNo} onChange={(value)=>onInputChange('pageNo', value)} onKeyDown={onInputKeydownPageNo} + disabled={serverCursor} error={errorInputs['pageNo']} /> {gettext('of')} {pagination.page_count}
- goToPage(1)} icon={}/> + goToPage(1)} icon={}/> goToPage(pagination.page_no-1)} icon={}/> - goToPage(pagination.page_no+1)} icon={}/> - goToPage(pagination.page_count)} icon={} /> + goToPage(pagination.page_no+1)} icon={}/> + goToPage(pagination.page_count)} icon={} /> ); @@ -245,6 +246,7 @@ PaginationInputs.propTypes = { pagination: PropTypes.object, totalRowCount: PropTypes.number, clearSelection: PropTypes.func, + serverCursor: PropTypes.bool, }; export function ResultSetToolbar({query, canEdit, totalRowCount, pagination, allRowsSelect}) { const eventBus = useContext(QueryToolEventsContext); @@ -450,7 +452,7 @@ export function ResultSetToolbar({query, canEdit, totalRowCount, pagination, all {totalRowCount > 0 && - + } { eventBus.registerListener(QUERY_TOOL_EVENTS.CURSOR_ACTIVITY, (newPos)=>{ @@ -82,6 +83,9 @@ export function StatusBar({eol, handleEndOfLineChange}) { eventBus.registerListener(QUERY_TOOL_EVENTS.SELECTED_ROWS_COLS_CELL_CHANGED, (rows)=>{ setSelectedRowsCount(rows); }); + eventBus.registerListener(QUERY_TOOL_EVENTS.SERVER_CURSOR, (server_cursor)=>{ + setServerCursor(server_cursor); + }); }, []); useEffect(()=>{ @@ -111,7 +115,7 @@ export function StatusBar({eol, handleEndOfLineChange}) { return ( - {gettext('Total rows: %s', rowsCount)} + {serverCursor && gettext('Query executed with server cursor')} {!serverCursor && gettext('Total rows: %s', rowsCount)} {lastTaskText && {lastTaskText} {hours.toString().padStart(2, '0')}:{minutes.toString().padStart(2, '0')}:{seconds.toString().padStart(2, '0')}.{msec.toString().padStart(3, '0')} } diff --git a/web/pgadmin/tools/sqleditor/static/js/show_view_data.js b/web/pgadmin/tools/sqleditor/static/js/show_view_data.js index e67fa8574e9..b576e0099b0 100644 --- a/web/pgadmin/tools/sqleditor/static/js/show_view_data.js +++ b/web/pgadmin/tools/sqleditor/static/js/show_view_data.js @@ -77,7 +77,8 @@ export function showViewData( connectionData, treeIdentifier, transId, - filter=false + filter=false, + server_cursor=false ) { const node = pgBrowser.tree.findNodeByDomElement(treeIdentifier); if (node === undefined || !node.getData()) { @@ -100,7 +101,7 @@ export function showViewData( return; } - const gridUrl = generateUrl(transId, connectionData, node.getData(), parentData); + const gridUrl = generateUrl(transId, connectionData, node.getData(), parentData, server_cursor); const queryToolTitle = generateViewDataTitle(pgBrowser, treeIdentifier); if(filter) { @@ -109,7 +110,7 @@ export function showViewData( showFilterDialog(pgBrowser, treeIdentifier, queryToolMod, transId, gridUrl, queryToolTitle, validateUrl); } else { - queryToolMod.launch(transId, gridUrl, false, queryToolTitle); + queryToolMod.launch(transId, gridUrl, false, queryToolTitle, {server_cursor: server_cursor}); } } @@ -145,7 +146,7 @@ export function retrieveNodeName(parentData) { return ''; } -function generateUrl(trans_id, connectionData, nodeData, parentData) { +function generateUrl(trans_id, connectionData, nodeData, parentData, server_cursor=false) { let url_endpoint = url_for('sqleditor.panel', { 'trans_id': trans_id, }); @@ -157,7 +158,8 @@ function generateUrl(trans_id, connectionData, nodeData, parentData) { +`&sgid=${parentData.server_group._id}` +`&sid=${parentData.server._id}` +`&did=${parentData.database._id}` - +`&server_type=${parentData.server.server_type}`; + +`&server_type=${parentData.server.server_type}` + +`&server_cursor=${server_cursor}`; if(!parentData.server.username && parentData.server.user?.name) { url_endpoint += 
`&user=${parentData.server.user?.name}`; diff --git a/web/pgadmin/tools/sqleditor/tests/test_server_cursor.py b/web/pgadmin/tools/sqleditor/tests/test_server_cursor.py new file mode 100644 index 00000000000..6f3f914c9aa --- /dev/null +++ b/web/pgadmin/tools/sqleditor/tests/test_server_cursor.py @@ -0,0 +1,111 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2025, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## +from pgadmin.utils.route import BaseTestGenerator +from pgadmin.browser.server_groups.servers.databases.tests import utils as \ + database_utils +from regression.python_test_utils import test_utils +import json +from pgadmin.utils import server_utils +import secrets +import config +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll + + +class TestExecuteServerCursor(BaseTestGenerator): + """ + This class validates download csv + """ + scenarios = [ + ( + 'Execute with server cursor', + dict( + sql='SELECT 1', + init_url='/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}', + ) + ) + ] + + def setUp(self): + self._db_name = 'server_cursor_' + str( + secrets.choice(range(10000, 65535))) + self._sid = self.server_information['server_id'] + + server_utils.connect_server(self, self._sid) + + self._did = test_utils.create_database( + self.server, self._db_name + ) + + # This method is responsible for initiating query hit at least once, + # so that download csv works + def initiate_sql_query_tool(self, trans_id, sql_query): + + # This code is to ensure to create a async cursor so that downloading + # csv can work. + # Start query tool transaction + + url = '/sqleditor/query_tool/start/{0}'.format(trans_id) + response = self.tester.post(url, data=json.dumps({"sql": sql_query}), + content_type='html/json') + self.assertEqual(response.status_code, 200) + + return async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format(trans_id)) + + def set_server_cursor(self, server_cursor): + _url = '/sqleditor/server_cursor/{0}'.format(self.trans_id) + res = self.tester.post(_url, data=json.dumps(server_cursor)) + self.assertEqual(res.status_code, 200) + + def runTest(self): + + db_con = database_utils.connect_database(self, + test_utils.SERVER_GROUP, + self._sid, + self._did) + if db_con["info"] != "Database connected.": + raise Exception("Could not connect to the database.") + + # Initialize query tool + self.trans_id = str(secrets.choice(range(1, 9999999))) + url = self.init_url.format( + self.trans_id, test_utils.SERVER_GROUP, self._sid, self._did) + res = self.tester.post(url, data=json.dumps({ + "dbname": self._db_name + })) + self.assertEqual(res.status_code, 200) + + self.set_server_cursor(True) + + response = self.initiate_sql_query_tool(self.trans_id, self.sql) + + self.assertEqual(response.status_code, 200) + _resp = json.loads(response.data.decode()) + self.assertTrue(_resp['data']['server_cursor']) + + self.set_server_cursor(False) + + # Close query tool + url = '/sqleditor/close/{0}'.format(self.trans_id) + response = self.tester.delete(url) + self.assertEqual(response.status_code, 200) + + database_utils.disconnect_database(self, self._sid, self._did) + + def tearDown(self): + main_conn = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + 
self.server['sslmode'] + ) + test_utils.drop_database(main_conn, self._db_name) diff --git a/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py b/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py index edf6bc2f778..df0c1893b87 100644 --- a/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py +++ b/web/pgadmin/tools/sqleditor/utils/query_tool_preferences.py @@ -80,6 +80,17 @@ def register_query_tool_preferences(self): 'Tool tabs.') ) + self.server_cursor = self.preference.register( + 'Options', 'server_cursor', + gettext("Use server cursor?"), 'boolean', False, + category_label=PREF_LABEL_OPTIONS, + help_str=gettext('If set to True, the dataset will be fetched using a' + ' server-side cursor after the query is executed.' + ' This allows controlled data transfer to the client,' + ' enabling examination of large datasets without' + ' loading them entirely into memory.') + ) + self.show_prompt_save_query_changes = self.preference.register( 'Options', 'prompt_save_query_changes', gettext("Prompt to save unsaved query changes?"), 'boolean', True, diff --git a/web/pgadmin/tools/sqleditor/utils/start_running_query.py b/web/pgadmin/tools/sqleditor/utils/start_running_query.py index b11113c0f7b..c888667cbc8 100644 --- a/web/pgadmin/tools/sqleditor/utils/start_running_query.py +++ b/web/pgadmin/tools/sqleditor/utils/start_running_query.py @@ -101,7 +101,7 @@ def execute(self, sql, trans_id, http_session, connect=False): session_obj, effective_sql_statement, trans_id, - transaction_object + transaction_object, ) can_edit = transaction_object.can_edit() @@ -137,18 +137,20 @@ def __execute_query(self, conn, session_obj, sql, trans_id, trans_obj): StartRunningQuery.save_transaction_in_session(session_obj, trans_id, trans_obj) - # If auto commit is False and transaction status is Idle - # then call is_begin_not_required() function to check BEGIN - # is required or not. + if trans_obj.server_cursor and sql != 'COMMIT;' and sql != 'ROLLBACK;': + conn.release_async_cursor() if StartRunningQuery.is_begin_required_for_sql_query(trans_obj, - conn, sql): + conn, sql + ): conn.execute_void("BEGIN;") is_rollback_req = StartRunningQuery.is_rollback_statement_required( trans_obj, conn) + trans_obj.set_thread_native_id(None) + @copy_current_request_context def asyn_exec_query(conn, sql, trans_obj, is_rollback_req, app): @@ -156,9 +158,15 @@ def asyn_exec_query(conn, sql, trans_obj, is_rollback_req, # and formatted_error is True. with app.app_context(): try: - _, _ = conn.execute_async(sql) - # # If the transaction aborted for some reason and - # # Auto RollBack is True then issue a rollback to cleanup. + if trans_obj.server_cursor and (sql == 'COMMIT;' or + sql == 'ROLLBACK;'): + conn.execute_void(sql) + else: + _, _ = conn.execute_async( + sql, server_cursor=trans_obj.server_cursor) + # If the transaction aborted for some reason and + # Auto RollBack is True then issue a rollback + # to cleanup. 
if is_rollback_req: conn.execute_void("ROLLBACK;") except Exception as e: @@ -178,10 +186,12 @@ def asyn_exec_query(conn, sql, trans_obj, is_rollback_req, @staticmethod def is_begin_required_for_sql_query(trans_obj, conn, sql): - return (not trans_obj.auto_commit and - conn.transaction_status() == TX_STATUS_IDLE and - is_begin_required(sql) - ) + + return ((trans_obj.server_cursor and trans_obj.auto_commit) or ( + not trans_obj.auto_commit and + conn.transaction_status() == TX_STATUS_IDLE and + is_begin_required(sql) + )) @staticmethod def is_rollback_statement_required(trans_obj, conn): diff --git a/web/pgadmin/utils/driver/psycopg3/connection.py b/web/pgadmin/utils/driver/psycopg3/connection.py index d72736ebb86..51dcb8b6eb1 100644 --- a/web/pgadmin/utils/driver/psycopg3/connection.py +++ b/web/pgadmin/utils/driver/psycopg3/connection.py @@ -17,6 +17,7 @@ import secrets import datetime import asyncio +import copy from collections import deque import psycopg from flask import g, current_app @@ -30,7 +31,7 @@ from pgadmin.utils.exception import ConnectionLost, CryptKeyMissing from pgadmin.utils import get_complete_file_path from ..abstract import BaseConnection -from .cursor import DictCursor, AsyncDictCursor +from .cursor import DictCursor, AsyncDictCursor, AsyncDictServerCursor from .typecast import register_global_typecasters,\ register_string_typecasters, register_binary_typecasters, \ register_array_to_string_typecasters, ALL_JSON_TYPES @@ -186,6 +187,7 @@ def __init__(self, manager, conn_id, db, **kwargs): self.use_binary_placeholder = use_binary_placeholder self.array_to_string = array_to_string self.qtLiteral = get_driver(config.PG_DEFAULT_DRIVER).qtLiteral + self._autocommit = True super(Connection, self).__init__() @@ -358,6 +360,7 @@ async def connectdbserver(): prepare_threshold=manager.prepare_threshold ) pg_conn = asyncio.run(connectdbserver()) + pg_conn.server_cursor_factory = AsyncDictServerCursor else: pg_conn = psycopg.Connection.connect( connection_string, @@ -704,9 +707,10 @@ def __cursor(self, server_cursor=False, scrollable=False): self.conn_id.encode('utf-8') ), None) - if self.connected() and cur and not cur.closed and \ - (not server_cursor or (server_cursor and cur.name)): - return True, cur + if self.connected() and cur and not cur.closed: + if not server_cursor or ( + server_cursor and type(cur) is AsyncDictServerCursor): + return True, cur if not self.connected(): errmsg = "" @@ -732,8 +736,10 @@ def __cursor(self, server_cursor=False, scrollable=False): if server_cursor: # Providing name to cursor will create server side cursor. 
cursor_name = "CURSOR:{0}".format(self.conn_id) + self.conn.server_cursor_factory = AsyncDictServerCursor cur = self.conn.cursor( - name=cursor_name + name=cursor_name, + scrollable=scrollable ) else: cur = self.conn.cursor(scrollable=scrollable) @@ -893,7 +899,10 @@ def handle_null_values(results, replace_nulls_with): def gen(conn_obj, trans_obj, quote='strings', quote_char="'", field_separator=',', replace_nulls_with=None): - cur.scroll(0, mode='absolute') + try: + cur.scroll(0, mode='absolute') + except Exception as e: + print(str(e)) results = cur.fetchmany(records) if not results: yield gettext('The query executed did not return any data.') @@ -1037,7 +1046,15 @@ def execute_scalar(self, query, params=None, return True, None - def execute_async(self, query, params=None, formatted_exception_msg=True): + def release_async_cursor(self): + if self.__async_cursor and not self.__async_cursor.closed: + try: + self.__async_cursor.close_cursor() + except Exception as e: + print("EXception==", str(e)) + + def execute_async(self, query, params=None, formatted_exception_msg=True, + server_cursor=False): """ This function executes the given query asynchronously and returns result. @@ -1048,10 +1065,11 @@ def execute_async(self, query, params=None, formatted_exception_msg=True): formatted_exception_msg: if True then function return the formatted exception message """ - self.__async_cursor = None self.__async_query_error = None - status, cur = self.__cursor(scrollable=True) + + status, cur = self.__cursor(scrollable=True, + server_cursor=server_cursor) if not status: return False, str(cur) @@ -1501,7 +1519,7 @@ def poll(self, formatted_exception_msg=False, no_result=False): else: status = 1 - if not cur: + if not cur or cur.closed: return False, self.CURSOR_NOT_FOUND result = None @@ -1533,7 +1551,6 @@ def poll(self, formatted_exception_msg=False, no_result=False): result = [] try: result = cur.fetchall(_tupples=True) - except psycopg.ProgrammingError: result = None except psycopg.Error: diff --git a/web/pgadmin/utils/driver/psycopg3/cursor.py b/web/pgadmin/utils/driver/psycopg3/cursor.py index bdb25a06393..6b00dc8ace0 100644 --- a/web/pgadmin/utils/driver/psycopg3/cursor.py +++ b/web/pgadmin/utils/driver/psycopg3/cursor.py @@ -15,10 +15,9 @@ import asyncio from collections import OrderedDict -import psycopg from flask import g, current_app -from psycopg import Cursor as _cursor, AsyncCursor as _async_cursor -from typing import Any, Sequence +from psycopg import (Cursor as _cursor, AsyncCursor as _async_cursor, + AsyncServerCursor as _async_server_cursor) from psycopg.rows import dict_row, tuple_row from psycopg._encodings import py_codecs as encodings from .encoding import configure_driver_encodings @@ -220,6 +219,7 @@ class AsyncDictCursor(_async_cursor): def __init__(self, *args, **kwargs): self._odt_desc = None _async_cursor.__init__(self, *args, row_factory=dict_row) + self.cursor = _async_cursor def _dict_tuple(self, tup): """ @@ -234,8 +234,8 @@ def _ordered_description(self): Transform the regular description to wrapper object, which handles duplicate column name. 
""" - self._odt_desc = _async_cursor.__getattribute__(self, 'description') - pgresult = _async_cursor.__getattribute__(self, 'pgresult') + self._odt_desc = self.cursor.__getattribute__(self, 'description') + pgresult = self.cursor.__getattribute__(self, 'pgresult') desc = self._odt_desc if desc is None or len(desc) == 0: @@ -289,21 +289,21 @@ async def _execute(self, query, params=None): if params is not None and len(params) == 0: params = None - return await _async_cursor.execute(self, query, params) + return await self.cursor.execute(self, query, params) def executemany(self, query, params=None): """ Execute many function of regular cursor. """ self._odt_desc = None - return _async_cursor.executemany(self, query, params) + return self.cursor.executemany(self, query, params) async def _close_cursor(self): """ Close the cursor. """ - await _async_cursor.close(self) + await self.cursor.close(self) def close_cursor(self): """ @@ -328,13 +328,13 @@ async def _fetchmany(self, size=None): """ Fetch many tuples as ordered dictionary list. """ - return await _async_cursor.fetchmany(self, size) + return await self.cursor.fetchmany(self, size) async def _fetchall(self): """ Fetch all tuples as ordered dictionary list. """ - return await _async_cursor.fetchall(self) + return await self.cursor.fetchall(self) def fetchall(self, _tupples=False): """ @@ -353,7 +353,7 @@ async def _fetchone(self): """ Fetch all tuples as ordered dictionary list. """ - return await _async_cursor.fetchone(self) + return await self.cursor.fetchone(self) def fetchone(self): """ @@ -382,7 +382,7 @@ async def _scrollcur(self, position, mode): """ Fetch all tuples as ordered dictionary list. """ - return await _async_cursor.scroll(self, position, mode=mode) + return await self.cursor.scroll(self, position, mode=mode) def scroll(self, position, mode="absolute"): """ @@ -395,3 +395,15 @@ def get_rowcount(self): return self.pgresult.ntuples else: return -1 + + +class AsyncDictServerCursor(AsyncDictCursor, _async_server_cursor): + + def __init__(self, *args, name=None, **kwargs): + self._odt_desc = None + _async_server_cursor.__init__(self, name=name, *args, + row_factory=dict_row) + self.cursor = _async_server_cursor + + def get_rowcount(self): + return 1