In-memory distributed computing platform for real-time stream processing and data storage with SQL capabilities
—
Hazelcast provides a distributed SQL engine that allows you to query data across the cluster using standard SQL syntax. The SQL service supports querying maps, accessing streaming data, and performing distributed joins and aggregations.
The main interface for executing SQL operations in Hazelcast.
import com.hazelcast.sql.SqlService;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlStatement;
// Core entry point for running SQL over cluster data (maps, streams).
// NOTE(review): shown API-reference style; confirm executeUpdate exists
// in the installed Hazelcast version before relying on it for DML.
public interface SqlService {
    // Execute SQL with inline parameters
    SqlResult execute(String sql, Object... arguments);
    // Execute SQL statement object
    SqlResult execute(SqlStatement statement);
    // Execute DML statements (INSERT, UPDATE, DELETE)
    long executeUpdate(String sql, Object... arguments);
}

// Acquire the SQL service from a running member.
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
SqlService sql = hz.getSql();

Represents a prepared SQL statement with parameters and execution options.
import com.hazelcast.sql.SqlStatement;
import com.hazelcast.sql.SqlExpectedResultType;
import java.util.concurrent.TimeUnit;
// Holder for a SQL string plus parameters and execution options.
// Setters return SqlStatement, enabling fluent chaining (see the usage
// examples below).
// NOTE(review): declared as a class but methods are listed without
// bodies — API-reference style, not compilable source.
public class SqlStatement {
    // Statement creation
    public static SqlStatement of(String sql);
    public static SqlStatement of(String sql, Object... parameters);
    // Configuration methods
    public SqlStatement setSql(String sql);
    public String getSql();
    public SqlStatement setParameters(List<?> parameters);
    public SqlStatement addParameter(Object parameter);
    public List<Object> getParameters();
    // Query timeout in milliseconds.
    public SqlStatement setTimeoutMillis(long timeoutMillis);
    public long getTimeoutMillis();
    // Rows fetched per page from the cluster; tune for result-set size.
    public SqlStatement setCursorBufferSize(int cursorBufferSize);
    public int getCursorBufferSize();
    public SqlStatement setExpectedResultType(SqlExpectedResultType expectedResultType);
    public SqlExpectedResultType getExpectedResultType();
    // Default schema used to resolve unqualified table names.
    public SqlStatement setSchema(String schema);
    public String getSchema();
}

import com.hazelcast.sql.SqlExpectedResultType;
// Declares which kind of result the caller expects from execution.
public enum SqlExpectedResultType {
    ANY, // Any result type is acceptable
    ROWS, // Expect query results (SELECT)
    UPDATE_COUNT // Expect update count (INSERT, UPDATE, DELETE)
}

// Simple query execution
// Fixes vs. original snippet: both results were named "result" (a
// redeclaration that cannot compile in one scope) and neither was ever
// closed — SqlResult is AutoCloseable, so use try-with-resources.
try (SqlResult queryResult = sql.execute("SELECT name, age FROM employees WHERE age > ?", 25)) {
    // iterate queryResult here
}

// Using SqlStatement
SqlStatement statement = SqlStatement.of("SELECT * FROM products WHERE category = ? AND price > ?")
    .addParameter("electronics")
    .addParameter(100.0)
    .setTimeoutMillis(30000);
try (SqlResult statementResult = sql.execute(statement)) {
    // iterate statementResult here
}

// DML operations return the affected-row count; there is no cursor to close.
long updateCount = sql.executeUpdate("UPDATE employees SET salary = salary * 1.1 WHERE department = ?", "Engineering");
System.out.println("Updated " + updateCount + " employees");

Represents the result of a SQL query execution.
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.sql.SqlRowMetadata;
import java.util.Iterator;
// Result of a SQL execution: either a row cursor (SELECT) or an update
// count (DML). Implements AutoCloseable — always close row results.
public interface SqlResult extends Iterable<SqlRow>, AutoCloseable {
    // Metadata
    SqlRowMetadata getRowMetadata();
    // Row iteration
    // NOTE(review): result cursors are typically single-pass — assume
    // iterator() may be obtained only once; confirm in the Javadoc.
    Iterator<SqlRow> iterator();
    // Update count for DML operations
    long updateCount();
    boolean isUpdateCountValid();
    // Resource management
    void close();
}

