Database sharding with Apache ShardingSphere

Table of Contents

Scaling Databases with Sharding

As applications grow and the volume of data increases, a single database instance can become a bottleneck—limiting performance, scalability, and fault tolerance. Database sharding is a technique that addresses these issues by splitting a large dataset into smaller, more manageable pieces, known as shards. Each shard holds a subset of the total data and can be stored on a separate database instance or server.

Sharding improves scalability by enabling horizontal expansion; instead of upgrading a single powerful server (vertical scaling), you can distribute data across multiple nodes and scale out as needed. It also enhances performance by reducing the load on any individual database and localizing queries to smaller datasets.

Additionally, sharding can improve system availability and fault isolation. If one shard becomes unavailable, only a portion of the data is affected—minimizing the overall impact. However, sharding also introduces challenges, such as complex query routing, cross-shard joins, and data consistency management.

Implementing database sharding with Apache shardingsphere

Configuring DataSource for ShardingSphere

To begin with, we need to configure the HikariCP data source and its properties. The data source will be used for establishing database connections. The following method sets up the connection pool and initializes the datasource with necessary configurations.

1private static DataSource createDataSource(String dbName) {
2    HikariDataSource ds = new HikariDataSource();
3    ds.setDriverClassName("com.mysql.cj.jdbc.Driver"); // Use MySQL JDBC driver
4    Dotenv dotenv = Dotenv.load();
5    String JDBC_URL = dotenv.get("JDBC_URL");
6    ds.setJdbcUrl(JDBC_URL + dbName + "?serverTimezone=UTC");
7    
8    // Fetch username and password from environment variables
9    String password = dotenv.get("DB_PASS");
10    String username = dotenv.get("DB_USER");
11    ds.setUsername(username);
12    ds.setPassword(password);
13
14    // Connection Pool Settings
15    ds.setMaximumPoolSize(10); // Max number of connections in the pool
16    ds.setMinimumIdle(5); // Minimum number of idle connections
17    ds.setConnectionTimeout(30000); // Connection timeout (in ms)
18    ds.setIdleTimeout(600000); // Time after which idle connections are closed (in ms)
19    ds.setMaxLifetime(1800000); // Max connection lifetime (in ms)
20    
21    return ds;
22}

In this method, we configure the Hikari data source with parameters such as the JDBC URL, username, password, and pool settings. These settings ensure that the application can handle high concurrency and resource efficiency by reusing database connections.

1Map<String, DataSource> dataSourceMap = new HashMap<>();
2dataSourceMap.put("ds_0", createDataSource("ds_0"));
3dataSourceMap.put("ds_1", createDataSource("ds_1"));

Next, we create a map of the data sources where each data source is associated with a logical name. The names "ds_0" and "ds_1" represent different databases or database instances. This map is essential for ShardingSphere to identify and connect to the underlying databases efficiently.

After configuring the data sources, ShardingSphere will use this mapping to implement database sharding, routing requests to the appropriate database based on the sharding strategy you define.

Creating the ShardingRule Configuration

After setting up the data sources, the next step in configuring Apache ShardingSphere is to define the ShardingRule configuration. The ShardingRule governs how data is distributed across multiple databases and tables based on defined sharding strategies. This configuration determines how each table and database is sharded, enabling ShardingSphere to route queries to the appropriate physical resources.

1private static ShardingRuleConfiguration createShardingRule() {
2    ShardingRuleConfiguration shardingConfig = new ShardingRuleConfiguration();
3
4    // Defining sharding rules for the "user" table
5    ShardingTableRuleConfiguration useTableRule = new ShardingTableRuleConfiguration("user",
6            "ds_${0..1}.user_${0..1}"); // Define how "user" table is distributed across two databases and two tables
7    
8    // Database sharding strategy: Shard by "id" using modulo 2
9    useTableRule.setDatabaseShardingStrategy(new
10            StandardShardingStrategyConfiguration("id", "database_mod"));
11
12    // Table sharding strategy: Shard by "id" using modulo 2
13    useTableRule.setTableShardingStrategy(new StandardShardingStrategyConfiguration("id", "table_mod"));
14
15    // Add this table rule to the sharding configuration
16    shardingConfig.getTables().add(useTableRule);
17
18    // Configure sharding algorithm for databases
19    Properties databaseProps = new Properties();
20    databaseProps.setProperty("algorithm-expression", "ds_${ id.intdiv(4) % 2 }");
21    // This expression ensures that each group of 4 "user" records is alternated between two databases
22    shardingConfig.getShardingAlgorithms().put("database_mod", new AlgorithmConfiguration("INLINE", databaseProps));
23
24    // Configure sharding algorithm for tables
25    Properties tableProps = new Properties();
26    tableProps.setProperty("algorithm-expression", "user_${ id.intdiv(2) % 2 }");
27    // This expression ensures that each group of 2 records is placed into one of two "user" tables
28    shardingConfig.getShardingAlgorithms().put("table_mod", new AlgorithmConfiguration("INLINE", tableProps));
29
30    return shardingConfig;
31}

