import logging
import uuid
from threading import RLock
from hazelcast.errors import HazelcastError
from hazelcast.future import Future, ImmediateFuture
from hazelcast.invocation import Invocation
from hazelcast.util import (
UUIDUtil,
to_millis,
check_true,
get_attr_name,
try_to_get_error_message,
check_is_number,
check_is_int,
try_to_get_enum_value,
)
from hazelcast import six
_logger = logging.getLogger(__name__)
class SqlService(object):
    """A service to execute SQL statements.

    The service allows you to query data stored in a
    :class:`Map <hazelcast.proxy.map.Map>`.

    Warnings:

        The service is in beta state. Behavior and API might change
        in future releases.

    **Querying an IMap**

    Every Map instance is exposed as a table with the same name in the
    ``partitioned`` schema. The ``partitioned`` schema is included into
    a default search path, therefore a Map could be referenced in an
    SQL statement with or without the schema name.

    **Column resolution**

    Every table backed by a Map has a set of columns that are resolved
    automatically. Column resolution uses Map entries located on the
    member that initiates the query. The engine extracts columns from a
    key and a value and then merges them into a single column set.
    In case the key and the value have columns with the same name, the
    key takes precedence.

    Columns are extracted from objects as follows (which happens on the
    server-side):

    - For non-Portable objects, public getters and fields are used to
      populate the column list. For getters, the first letter is converted
      to lower case. A getter takes precedence over a field in case of naming
      conflict.
    - For :class:`Portable <hazelcast.serialization.api.Portable>` objects,
      field names used in the
      :func:`write_portable() <hazelcast.serialization.api.Portable.write_portable>`
      method are used to populate the column list.

    The whole key and value objects could be accessed through special fields
    ``__key`` and ``this``, respectively. If key (value) object has fields,
    then the whole key (value) field is exposed as a normal field. Otherwise the
    field is hidden. Hidden fields can be accessed directly, but are not returned
    by ``SELECT * FROM ...`` queries.

    Consider the following key/value model: ::

        class PersonKey(Portable):
            def __init__(self, person_id=None, department_id=None):
                self.person_id = person_id
                self.department_id = department_id

            def write_portable(self, writer):
                writer.write_long("person_id", self.person_id)
                writer.write_long("department_id", self.department_id)

            ...

        class Person(Portable):
            def __init__(self, name=None):
                self.name = name

            def write_portable(self, writer):
                writer.write_string("name", self.name)

            ...

    This model will be resolved to the following table columns:

    - person_id ``BIGINT``
    - department_id ``BIGINT``
    - name ``VARCHAR``
    - __key ``OBJECT`` (hidden)
    - this ``OBJECT`` (hidden)

    **Consistency**

    Results returned from Map query are weakly consistent:

    - If an entry was not updated during iteration, it is guaranteed to be
      returned exactly once
    - If an entry was modified during iteration, it might be returned zero,
      one or several times

    **Usage**

    When a query is executed, an :class:`SqlResult` is returned. You may get
    row iterator from the result. The result must be closed at the end. The
    iterator will close the result automatically when it is exhausted given
    that no error is raised during the iteration. The code snippet below
    demonstrates a typical usage pattern: ::

        client = hazelcast.HazelcastClient()

        result = client.sql.execute("SELECT * FROM person")

        for row in result:
            print(row.get_object("person_id"))
            print(row.get_object("name"))
            ...

    See the documentation of the :class:`SqlResult` for more information about
    different iteration methods.

    Notes:

        When an SQL statement is submitted to a member, it is parsed and
        optimized by the ``hazelcast-sql`` module. The ``hazelcast-sql`` must
        be in the classpath, otherwise an exception will be thrown. If you're
        using the ``hazelcast-all`` or ``hazelcast-enterprise-all`` packages, the
        ``hazelcast-sql`` module is included in them by default. If not, i.e., you
        are using ``hazelcast`` or ``hazelcast-enterprise``, then you need to have
        ``hazelcast-sql`` in the classpath. If you are using the Docker image,
        the SQL module is included by default.
    """

    def __init__(self, internal_sql_service):
        """
        Args:
            internal_sql_service: The object that :func:`execute` and
                :func:`execute_statement` delegate to.
        """
        self._service = internal_sql_service

    def execute(self, sql, *params):
        """Convenient method to execute a distributed query with the given
        parameters.

        Converts passed SQL string and parameters into an :class:`SqlStatement`
        object and invokes :func:`execute_statement`.

        Args:
            sql (str): SQL string.
            *params: Query parameters that will be passed to
                :func:`SqlStatement.add_parameter`.

        Returns:
            SqlResult: The execution result.

        Raises:
            HazelcastSqlError: In case of execution error.
            AssertionError: If the SQL parameter is not a string.
            ValueError: If the SQL parameter is an empty string.
        """
        return self._service.execute(sql, *params)

    def execute_statement(self, statement):
        """Executes an SQL statement.

        Args:
            statement (SqlStatement): Statement to be executed

        Returns:
            SqlResult: The execution result.

        Raises:
            HazelcastSqlError: In case of execution error.
        """
        return self._service.execute_statement(statement)