Represents a single row in the result set.
import com.hazelcast.sql.SqlRow;
// One row of a result set; values are fetched by index or column name.
public interface SqlRow {
    // Get by column index
    <T> T getObject(int columnIndex);
    // Get by column name
    <T> T getObject(String columnName);
    // Metadata
    SqlRowMetadata getMetadata();
}

Provides metadata about the result set structure.
import com.hazelcast.sql.SqlRowMetadata;
import com.hazelcast.sql.SqlColumnMetadata;
import java.util.List;
// Describes the columns of a result set.
public interface SqlRowMetadata {
    // Column information
    int getColumnCount();
    List<SqlColumnMetadata> getColumns();
    SqlColumnMetadata getColumn(int index);
    // Column lookup
    // NOTE(review): behavior when the column is absent (exception vs.
    // sentinel index) is not shown here — check the Javadoc.
    int findColumn(String columnName);
}

Provides metadata about individual columns.
import com.hazelcast.sql.SqlColumnMetadata;
import com.hazelcast.sql.SqlColumnType;
// Name, SQL type, and nullability info for a single column.
public interface SqlColumnMetadata {
    String getName();
    SqlColumnType getType();
    // NOTE(review): unusual accessor name — presumably true when
    // nullability could not be determined; confirm against the API.
    boolean isNullableUnknown();
}

import com.hazelcast.sql.SqlColumnType;
// SQL column types supported by the engine.
public enum SqlColumnType {
    VARCHAR,
    BOOLEAN,
    TINYINT,
    SMALLINT,
    INTEGER,
    BIGINT,
    DECIMAL,
    REAL,
    DOUBLE,
    DATE,
    TIME,
    TIMESTAMP,
    TIMESTAMP_WITH_TIME_ZONE,
    OBJECT,
    NULL,
    JSON
}

// SELECT queries
SqlResult result = sql.execute("SELECT * FROM employees");
// Process results
for (SqlRow row : result) {
    String name = row.getObject("name");
    Integer age = row.getObject("age");
    String department = row.getObject("department");
    System.out.println(name + ", " + age + ", " + department);
}
// Always close results
result.close();
// Using try-with-resources (preferred: closes even if iteration throws)
// NOTE(review): this reuses the name "result" — the two snippets are
// independent; rename one if pasting both into a single method.
try (SqlResult result = sql.execute("SELECT COUNT(*) as employee_count FROM employees")) {
    for (SqlRow row : result) {
        Long count = row.getObject("employee_count");
        System.out.println("Total employees: " + count);
    }
}

// Parameterized queries prevent SQL injection
// Bind user-supplied values as '?' parameters rather than concatenating
// them into the SQL text — this is what prevents SQL injection.
String dept = "Engineering";
int ageFloor = 25;
try (SqlResult rows = sql.execute(
        "SELECT name, age, salary FROM employees WHERE department = ? AND age >= ?",
        dept, ageFloor)) {
    for (SqlRow r : rows) {
        String line = String.format("%s (%d): $%.2f",
                r.getObject("name"),
                r.<Integer>getObject("age"),
                r.<Double>getObject("salary"));
        System.out.println(line);
    }
}

// Aggregation with grouping
try (SqlResult result = sql.execute(
        "SELECT department, COUNT(*) as emp_count, AVG(salary) as avg_salary " +
        "FROM employees " +
        "GROUP BY department " +
        "HAVING COUNT(*) > ?"
        , 5)) {
    System.out.println("Department Statistics:");
    for (SqlRow row : result) {
        String dept = row.getObject("department");
        Long count = row.getObject("emp_count");
        Double avgSalary = row.getObject("avg_salary");
        // HAVING filters after grouping: only departments with more than
        // the bound value (5) employees reach this loop.
        System.out.println(String.format("%s: %d employees, avg salary: $%.2f",
            dept, count, avgSalary));
    }
}