In the code above, we define a sharding rule for the "user" table, which will be distributed across two databases (ds_0 and ds_1) and two tables within each database (user_0 and user_1).

ShardingTableRuleConfiguration

The ShardingTableRuleConfiguration constructor, takes two parameters:

Logical Table Name: The first parameter is the logical table name (in this case, "user"), which represents the table your application interacts with. It is a logical abstraction and does not refer to any specific physical table or database.
Physical Table Rule Expression: The second parameter is a string representing the rule for mapping the logical table to multiple physical tables and databases. In this example, ds_{0..1}.user_{0..1}is the rule that defines how the logical "user" table is distributed across two databases (ds_0 and ds_1) and two tables within each database (user_0 and user_1).

The Database Sharding Strategy is defined using the StandardShardingStrategyConfiguration for the "id" column. The strategy in the example uses the modulo operation (% 2) to distribute data evenly between two databases. This is useful when you want to balance your data across multiple physical databases to improve scalability and performance.

Similarly, the Table Sharding Strategy is defined to distribute data within each database using the same "id" column, but this time using modulo 2 again to decide between two tables within a single database.

AlgorithmConfiguration

The AlgorithmConfiguration class in ShardingSphere is used to define the custom algorithm expressions that guide the sharding logic. With AlgorithmConfiguration, you can define your own sharding algorithm using expressions or scripting languages like Groovy or JavaScript. These expressions can then be evaluated dynamically at runtime to calculate the target database or table based on the values of the sharding key (e.g., "id").

For example, the INLINE algorithm used in the provided configuration allows you to define a custom inline expression (such as ds_{id % 2}) to calculate which database and table the data should be routed to. This approach provides a powerful and flexible way to implement your sharding rules.

To sum up, this configuration enables ShardingSphere to automatically route queries to the correct database and table based on the sharding logic defined in the algorithm-expression.

Applying the Sharding Rule Configuration

After creating our ShardingRuleConfiguration, we now need to configure the ShardingSphere data source. This data source allows us to integrate sharding into our application, managing multiple data sources (or shards) and enabling automatic distribution of data across them. In this section, we will create the ShardingSphere data source and test it by inserting data into our sharded tables.

The following code demonstrates how we configure ShardingSphere's data source with multiple shards and a sharding rule, and then test it by inserting some sample data into the user table.

1// Create the ShardingRuleConfiguration, which specifies how to shard the data
2ShardingRuleConfiguration shardingConfig = createShardingRule();
3
4// Create the ShardingSphere data source by passing the configuration
5DataSource shardingDataSource = ShardingSphereDataSourceFactory.createDataSource(
6    dataSourceMap, // A map of data sources (ds_0, ds_1, etc.)
7    List.of(shardingConfig), // List of sharding rule configurations
8    new Properties() // Additional properties, can be used for configurations like auto-create-table
9);
10
11// Test the sharding setup by inserting data into the sharded tables
12testSharding(shardingDataSource);

In this example:

ShardingRuleConfiguration: Defines how the data is sharded (e.g., by a specific column like id, using mod or range sharding strategies).
ShardingSphereDataSourceFactory: This factory creates a ShardingSphere data source that integrates the defined sharding rules and multiple data sources (e.g., ds_0, ds_1) for data distribution.
testSharding: A method that connects to the ShardingSphere-managed data source and tests the configuration by inserting data.
1private static void testSharding(DataSource source) {
2    try (Connection conn = source.getConnection()) {
3        // Insert test data into the sharded user table
4        insertTestData(conn);
5    } catch (SQLException e) {
6        System.out.println(e.getMessage());
7    }
8}
9
10private static void insertTestData(Connection conn) throws SQLException {
11    // Insert 10 users, with ids from 1 to 10
12    for (int i = 1; i <= 10; i++) {
13        // Prepare the SQL statement for inserting user data
14        PreparedStatement ps = conn.prepareStatement("INSERT INTO user (id, name) VALUES(?, ?)");
15        ps.setLong(1, i); // Set the user id
16        ps.setString(2, "USER_" + i); // Set the user name
17        ps.executeUpdate(); // Execute the insert statement
18        System.out.println("Inserted User_" + i);
19    }
20}
When you run this code, ShardingSphere will decide, based on the id field , whether to insert the data into user_0 or user_1, across the two configured data sources (ds_0 and ds_1).

If you have configured auto-table creation for ShardingSphere, it will automatically create the tables user_0 and user_1 in the respective databases (if they do not already exist). Otherwise, you may need to manually create these tables in your database before running this code.