class _SqlQueryId(object):
    """Cluster-wide unique query ID."""

    __slots__ = ("member_id_high", "member_id_low", "local_id_high", "local_id_low")

    def __init__(self, member_id_high, member_id_low, local_id_high, local_id_low):
        self.member_id_high = member_id_high
        """int: Most significant bits of the UUID of the member
        that the query will route to.
        """

        self.member_id_low = member_id_low
        """int: Least significant bits of the UUID of the member
        that the query will route to."""

        self.local_id_high = local_id_high
        """int: Most significant bits of the UUID of the local id."""

        self.local_id_low = local_id_low
        """int: Least significant bits of the UUID of the local id."""

    @classmethod
    def from_uuid(cls, member_uuid):
        """Generates a local random UUID and creates a query id
        out of it and the given member UUID.

        Args:
            member_uuid (uuid.UUID): UUID of the member.

        Returns:
            _SqlQueryId: Generated unique query id.
        """
        local_id = uuid.uuid4()

        member_msb, member_lsb = UUIDUtil.to_bits(member_uuid)
        local_msb, local_lsb = UUIDUtil.to_bits(local_id)

        return cls(member_msb, member_lsb, local_msb, local_lsb)
class _SqlError(object):
    """Server-side error that is propagated to the client."""

    __slots__ = ("code", "message", "originating_member_uuid")

    def __init__(self, code, message, originating_member_uuid):
        self.code = code
        """_SqlErrorCode: The error code."""

        self.message = message
        """str: The error message."""

        self.originating_member_uuid = originating_member_uuid
        """uuid.UUID: UUID of the member that caused or initiated an error condition."""
class _SqlPage(object):
    """A finite set of rows returned to the user."""

    __slots__ = ("_column_types", "_columns", "_is_last")

    def __init__(self, column_types, columns, last):
        """
        Args:
            column_types: Sized collection with one entry per column.
            columns: Column-major data, indexed as
                ``columns[column_index][row_index]``.
            last (bool): Whether this is the last page or not.
        """
        self._column_types = column_types
        self._columns = columns
        self._is_last = last

    @property
    def row_count(self):
        """int: Number of rows in the page."""
        if not self._columns:
            # A page without any column cannot hold a row.
            return 0

        # Each column should have equal number of rows.
        # Just check the first one.
        return len(self._columns[0])

    @property
    def column_count(self):
        """int: Number of columns."""
        return len(self._column_types)

    @property
    def is_last(self):
        """bool: Whether this is the last page or not."""
        return self._is_last

    def get_value(self, column_index, row_index):
        """
        Args:
            column_index (int):
            row_index (int):

        Returns:
            The value with the given indexes.
        """
        return self._columns[column_index][row_index]
class SqlColumnType(object):
    """SQL column types.

    The documentation of each constant states how the values of that
    type are represented on the Python side.
    """

    VARCHAR = 0
    """
    Represented by ``str``.
    """

    BOOLEAN = 1
    """
    Represented by ``bool``.
    """

    TINYINT = 2
    """
    Represented by ``int``.
    """

    SMALLINT = 3
    """
    Represented by ``int``.
    """

    INTEGER = 4
    """
    Represented by ``int``.
    """

    BIGINT = 5
    """
    Represented by ``int`` (for Python 3) or ``long`` (for Python 2).
    """

    DECIMAL = 6
    """
    Represented by ``str``.
    """

    REAL = 7
    """
    Represented by ``float``.
    """

    DOUBLE = 8
    """
    Represented by ``float``.
    """

    DATE = 9
    """
    Represented by ``str`` with the ``YYYY-MM-DD`` format.
    """

    TIME = 10
    """
    Represented by ``str`` with the ``HH:MM:SS[.ffffff]`` format.
    """

    TIMESTAMP = 11
    """
    Represented by ``str`` with the ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` format.
    """

    TIMESTAMP_WITH_TIME_ZONE = 12
    """
    Represented by ``str`` with the
    ``YYYY-MM-DDTHH:MM:SS[.ffffff](+|-)HH:MM[:SS]`` format.
    """

    OBJECT = 13
    """
    Could be represented by any Python class.
    """

    NULL = 14
    """
    The type of the generic SQL ``NULL`` literal.

    The only valid value of ``NULL`` type is ``None``.
    """
class _SqlErrorCode(object):
    """Codes that identify the kind of an SQL error."""

    GENERIC = -1
    """
    Generic error.
    """

    CONNECTION_PROBLEM = 1001
    """
    A network connection problem between members, or between a client and a member.
    """

    CANCELLED_BY_USER = 1003
    """
    Query was cancelled due to user request.
    """

    TIMEOUT = 1004
    """
    Query was cancelled due to timeout.
    """

    PARTITION_DISTRIBUTION = 1005
    """
    A problem with partition distribution.
    """

    MAP_DESTROYED = 1006
    """
    An error caused by a concurrent destroy of a map.
    """

    MAP_LOADING_IN_PROGRESS = 1007
    """
    Map loading is not finished yet.
    """

    PARSING = 1008
    """
    Generic parsing error.
    """

    INDEX_INVALID = 1009
    """
    An error caused by an attempt to query an index that is not valid.
    """

    DATA_EXCEPTION = 2000
    """
    An error with data conversion or transformation.
    """