// Window functions for ranking
try (SqlResult result = sql.execute(
        // RANK() restarts per department (PARTITION BY); ROW_NUMBER()
        // ranks across the whole result set.
        "SELECT name, department, salary, " +
        " RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank, " +
        " ROW_NUMBER() OVER (ORDER BY salary DESC) as overall_rank " +
        "FROM employees")) {
    for (SqlRow row : result) {
        System.out.println(String.format("%s (%s): Salary $%.2f, Dept Rank: %d, Overall Rank: %d",
            row.<String>getObject("name"),
            row.<String>getObject("department"),
            row.<Double>getObject("salary"),
            row.<Long>getObject("dept_rank"),
            row.<Long>getObject("overall_rank")));
    }
}

// Insert single record
long insertCount = sql.executeUpdate(
    "INSERT INTO employees (id, name, age, department, salary) VALUES (?, ?, ?, ?, ?)",
    1001, "Alice Johnson", 28, "Engineering", 75000.0);
System.out.println("Inserted " + insertCount + " record(s)");
// Insert multiple records with batch
SqlStatement insertStatement = SqlStatement.of(
    "INSERT INTO employees (id, name, age, department, salary) VALUES (?, ?, ?, ?, ?)");
// Note: Batch operations would typically be done in a loop or with bulk insert
// (these two calls reuse only the SQL text; they are separate round trips).
long count1 = sql.executeUpdate(insertStatement.getSql(), 1002, "Bob Smith", 32, "Marketing", 68000.0);
long count2 = sql.executeUpdate(insertStatement.getSql(), 1003, "Carol Davis", 29, "Engineering", 72000.0);

// Update with conditions
// Give a 15% raise to Engineering staff under 30.
long updateCount = sql.executeUpdate(
    "UPDATE employees SET salary = salary * ? WHERE department = ? AND age < ?",
    1.15, "Engineering", 30);
System.out.println("Updated " + updateCount + " employees");
// Conditional update with CASE: bonus tier depends on current salary.
sql.executeUpdate(
    "UPDATE employees SET bonus = " +
    "CASE " +
    " WHEN salary > 80000 THEN salary * 0.15 " +
    " WHEN salary > 60000 THEN salary * 0.10 " +
    " ELSE salary * 0.05 " +
    "END " +
    "WHERE department = ?", "Sales");

// Delete with conditions
long deleteCount = sql.executeUpdate(
    "DELETE FROM employees WHERE age > ? AND department = ?",
    65, "Retired");
System.out.println("Deleted " + deleteCount + " employees");
// Delete with subquery: removes rows earning under half the average salary.
// NOTE(review): the subquery reads the same table being deleted from —
// verify the engine supports this pattern before relying on it.
sql.executeUpdate(
    "DELETE FROM employees WHERE id IN " +
    "(SELECT id FROM employees WHERE salary < (SELECT AVG(salary) * 0.5 FROM employees))");

// Setup: Assume we have employees and departments maps
// employees: {id, name, dept_id, salary}
// departments: {id, name, budget}
// Inner join
// Inner join: only employees whose dept_id matches a department row.
try (SqlResult result = sql.execute(
    "SELECT e.name as employee_name, d.name as department_name, e.salary " +
    "FROM employees e " +
    "INNER JOIN departments d ON e.dept_id = d.id " +
    "WHERE e.salary > ?", 70000)) {
    for (SqlRow row : result) {
        System.out.println(String.format("%s works in %s, earns $%.2f",
            row.<String>getObject("employee_name"),
            row.<String>getObject("department_name"),
            row.<Double>getObject("salary")));
    }
}
// Left join with aggregation: keeps departments with zero employees;
// COALESCE turns their NULL average salary into 0.
try (SqlResult result = sql.execute(
    "SELECT d.name as department, COUNT(e.id) as employee_count, " +
    " COALESCE(AVG(e.salary), 0) as avg_salary " +
    "FROM departments d " +
    "LEFT JOIN employees e ON d.id = e.dept_id " +
    "GROUP BY d.name " +
    "ORDER BY employee_count DESC")) {
    for (SqlRow row : result) {
        System.out.println(String.format("%s: %d employees, avg salary: $%.2f",
            row.<String>getObject("department"),
            row.<Long>getObject("employee_count"),
            row.<Double>getObject("avg_salary")));
    }
}

// Query streaming data from map journals or topics
// Note: This requires appropriate configuration for streaming
// IMPOSE_ORDER adds a watermark (here 1 second of allowed lag) on the
// event_timestamp column so windowed operations can be applied.
// NOTE(review): a streaming query does not terminate on its own — this
// loop runs until the result is closed or iteration is broken out of.
try (SqlResult result = sql.execute(
    "SELECT * FROM TABLE(IMPOSE_ORDER(" +
    " TABLE(stream_from_map_journal('events', 'event_timestamp')), " +
    " DESCRIPTOR(event_timestamp), " +
    " INTERVAL '1' SECOND" +
    "))")) {
    for (SqlRow row : result) {
        // Process streaming data
        System.out.println("Event: " + row.getObject("event_data"));
    }
}

// Working with JSON data
// Extracts fields from a JSON column using JSONPath expressions.
// NOTE(review): confirm the JSON function names (JSON_EXTRACT vs.
// JSON_VALUE/JSON_QUERY) against the installed Hazelcast version.
try (SqlResult result = sql.execute(
    "SELECT " +
    " JSON_EXTRACT(user_data, '$.name') as name, " +
    " JSON_EXTRACT(user_data, '$.preferences.theme') as theme " +
    "FROM user_profiles " +
    "WHERE JSON_EXTRACT(user_data, '$.active') = true")) {
    for (SqlRow row : result) {
        String name = row.getObject("name");
        String theme = row.getObject("theme");
        System.out.println("User " + name + " prefers " + theme + " theme");
    }
}

import com.hazelcast.sql.HazelcastSqlException;
// Exception thrown by the SQL service; carries a vendor error code, an
// optional remediation hint, and the member where the error originated.
public class HazelcastSqlException extends HazelcastException {
    public int getCode();
    public String getMessage();
    public String getSuggestion();
    public Member getOriginatingMember();
}

// Error-handling example.
// Fix vs. original: the SqlResult was never closed. try-with-resources
// (which may be combined with catch clauses) releases the cursor on both
// the success and the failure path.
try (SqlResult result = sql.execute("SELECT * FROM non_existent_table")) {
    // Process results...
} catch (HazelcastSqlException e) {
    System.err.println("SQL Error " + e.getCode() + ": " + e.getMessage());
    if (e.getSuggestion() != null) {
        System.err.println("Suggestion: " + e.getSuggestion());
    }
    // Handle specific error codes
    // NOTE(review): codes 1000/2000/3000 are illustrative — confirm the
    // real error-code constants for the installed Hazelcast version.
    switch (e.getCode()) {
        case 1000: // Generic error
            // Handle generic error
            break;
        case 2000: // Parsing error
            System.err.println("SQL syntax error");
            break;
        case 3000: // Data conversion error
            System.err.println("Data type conversion failed");
            break;
        default:
            System.err.println("Unhandled SQL error");
    }
} catch (Exception e) {
    System.err.println("Unexpected error: " + e.getMessage());
}

// Use indexes for better performance
// This should be done during map configuration, not via SQL
// But you can check index usage in execution plans
// Parameterized queries for prepared statement benefits
SqlStatement statement = SqlStatement.of("SELECT * FROM employees WHERE department = ?")
    .setCursorBufferSize(1000) // Optimize for large result sets
    .setTimeoutMillis(30000); // Set appropriate timeout
// Process in batches for large results
// NOTE(review): addParameter mutates the statement — executing this
// statement again later would carry the parameter over; verify before
// reusing the same SqlStatement instance.
try (SqlResult result = sql.execute(statement.addParameter("Engineering"))) {
    int batchSize = 0;
    for (SqlRow row : result) {
        // Process row
        batchSize++;
        // Process in batches (progress report every 1000 rows)
        if (batchSize % 1000 == 0) {
            System.out.println("Processed " + batchSize + " rows");
        }
    }
}