class HazelcastSqlError(HazelcastError):
    """Represents an error occurred during the SQL query execution."""

    def __init__(self, originating_member_uuid, code, message, cause):
        """
        Args:
            originating_member_uuid (uuid.UUID): UUID of the member that
                caused or initiated an error condition.
            code (int): One of the :class:`_SqlErrorCode` values.
            message (str): The error message, passed to the base class.
            cause (Exception): The underlying error or ``None``, passed
                to the base class.
        """
        super(HazelcastSqlError, self).__init__(message, cause)
        self._originating_member_uuid = originating_member_uuid

        # TODO: This is private API, might be good to make it public or
        #  remove this information altogether.
        self._code = code

    @property
    def originating_member_uuid(self):
        """uuid.UUID: UUID of the member that caused or initiated an error condition."""
        return self._originating_member_uuid
class SqlRow(object):
    """One of the rows of an SQL query result."""

    __slots__ = ("_row_metadata", "_row")

    def __init__(self, row_metadata, row):
        """
        Args:
            row_metadata (SqlRowMetadata): Metadata describing the columns.
            row (list): Column values of this row, in column order.
        """
        self._row_metadata = row_metadata
        self._row = row

    def get_object(self, column_name):
        """Gets the value in the column indicated by the column name.

        Column name should be one of those defined in :class:`SqlRowMetadata`,
        case-sensitive. You may also use :func:`SqlRowMetadata.find_column` to
        test for column existence.

        The type of the returned value depends on the SQL type of the column.
        No implicit conversions are performed on the value.

        Args:
            column_name (str):

        Returns:
            Value of the column.

        Raises:
            ValueError: If a column with the given name does not exist.
            AssertionError: If the column name is not a string.

        See Also:
            :attr:`metadata`

            :func:`SqlRowMetadata.find_column`

            :attr:`SqlColumnMetadata.type`

            :attr:`SqlColumnMetadata.name`
        """
        index = self._row_metadata.find_column(column_name)
        if index == SqlRowMetadata.COLUMN_NOT_FOUND:
            raise ValueError("Column '%s' doesn't exist" % column_name)
        return self._row[index]

    def get_object_with_index(self, column_index):
        """Gets the value of the column by index.

        The class of the returned value depends on the SQL type of the column.
        No implicit conversions are performed on the value.

        Args:
            column_index (int): Zero-based column index.

        Returns:
            Value of the column.

        Raises:
            IndexError: If the column index is out of bounds.
            AssertionError: If the column index is not an integer.

        See Also:
            :attr:`metadata`

            :attr:`SqlColumnMetadata.type`
        """
        check_is_int(column_index, "Column index must be an integer")
        return self._row[column_index]

    @property
    def metadata(self):
        """SqlRowMetadata: The row metadata."""
        return self._row_metadata

    def __repr__(self):
        metadata = self._row_metadata
        parts = []
        for i in range(metadata.column_count):
            # Look the column up once; both its name and type are needed.
            column = metadata.get_column(i)
            parts.append(
                "%s %s=%s"
                % (
                    column.name,
                    get_attr_name(SqlColumnType, column.type),
                    self._row[i],
                )
            )
        return "[%s]" % ", ".join(parts)
class _ExecuteResponse(object):
    """Represent the response of the first execute request."""

    __slots__ = ("row_metadata", "row_page", "update_count")

    def __init__(self, row_metadata, row_page, update_count):
        self.row_metadata = row_metadata
        """SqlRowMetadata: Row metadata or None, if the response only
        contains update count."""

        self.row_page = row_page
        """_SqlPage: First page of the query response or None, if the
        response only contains update count.
        """

        self.update_count = update_count
        """int: Update count or -1 if the result is a rowset."""
class _IteratorBase(object):
    """Base class for the blocking and Future-producing
    iterators to use."""

    __slots__ = (
        "row_metadata",
        "fetch_fn",
        "deserialize_fn",
        "page",
        "row_count",
        "position",
        "is_last",
    )

    def __init__(self, row_metadata, fetch_fn, deserialize_fn):
        self.row_metadata = row_metadata
        """SqlRowMetadata: Row metadata."""

        self.fetch_fn = fetch_fn
        """function: Fetches the next page. It produces a Future[_SqlPage]."""

        self.deserialize_fn = deserialize_fn
        """function: Deserializes the value."""

        self.page = None
        """_SqlPage: Current page."""

        self.row_count = 0
        """int: Number of rows in the current page."""

        self.position = 0
        """int: Index of the next row in the page."""

        self.is_last = False
        """bool: Whether this is the last page or not."""

    def on_next_page(self, page):
        """
        Called when a new page is fetched or on the
        initialization of the iterator to update its
        internal state.

        Args:
            page (_SqlPage):
        """
        self.page = page
        self.row_count = page.row_count
        self.is_last = page.is_last
        self.position = 0

    def _get_current_row(self):
        """
        Returns:
            list: The row pointed by the current position.
        """

        # The column might contain user objects so we have to deserialize it.
        # Deserialization is no-op if the value is not Data.
        return [
            self.deserialize_fn(self.page.get_value(i, self.position))
            for i in range(self.page.column_count)
        ]
class _FutureProducingIterator(_IteratorBase):
    """An iterator that produces infinite stream of Futures. It is the
    responsibility of the user to either call them in blocking fashion,
    or call ``next`` only if the current call to next did not raise
    ``StopIteration`` error (possibly with callback-based code).
    """

    def __iter__(self):
        return self

    def next(self):
        # Defined for backward-compatibility with Python 2.
        return self.__next__()

    def __next__(self):
        return self._has_next().continue_with(self._has_next_continuation)

    def _has_next_continuation(self, future):
        """Based on the call to :func:`_has_next`, either
        raises ``StopIteration`` error or gets the current row
        and returns it.

        Args:
            future (hazelcast.future.Future):

        Returns:
            SqlRow:
        """
        has_next = future.result()
        if not has_next:
            # Iterator is exhausted, raise this to inform the user.
            # If the user continues to call next, we will continuously
            # raise this.
            raise StopIteration

        row = self._get_current_row()
        self.position += 1
        return SqlRow(self.row_metadata, row)

    def _has_next(self):
        """Returns a Future indicating whether there are more rows
        left to iterate.

        Returns:
            hazelcast.future.Future:
        """
        if self.position == self.row_count:
            # We exhausted the current page.

            if self.is_last:
                # This was the last page, no row left
                # on the server side.
                return ImmediateFuture(False)

            # It seems that there are some rows left on the server.
            # Fetch them, and then return.
            return self.fetch_fn().continue_with(self._fetch_continuation)

        # There are some elements left in the current page.
        return ImmediateFuture(True)

    def _fetch_continuation(self, future):
        """After a new page is fetched, updates the internal state
        of the iterator and returns whether or not there are some
        rows in the fetched page.

        Args:
            future (hazelcast.future.Future):

        Returns:
            hazelcast.future.Future:
        """
        page = future.result()
        self.on_next_page(page)
        # The fetched page may be empty, so run the check again.
        return self._has_next()
class _BlockingIterator(_IteratorBase):
    """An iterator that blocks when the current page is exhausted
    and we need to fetch a new page from the server. Otherwise,
    it returns immediately with an object.

    This version is more performant than the Future-producing
    counterpart in a sense that, it does not box everything with
    a Future object.
    """

    def __iter__(self):
        return self

    def next(self):
        # Defined for backward-compatibility with Python 2.
        return self.__next__()

    def __next__(self):
        if not self._has_next():
            # No more rows are left.
            raise StopIteration

        row = self._get_current_row()
        self.position += 1
        return SqlRow(self.row_metadata, row)

    def _has_next(self):
        """Blocks until it is known whether there is a row to return.

        Returns:
            bool: ``True`` if the current page has a row left,
            ``False`` if the last page is exhausted.
        """
        # A loop rather than a single check, because a fetched
        # page may contain no rows.
        while self.position == self.row_count:
            # We exhausted the current page.

            if self.is_last:
                # No more rows left on the server.
                return False

            # Block while waiting for the next page.
            page = self.fetch_fn().result()

            # Update the internal state with the next page.
            self.on_next_page(page)

        # There are some rows left in the current page.
        return True