// Always close SqlResult to free resources
// Loads every employee of the given department, materializing the rows
// eagerly so the SqlResult can be closed before the method returns.
public List<Employee> getEmployees(String department) {
    final List<Employee> loaded = new ArrayList<>();
    try (SqlResult rows = sql.execute(
            "SELECT id, name, salary FROM employees WHERE department = ?", department)) {
        for (SqlRow r : rows) {
            Integer id = r.getObject("id");
            String name = r.getObject("name");
            Double salary = r.getObject("salary");
            loaded.add(new Employee(id, name, salary));
        }
    }
    return loaded;
}
// For streaming queries, handle properly
// Consumes a streaming query; the loop's break condition plus the
// try-with-resources close are what end an otherwise unbounded stream.
public void processStreamingData() {
    SqlStatement streamingQuery = SqlStatement.of(
        "SELECT * FROM events_stream WHERE event_type = ?")
        .addParameter("user_action")
        .setCursorBufferSize(100); // Smaller buffer for streaming
    try (SqlResult result = sql.execute(streamingQuery)) {
        for (SqlRow row : result) {
            // Process streaming event
            processEvent(row);
            // Add break condition for long-running streams
            if (shouldStop()) {
                break;
            }
        }
    }
}

// Configure maps for optimal SQL performance
Config config = new Config();
MapConfig mapConfig = new MapConfig("employees");
// Add indexes for commonly queried fields
mapConfig.addIndexConfig(new IndexConfig(IndexType.SORTED, "age")); // range predicates (age > ?)
mapConfig.addIndexConfig(new IndexConfig(IndexType.HASH, "department")); // equality lookups
mapConfig.addIndexConfig(new IndexConfig(IndexType.BITMAP, "active")); // low-cardinality flag
config.addMapConfig(mapConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);

// Fetches one page of employees ordered by name.
public List<Employee> getEmployeesPage(int offset, int limit) {
List<Employee> employees = new ArrayList<>();
// LIMIT/OFFSET paging: parameter order follows the placeholders —
// limit binds first, then offset.
try (SqlResult result = sql.execute(
        "SELECT * FROM employees ORDER BY name LIMIT ? OFFSET ?",
        limit, offset)) {
    for (SqlRow row : result) {
        employees.add(mapRowToEmployee(row));
    }
}
return employees;
}

// Existence check that avoids loading full rows.
public boolean employeeExists(int employeeId) {
try (SqlResult result = sql.execute(
        "SELECT 1 FROM employees WHERE id = ? LIMIT 1", employeeId)) {
    // LIMIT 1 keeps the scan cheap; a single hasNext() answers existence.
    return result.iterator().hasNext();
}
}

// Computes aggregate salary/age statistics for one department.
public DepartmentStats getDepartmentStats(String department) {
try (SqlResult result = sql.execute(
    "SELECT " +
    " COUNT(*) as total_count, " +
    " COUNT(CASE WHEN age < 30 THEN 1 END) as young_count, " +
    " COUNT(CASE WHEN salary > 80000 THEN 1 END) as high_earners, " +
    " AVG(salary) as avg_salary, " +
    " MIN(salary) as min_salary, " +
    " MAX(salary) as max_salary " +
    "FROM employees WHERE department = ?", department)) {
    // Fix vs. original: iterator() was called twice (once for hasNext(),
    // once for next()). A SqlResult permits only a single iterator — the
    // second call throws IllegalStateException. Obtain it once and reuse.
    // Requires java.util.Iterator to be imported.
    Iterator<SqlRow> rows = result.iterator();
    if (rows.hasNext()) {
        SqlRow row = rows.next();
        return new DepartmentStats(
            row.<Long>getObject("total_count"),
            row.<Long>getObject("young_count"),
            row.<Long>getObject("high_earners"),
            row.<Double>getObject("avg_salary"),
            row.<Double>getObject("min_salary"),
            row.<Double>getObject("max_salary")
        );
    }
}
// No row produced (aggregates normally return one row; kept as a guard).
return null;
}

Install with Tessl CLI
npx tessl i tessl/maven-com-hazelcast--hazelcast