class SqlResult(object):
    """SQL query result.

    Depending on the statement type it represents a stream of
    rows or an update count.

    To iterate over the stream of rows, there are two possible options.

    The first, and the easiest one is to iterate over the rows
    in a blocking fashion. ::

        result = client.sql.execute("SELECT ...")
        for row in result:
            # Process the row.
            print(row)

    The second option is to use the non-blocking API with callbacks. ::

        result = client.sql.execute("SELECT ...")
        it = result.iterator()  # Future of iterator

        def on_iterator_response(iterator_future):
            iterator = iterator_future.result()

            def on_next_row(row_future):
                try:
                    row = row_future.result()
                    # Process the row.
                    print(row)

                    # Iterate over the next row.
                    next(iterator).add_done_callback(on_next_row)
                except StopIteration:
                    # Exhausted the iterator. No more rows are left.
                    pass

            next(iterator).add_done_callback(on_next_row)

        it.add_done_callback(on_iterator_response)

    When in doubt, use the blocking API shown in the first code sample.

    Note that, iterators can be requested at most once per SqlResult.

    One can call :func:`close` method of a result object to
    release the resources associated with the result on the server side.
    It might also be used to cancel query execution on the server side
    if it is still active.

    When the blocking API is used, one might also use ``with``
    statement to automatically close the query even if an exception
    is thrown in the iteration. ::

        with client.sql.execute("SELECT ...") as result:
            for row in result:
                # Process the row.
                print(row)

    To get the number of rows updated by the query, use the
    :func:`update_count`. ::

        update_count = client.sql.execute("SELECT ...").update_count().result()

    One does not have to call :func:`close` in this case, because the result
    will already be closed in the server-side.
    """

    def __init__(self, sql_service, connection, query_id, cursor_buffer_size, execute_future):
        self._sql_service = sql_service
        """_InternalSqlService: Reference to the SQL service."""

        self._connection = connection
        """hazelcast.connection.Connection: Reference to the connection
        that the execute request is made to."""

        self._query_id = query_id
        """_SqlQueryId: Unique id of the SQL query."""

        self._cursor_buffer_size = cursor_buffer_size
        """int: Size of the cursor buffer measured in the number of rows."""

        self._lock = RLock()
        """RLock: Protects the shared access to instance variables below."""

        self._execute_response = Future()
        """Future: Will be resolved with :class:`_ExecuteResponse` once the
        execute request is resolved."""

        self._iterator_requested = False
        """bool: Flag that shows whether an iterator is already requested."""

        self._closed = False
        """bool: Flag that shows whether the query execution is still active
        on the server side. When ``True``, there is no need to send the "close"
        request to the server."""

        self._fetch_future = None
        """Future: Will be set, if there are more pages to fetch on the server
        side. It should be set to ``None`` once the fetch is completed."""

        execute_future.add_done_callback(self._handle_execute_response)

    def iterator(self):
        """Returns the iterator over the result rows.

        The iterator may be requested only once.

        The returned Future results with:

        - :class:`HazelcastSqlError`: In case of an SQL execution error.
        - **ValueError**: If the result only contains an update count, or the
          iterator is already requested.

        Returns:
            Future[Iterator[Future[SqlRow]]]: Iterator that produces Future
            of :class:`SqlRow` s. See the class documentation for the correct
            way to use this.
        """
        return self._get_iterator(False)

    def is_row_set(self):
        """Returns whether this result has rows to iterate.

        The returned Future results with:

        - :class:`HazelcastSqlError`: In case of an SQL execution error.

        Returns:
            Future[bool]:
        """

        def continuation(future):
            response = future.result()
            # By design, if the row_metadata (or row_page) is None,
            # we only got the update count.
            return response.row_metadata is not None

        return self._execute_response.continue_with(continuation)

    def update_count(self):
        """Returns the number of rows updated by the statement or ``-1`` if this
        result is a row set. In case the result doesn't contain rows but the
        update count isn't applicable or known, ``0`` is returned.

        The returned Future results with:

        - :class:`HazelcastSqlError`: In case of an SQL execution error.

        Returns:
            Future[int]:
        """

        def continuation(future):
            response = future.result()
            # This will be set to -1, when we got row set on the client side.
            # See _on_execute_response.
            return response.update_count

        return self._execute_response.continue_with(continuation)

    def close(self):
        """Release the resources associated with the query result.

        The query engine delivers the rows asynchronously. The query may
        become inactive even before all rows are consumed. The invocation
        of this command will cancel the execution of the query on all members
        if the query is still active. Otherwise it is no-op. For a result
        with an update count it is always no-op.

        The returned Future results with:

        - :class:`HazelcastSqlError`: In case there is an error closing the
          result.

        Returns:
            Future[None]:
        """

        with self._lock:
            if self._closed:
                # Do nothing if the result is already closed.
                return ImmediateFuture(None)

            error = HazelcastSqlError(
                self._sql_service.get_client_id(),
                _SqlErrorCode.CANCELLED_BY_USER,
                "Query was cancelled by the user",
                None,
            )

            if not self._execute_response.done():
                # If the cancellation is initiated before the first response is
                # received, then throw cancellation errors on the dependent
                # methods (update count, row metadata, iterator).
                self._on_execute_error(error)

            if self._fetch_future is None:
                # Make sure that all subsequent fetches will fail.
                self._fetch_future = Future()

            self._on_fetch_error(error)

            def wrap_error_on_failure(f):
                # If the close request is failed somehow,
                # wrap it in a HazelcastSqlError.
                try:
                    return f.result()
                except Exception as e:
                    raise self._sql_service.re_raise(e, self._connection)

            self._closed = True

            # Send the close request
            return self._sql_service.close(self._connection, self._query_id).continue_with(
                wrap_error_on_failure
            )

    def __iter__(self):
        # Get blocking iterator, and wait for the
        # first page.
        return self._get_iterator(True).result()

    def _get_iterator(self, should_get_blocking):
        """Gets the iterator after the execute request finishes.

        Args:
            should_get_blocking (bool): Whether to get a blocking iterator.

        Returns:
            Future[Iterator]:
        """

        def continuation(future):
            response = future.result()

            with self._lock:
                if response.row_metadata is None:
                    # Can't get an iterator when we only have update count
                    raise ValueError("This result contains only update count")

                if self._iterator_requested:
                    # Can't get an iterator when we already get one
                    raise ValueError("Iterator can be requested only once")

                self._iterator_requested = True

                # Both iterator kinds take the same constructor arguments.
                if should_get_blocking:
                    iterator_cls = _BlockingIterator
                else:
                    iterator_cls = _FutureProducingIterator

                iterator = iterator_cls(
                    response.row_metadata,
                    self._fetch_next_page,
                    self._sql_service.deserialize_object,
                )

                # Pass the first page information to the iterator
                iterator.on_next_page(response.row_page)
                return iterator

        return self._execute_response.continue_with(continuation)

    def _fetch_next_page(self):
        """Fetches the next page, if there is no fetch request
        in-flight.

        Returns:
            Future[_SqlPage]:
        """
        with self._lock:
            if self._fetch_future is not None:
                # A fetch request is already in-flight, return it.
                return self._fetch_future

            future = Future()
            self._fetch_future = future

            self._sql_service.fetch(
                self._connection, self._query_id, self._cursor_buffer_size
            ).add_done_callback(self._handle_fetch_response)

            # Need to return future, not self._fetch_future, because through
            # some unlucky timing, we might call _handle_fetch_response
            # before returning, which could set self._fetch_future to
            # None.
            return future

    def _handle_fetch_response(self, future):
        """Handles the result of the fetch request, by either:

        - setting it to exception, so that the future calls to
          fetch fails immediately.
        - setting it to next page, and setting self._fetch_future
          to None so that the next fetch request might actually
          try to fetch something from the server.

        Args:
            future (Future): The response from the server for
                the fetch request.
        """
        try:
            response = future.result()

            response_error = self._handle_response_error(response["error"])
            if response_error is not None:
                # There is a server side error sent to client.
                self._on_fetch_error(response_error)
                return

            # The result contains the next page, as expected.
            self._on_fetch_response(response["row_page"])
        except Exception as e:
            # Something went bad, we couldn't get response from
            # the server, invocation failed.
            self._on_fetch_error(self._sql_service.re_raise(e, self._connection))

    def _on_fetch_error(self, error):
        """Sets the fetch future with exception, but not resetting it
        so that the next fetch request fails immediately.

        Args:
            error (Exception): The error.
        """
        with self._lock:
            self._fetch_future.set_exception(error)

    def _on_fetch_response(self, page):
        """Sets the fetch future with the next page,
        resets it, and if this is the last page,
        marks the result as closed.

        Args:
            page (_SqlPage): The next page.
        """
        with self._lock:
            future = self._fetch_future
            self._fetch_future = None

            if page.is_last:
                # This is the last page, there is nothing
                # more on the server.
                self._closed = True

            # Resolving the future before resetting self._fetch_future
            # might result in an infinite loop for non-blocking iterators
            future.set_result(page)

    def _handle_execute_response(self, future):
        """Handles the result of the execute request, by either:

        - setting it to an exception so that the dependent methods
          (iterator, update_count etc.) fails immediately
        - setting it to an execute response

        Args:
            future (Future):
        """
        try:
            response = future.result()

            response_error = self._handle_response_error(response["error"])
            if response_error is not None:
                # There is a server-side error sent to the client.
                self._on_execute_error(response_error)
                return

            row_metadata = response["row_metadata"]
            if row_metadata is not None:
                # The result contains some rows, not an update count.
                row_metadata = SqlRowMetadata(row_metadata)

            self._on_execute_response(row_metadata, response["row_page"], response["update_count"])
        except Exception as e:
            # Something went bad, we couldn't get response from
            # the server, invocation failed.
            self._on_execute_error(self._sql_service.re_raise(e, self._connection))

    @staticmethod
    def _handle_response_error(error):
        """If the error is not ``None``, return it as
        :class:`HazelcastSqlError` so that we can raise
        it to user.

        Args:
            error (_SqlError): The error or ``None``.

        Returns:
            HazelcastSqlError: If the error is not ``None``,
            ``None`` otherwise.
        """
        if error is None:
            return None

        return HazelcastSqlError(error.originating_member_uuid, error.code, error.message, None)

    def _on_execute_error(self, error):
        """Called when the first execute request is failed.

        Args:
            error (HazelcastSqlError): The wrapped error that can
                be raised to the user.
        """
        with self._lock:
            if self._closed:
                # User might be already cancelled it.
                return

            self._execute_response.set_exception(error)

    def _on_execute_response(self, row_metadata, row_page, update_count):
        """Called when the first execute request is succeeded.

        Args:
            row_metadata (SqlRowMetadata): The row metadata. Might be ``None``
                if the response only contains the update count.
            row_page (_SqlPage): The first page of the result. Might be
                ``None`` if the response only contains the update count.
            update_count (int): The update count.
        """
        with self._lock:
            if self._closed:
                # User might be already cancelled it.
                return

            if row_metadata is not None:
                # Result contains the row set for the query.
                # Set the update count to -1.
                response = _ExecuteResponse(row_metadata, row_page, -1)

                if row_page.is_last:
                    # This is the last page, close the result.
                    self._closed = True

                self._execute_response.set_result(response)
            else:
                # Result only contains the update count.
                response = _ExecuteResponse(None, None, update_count)
                self._execute_response.set_result(response)

                # There is nothing more we can get from the server.
                self._closed = True

    def __enter__(self):
        # The execute request is already sent.
        # There is nothing more to do.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Ignoring the possible exception details
        # since we close the query regardless of that.
        self.close().result()
class _InternalSqlService(object):
"""Internal SQL service that offers more public API
than the one exposed to the user.
"""
def __init__(self, connection_manager, serialization_service, invocation_service):
self._connection_manager = connection_manager
self._serialization_service = serialization_service
self._invocation_service = invocation_service
def execute(self, sql, *params):
"""Constructs a statement and executes it.
Args:
sql (str): SQL string.
*params: Query parameters.
Returns:
SqlResult: The execution result.
"""
statement = SqlStatement(sql)
for param in params:
statement.add_parameter(param)
return self.execute_statement(statement)
def execute_statement(self, statement):
"""Executes the given statement.
Args:
statement (SqlStatement): The statement to execute.
Returns:
SqlResult: The execution result.
"""
# Get a random Data member (non-lite member)
connection = self._connection_manager.get_random_connection(True)
if not connection:
# Either the client is not connected to the cluster, or
# there are no data members in the cluster.
raise HazelcastSqlError(
self.get_client_id(),
_SqlErrorCode.CONNECTION_PROBLEM,
"Client is not currently connected to the cluster.",
None,
)
try:
# Create a new, unique query id.
query_id = _SqlQueryId.from_uuid(connection.remote_uuid)
# Serialize the passed parameters.
serialized_params = [
self._serialization_service.to_data(param) for param in statement.parameters
]
request = sql_execute_codec.encode_request(
statement.sql,
serialized_params,
# to_millis expects None to produce -1
to_millis(None if statement.timeout == -1 else statement.timeout),
statement.cursor_buffer_size,
statement.schema,
statement.expected_result_type,
query_id,
)
invocation = Invocation(
request, connection=connection, response_handler=sql_execute_codec.decode_response
)
result = SqlResult(
self, connection, query_id, statement.cursor_buffer_size, invocation.future
)
self._invocation_service.invoke(invocation)
return result
except Exception as e:
raise self.re_raise(e, connection)
def deserialize_object(self, obj):
return self._serialization_service.to_object(obj)
def fetch(self, connection, query_id, cursor_buffer_size):
"""Fetches the next page of the query execution.
Args:
connection (hazelcast.connection.Connection): Connection
that the first execute request, hence the fetch request
must route to.
query_id (_SqlQueryId): Unique id of the query.
cursor_buffer_size (int): Size of cursor buffer. Same as
the one used in the first execute request.
Returns:
Future: Decoded fetch response.
"""
request = sql_fetch_codec.encode_request(query_id, cursor_buffer_size)
invocation = Invocation(
request, connection=connection, response_handler=sql_fetch_codec.decode_response
)
self._invocation_service.invoke(invocation)
return invocation.future
def get_client_id(self):
"""
Returns:
uuid.UUID: Unique client UUID.
"""
return self._connection_manager.client_uuid
def re_raise(self, error, connection):
    """Wraps the error as a :class:`HazelcastSqlError` and returns it
    so that the caller can raise it to the user.

    Args:
        error (Exception): The error to reraise.
        connection (hazelcast.connection.Connection): Connection
            that the query requests are routed to. If it is not
            live, the returned error informs the user about the
            possible cluster topology change.

    Returns:
        HazelcastSqlError: The error to be raised. It is ``error``
        itself when the connection is live and ``error`` is already
        a :class:`HazelcastSqlError`.
    """
    if connection.live:
        # Connection is fine: pass SQL errors through untouched and
        # wrap anything else as a generic SQL error.
        if isinstance(error, HazelcastSqlError):
            return error

        return HazelcastSqlError(
            self.get_client_id(),
            _SqlErrorCode.GENERIC,
            try_to_get_error_message(error),
            error,
        )

    # A dead connection takes priority over the type of the error; the
    # original error is kept as the cause.
    return HazelcastSqlError(
        self.get_client_id(),
        _SqlErrorCode.CONNECTION_PROBLEM,
        "Cluster topology changed while a query was executed: Member cannot be reached: %s"
        % connection.remote_address,
        error,
    )
def close(self, connection, query_id):
    """Closes the remote query cursor.

    Args:
        connection (hazelcast.connection.Connection): Connection that
            the first execute request was routed to; the close request
            must be routed to the same connection.
        query_id (_SqlQueryId): The query id to close.

    Returns:
        Future: Future of the close invocation.
    """
    close_invocation = Invocation(
        sql_close_codec.encode_request(query_id),
        connection=connection,
    )
    self._invocation_service.invoke(close_invocation)
    return close_invocation.future
class SqlExpectedResultType(object):
    """The expected statement result type."""

    ANY = 0
    """
    The statement may produce either rows or an update count.
    """

    ROWS = 1
    """
    The statement must produce rows. An exception is thrown if the statement produces an update count.
    """

    UPDATE_COUNT = 2
    """
    The statement must produce an update count. An exception is thrown if the statement produces rows.
    """
class SqlStatement(object):
    """Definition of an SQL statement.

    This object is mutable. Properties are read once before the execution
    is started. Changes to properties do not affect the behavior of already
    running statements.
    """

    TIMEOUT_NOT_SET = -1

    TIMEOUT_DISABLED = 0

    DEFAULT_TIMEOUT = TIMEOUT_NOT_SET

    DEFAULT_CURSOR_BUFFER_SIZE = 4096

    def __init__(self, sql):
        # Assigned through the property so that the SQL string is validated.
        self.sql = sql
        self._parameters = []
        self._timeout = SqlStatement.DEFAULT_TIMEOUT
        self._cursor_buffer_size = SqlStatement.DEFAULT_CURSOR_BUFFER_SIZE
        self._schema = None
        self._expected_result_type = SqlExpectedResultType.ANY

    @property
    def sql(self):
        """str: The SQL string to be executed.

        The setter raises:

        - **AssertionError**: If the SQL parameter is not a string.
        - **ValueError**: If the SQL parameter is an empty string.
        """
        return self._sql

    @sql.setter
    def sql(self, sql):
        check_true(isinstance(sql, six.string_types), "SQL must be a string")

        # Whitespace-only strings are treated as empty.
        if not sql.strip():
            raise ValueError("SQL cannot be empty")

        self._sql = sql

    @property
    def schema(self):
        """str: The schema name. The engine will try to resolve the
        non-qualified object identifiers from the statement in the given
        schema. If not found, the default search path will be used, which
        looks for objects in the predefined schemas ``partitioned`` and
        ``public``.

        The schema name is case sensitive. For example, ``foo`` and ``Foo``
        are different schemas.

        The default value is ``None`` meaning only the default search path is
        used.

        The setter raises:

        - **AssertionError**: If the schema is not a string or ``None``.
        """
        return self._schema

    @schema.setter
    def schema(self, schema):
        check_true(
            isinstance(schema, six.string_types) or schema is None,
            "Schema must be a string or None",
        )
        self._schema = schema

    @property
    def parameters(self):
        """list: The statement parameters.

        You may define parameter placeholders in the statement with the ``?``
        character. For every placeholder, a parameter value must be provided.

        When the setter is called, the content of the parameters list is copied.
        Subsequent changes to the original list don't change the statement parameters.

        The setter raises:

        - **AssertionError**: If the parameter is not a list.
        """
        return self._parameters

    @parameters.setter
    def parameters(self, parameters):
        check_true(isinstance(parameters, list), "Parameters must be a list")
        # Copy, so that later changes to the caller's list are not visible here.
        self._parameters = list(parameters)

    @property
    def timeout(self):
        """float or int: The execution timeout in seconds.

        If the timeout is reached for a running statement, it will be
        cancelled forcefully.

        Zero value means no timeout. :const:`TIMEOUT_NOT_SET` means that
        the value from the server-side config will be used. Other negative
        values are prohibited.

        Defaults to :const:`TIMEOUT_NOT_SET`.

        The setter raises:

        - **AssertionError**: If the timeout is not an integer or float.
        - **ValueError**: If the timeout is negative and not equal to
          :const:`TIMEOUT_NOT_SET`.
        """
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        check_is_number(timeout, "Timeout must be an integer or float")

        # TIMEOUT_NOT_SET is the only negative value that is accepted.
        if timeout < 0 and timeout != SqlStatement.TIMEOUT_NOT_SET:
            raise ValueError("Timeout must be non-negative or -1, not %s" % timeout)

        self._timeout = timeout

    @property
    def cursor_buffer_size(self):
        """int: The cursor buffer size (measured in the number of rows).

        When a statement is submitted for execution, a :class:`SqlResult`
        is returned as a result. When rows are ready to be consumed,
        they are put into an internal buffer of the cursor. This parameter
        defines the maximum number of rows in that buffer. When the threshold
        is reached, the backpressure mechanism will slow down the execution,
        possibly to a complete halt, to prevent out-of-memory.

        Only positive values are allowed.

        The default value is expected to work well for most workloads. A bigger
        buffer size may give you a slight performance boost for queries with
        large result sets at the cost of increased memory consumption.

        Defaults to :const:`DEFAULT_CURSOR_BUFFER_SIZE`.

        The setter raises:

        - **AssertionError**: If the cursor buffer size is not an integer.
        - **ValueError**: If the cursor buffer size is not positive.
        """
        return self._cursor_buffer_size

    @cursor_buffer_size.setter
    def cursor_buffer_size(self, cursor_buffer_size):
        check_is_int(cursor_buffer_size, "Cursor buffer size must be an integer")
        if cursor_buffer_size <= 0:
            raise ValueError("Cursor buffer size must be positive, not %s" % cursor_buffer_size)

        self._cursor_buffer_size = cursor_buffer_size

    @property
    def expected_result_type(self):
        """SqlExpectedResultType: The expected result type.

        The setter raises:

        - **TypeError**: If the expected result type does not equal to one of
          the values or names of the members of the
          :class:`SqlExpectedResultType`.
        """
        return self._expected_result_type

    @expected_result_type.setter
    def expected_result_type(self, expected_result_type):
        self._expected_result_type = try_to_get_enum_value(
            expected_result_type, SqlExpectedResultType
        )

    def add_parameter(self, parameter):
        """Adds a single parameter to the end of the parameters list.

        Args:
            parameter: The parameter.

        See Also:
            :attr:`parameters`

            :func:`clear_parameters`
        """
        self._parameters.append(parameter)

    def clear_parameters(self):
        """Clears statement parameters."""
        # Rebind instead of clearing in place, so that a list obtained
        # earlier through ``parameters`` is left untouched.
        self._parameters = []

    def copy(self):
        """Creates a copy of this instance.

        Returns:
            SqlStatement: The new copy.
        """
        copied = SqlStatement(self.sql)
        # The ``parameters`` setter already stores a copy of the given
        # list, so no extra copy is needed here.
        copied.parameters = self.parameters
        copied.timeout = self.timeout
        copied.cursor_buffer_size = self.cursor_buffer_size
        copied.schema = self.schema
        copied.expected_result_type = self.expected_result_type
        return copied

    def __repr__(self):
        return (
            "SqlStatement(schema=%s, sql=%s, parameters=%s, timeout=%s,"
            " cursor_buffer_size=%s, expected_result_type=%s)"
            % (
                self.schema,
                self.sql,
                self.parameters,
                self.timeout,
                self.cursor_buffer_size,
                self.expected_result_type,
            )
        )
# These are imported at the bottom of the module to avoid
# cyclic import errors.
from hazelcast.protocol.codec import sql_execute_codec, sql_fetch_codec, sql_close_codec