venerdì 30 dicembre 2016

Introducing HerdDB a distributed JVM-embeddable database built on Apache BookKeeper

At Diennea we developed EmailSuccess, a powerful MTA (Mail Transfer Agent), designed to deliver millions of email messages per hour to inboxes all around the world.
We have very particular requirements for the database layer of EmailSuccess; we want a system which:
  • is ultra fast for writes
  • can be replicated among tens of machines, without any shared medium or SAN
  • can handle thousands of tables
  • can handle multi-table transactions
  • can handle multi-table queries
  • scales horizontally on the number of tables, simply by adding machines
  • can optionally run inside the same process of the JVM which runs the main service
  • can be controlled by using SQL language and the JDBC API
  • can support indexing of data
We have been using Apache HBase for a long time for our internal business, but HBase (and other Big Data engines) does not satisfy these requirements.
So we designed a new Key-Value database fast enough to handle the write load of the system, and then we added an SQL planner and a JDBC driver.
We already have great experience with the Apache BookKeeper and Apache ZooKeeper projects, as we use them to build sophisticated distributed services, for instance Majordodo (which is open source and available on GitHub and Maven Central), and we decided to use BookKeeper as a write-ahead transaction log and ZooKeeper for group membership and coordination.

HerdDB overview

From the API point of view you can see HerdDB as a traditional SQL-based database: you are going to issue CREATE TABLE, SELECT, INSERT, JOIN... statements and the system will do what you expect.
But internally it works as a Key-Value engine: accessing data by the primary key is as fast as possible, both for reads and for writes. In fact the primary user of HerdDB, EmailSuccess, uses it to store the state of every single email message.
On top of the Key-Value core we added the ability to run scans, multi-row updates, aggregate functions and so on; this way you can use it like any other SQL database.
The main unit of work for a HerdDB cluster is the tablespace, a group of tables. In the context of a tablespace you can run transactions, joins and subqueries which span multiple tables.
In a cluster, for each tablespace a leader node is designated by the administrator (with some kind of auto-healing and automatic leader reassignment in case of failure), and all the transactions on its tables run on that node.
This design scales well by having many tablespaces, so the load can be spread among all the machines in the cluster.
Indexes are supported by an implementation of the Block Range Index pattern (BRIN indexes), adapted to the way HerdDB stores data.
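To give an idea of how a Block Range Index works, here is a minimal, hypothetical sketch (class and method names are ours, not HerdDB's actual code): instead of indexing every key, the index keeps only the minimum and maximum indexed value for each data page, and a lookup loads only the pages whose range may contain the searched value.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of a Block Range Index (BRIN): keep only the min/max value
// per data page and skip pages whose range cannot contain the search key.
class BrinIndex {
    static class PageRange {
        final int pageId;
        final long min, max; // smallest and largest indexed value in the page
        PageRange(int pageId, long min, long max) {
            this.pageId = pageId; this.min = min; this.max = max;
        }
    }

    private final List<PageRange> ranges = new ArrayList<>();

    void addPage(int pageId, long min, long max) {
        ranges.add(new PageRange(pageId, min, max));
    }

    // Returns the ids of the pages that may contain the value:
    // only those pages need to be loaded and scanned.
    List<Integer> candidatePages(long value) {
        List<Integer> result = new ArrayList<>();
        for (PageRange r : ranges) {
            if (value >= r.min && value <= r.max) {
                result.add(r.pageId);
            }
        }
        return result;
    }
}
```

The trade-off is classic BRIN: the index is tiny (one entry per page instead of one per row), at the cost of possibly scanning some pages that turn out not to contain the key.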
The database can be accessed from outside the process by using TLS and authentication is performed using SASL with Kerberos or the simpler DIGEST-MD5 mechanism.

The write path

The most critical path for data access in HerdDB is the write path; in particular, the INSERT and UPDATE-by-PK data manipulation statements are the most important for us, together with GET-by-PK.
The leader of the tablespace keeps in memory a data structure which holds all the PKs of a table in a hash table, and an in-memory buffer which contains all the dirty and/or recently accessed records.
When an INSERT reaches the server, the write is first appended to the log, then the map of valid PKs is updated and the new record is stored in the memory buffer.
If an UPDATE is issued on the same PK (and this is our primary use case) the update is performed directly in memory, without touching the data pages on disk; we only write to the log in order to achieve durability.
If a GET comes in for the same PK, we can read the "dirty" record directly from the buffer.
After a configurable timeout, or when the system is running out of memory, a checkpoint is performed and buffers are flushed to disk, creating immutable data pages. So usually all the work happens in memory: writes are performed serially on the transaction log, and when flushing to disk complete data pages are written, without ever modifying existing files.
This write pattern suits our use case very well: data files are always written or read entirely, making the most of OS caches.
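The log-then-apply write path can be sketched in a few lines of plain Java. This is a toy model, not the actual HerdDB code: every mutation hits the transaction log first (durability), then the in-memory buffer, and a checkpoint flushes the buffer into an immutable data page.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of the write path: log first, then memory; checkpoints
// produce immutable data pages, existing files are never modified.
class WritePath {
    final List<String> transactionLog = new ArrayList<>();        // stands in for BookKeeper
    final Map<String, String> buffer = new HashMap<>();           // dirty/recently accessed records
    final List<Map<String, String>> dataPages = new ArrayList<>(); // immutable pages "on disk"

    void insert(String pk, String value) {
        transactionLog.add("INSERT " + pk); // 1. durability first
        buffer.put(pk, value);              // 2. then update memory
    }

    void update(String pk, String value) {
        transactionLog.add("UPDATE " + pk); // no data-page I/O for updates
        buffer.put(pk, value);
    }

    String get(String pk) {
        return buffer.get(pk);              // dirty reads served from memory
    }

    void checkpoint() {
        dataPages.add(new HashMap<>(buffer)); // write a whole immutable page at once
    }
}
```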

Replication and Apache BookKeeper

HerdDB leverages Apache BookKeeper's ability to provide a distributed write-ahead log: when a node is running as leader it writes each state change to BookKeeper, working as a replicated state machine.
Some features:
  • each write is guaranteed to be "durable" after the ack from BookKeeper
  • each replica is guaranteed to read only entries for which the ack has been received from the writer (the Last-Add-Confirmed protocol)
  • each ledger (the basic storage unit of BookKeeper) can be written only once
For each tablespace you can add a virtually unlimited number of replicas: each 'replica' node 'tails' the transaction log and replays each data manipulation activity on its local copy of the data.
If a new leader comes in, BookKeeper will fence out the old leader, preventing any further writes to the ledger; this way the old leader is not able to carry on its activity and change its local state, which guarantees that every node converges to the same consistent view of the system.
A very good explanation on how this can be done is provided in the BookKeeper tutorial.
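The effect of fencing can be illustrated with a toy model. This is a simulation of the concept, not the BookKeeper API: once a new leader opens the ledger for recovery, every further append from the old writer is rejected, so a partitioned ex-leader can no longer diverge from the replicated state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of BookKeeper-style fencing: opening a ledger for recovery
// fences it, and the previous writer's appends fail from then on.
class FencableLedger {
    private final List<String> entries = new ArrayList<>();
    private boolean fenced = false;

    // called by the current writer (the tablespace leader)
    void append(String entry) {
        if (fenced) {
            throw new IllegalStateException("ledger fenced: a new leader took over");
        }
        entries.add(entry);
    }

    // called by a new leader before it starts writing its own ledger:
    // it reads the committed entries and fences the old writer out
    List<String> openAndFence() {
        fenced = true;
        return List.copyOf(entries);
    }
}
```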
Apache BookKeeper servers, called Bookies, can be run standalone, but the preferred way is to run them inside the same JVM as the database, leveraging the ability to talk to the Bookie without passing through the network stack.

Getting started using docker

If you want to run HerdDB on your Docker-enabled laptop, just run:
docker run --rm -it  -e server.port=7000 -p 7000:7000 \
             -e server.advertised.host=$(hostname) \
             --name herddb eolivelli/herd:latest
 
The above command will run HerdDB in foreground mode.
In order to launch the CLI from another terminal use:
docker exec -it herddb /bin/bash bin/herddb-cli.sh \
             -x jdbc:herddb:server:0.0.0.0:7000 -sc

Then you can issue SQL queries:
herd: SELECT * FROM systables
herd: CREATE TABLE mytable (mykey string primary key, myvalue int) 
herd: INSERT INTO  mytable values('a',1)
herd: SELECT * FROM  mytable
 

Follow the instructions at https://github.com/diennea/herddb/tree/master/herddb-docker
You can find basic documentation on the GitHub wiki.

Status of the project

HerdDB is Apache 2 licensed; at the time of writing this post we are still in ALPHA, but you can already start playing with HerdDB by downloading it from GitHub and building it locally.
We are going to publish to Maven Central when we release the first stable version.
Feel free to report bugs on the GitHub bug tracker, and to file pull requests to contribute improvements or different use cases.
We will soon set up mailing lists and the official reference documentation.

giovedì 17 novembre 2016

Apache Kafka Streams and JCache with BlazingCache

Developing and setting up a distributed stream processing pipeline is quite simple using Apache Kafka Streams; in fact the Kafka Streams runtime automatically scales well without changes in your code.

Sometimes you have to transform your data by adding values taken from external resources, like web services, databases, or paid services, which is going to slow down your whole processing pipeline.

Consider for instance the case of looking up geolocation information for several IP addresses while processing a stream of hits on a web site: for each IP address you have to call an external service and wait for a response.

In these cases a cache may help, and you would probably start by using the JCache (JSR 107) API; anyway, in an auto-scalable architecture you are rather going to need a distributed cache solution.

The Apache Kafka ecosystem (along with HBase, Hadoop, MapReduce...) is based on ZooKeeper, and BlazingCache can be the perfect companion for Kafka Streams applications, as BlazingCache exploits ZooKeeper as its coordination service. Furthermore BlazingCache provides a unique architecture for this kind of lightweight processing node.

In BlazingCache, cached data is retained only locally to the JVM actually using it, with the platform adding the little bit of coordination needed to guarantee consistency between cache instances. Thus there is no need for multicast or other advanced discovery/peer-to-peer services, which are often difficult to set up in this changing world of containers and cloud-based services.
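The coordination model can be sketched like this. It is a toy, single-process simulation of the concept, not BlazingCache's actual protocol or API: data lives only in the local cache that loaded it, and the coordinator's only job is to tell the other members to invalidate their stale copy when a key changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of coordinated local caches: no data is replicated,
// only invalidation notifications flow between members.
class Coordinator {
    private final List<LocalCache> members = new ArrayList<>();

    void register(LocalCache c) { members.add(c); }

    void keyChanged(String key, LocalCache origin) {
        for (LocalCache c : members) {
            if (c != origin) {
                c.invalidate(key); // the others drop their stale copy
            }
        }
    }
}

class LocalCache {
    private final Map<String, String> data = new HashMap<>();
    private final Coordinator coordinator;

    LocalCache(Coordinator coordinator) {
        this.coordinator = coordinator;
        coordinator.register(this);
    }

    void put(String key, String value) {
        data.put(key, value);
        coordinator.keyChanged(key, this); // propagate the invalidation
    }

    String get(String key) { return data.get(key); }

    void invalidate(String key) { data.remove(key); }
}
```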

BlazingCache has been designed to work very well in environments in which data is strongly sensitive to location, where it is very likely that your cached data is partitioned among processing nodes.
This is very common in multi-tenant applications, and several applications of Kafka Streams tend to partition data to different nodes, as the normal load distribution is bound to Kafka partitioning.

This simple example, contributed by Nicolò Boschi (https://github.com/nicoloboschi/kafkastreamsblazingcache), is a good starting point to understand how simple it is to use a distributed JCache provider with Kafka Streams.

CachingProvider cachingProvider = Caching.getCachingProvider();
try (javax.cache.CacheManager cacheManager = cachingProvider.getCacheManager(
        cachingProvider.getDefaultURI(),
        cachingProvider.getDefaultClassLoader(),
        cacheConfiguration)) {
    try (Cache<String, String> cache
            = cacheManager.createCache("ip-location-cache", new MutableConfiguration<>())) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> startLines = builder.stream(stringSerde, stringSerde, input_topic);
        MockGeoLocationService geoLocationService = new MockGeoLocationService();
        KStream<String, LogLine> finalLines = startLines
            .mapValues(LogLine::parse)
            .mapValues((LogLine logLine) -> {
                String ip = logLine.getIp();
                if (ip != null) {
                    // look up the cache first, fall back to the (slow) external service
                    String location = cache.get(ip);
                    if (location == null) {
                        location = geoLocationService.findLocation(ip);
                        cache.put(ip, location);
                    }
                    logLine.setLocation(location);
                }
                return logLine;
            });
        finalLines.to(stringSerde, logLineSerde, output_topic);
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Thread.sleep(5000L);
        streams.close();
    }
}

Some useful links:
- BlazingCache docs: https://blazingcache.readme.io/

lunedì 16 maggio 2016

BlazingCache 1.8.0 is out


BlazingCache is a very "simple" but powerful distributed cache, designed with the goal of coordinating many local caches in a cluster of cooperating JVMs.

At Diennea we have lots of JVMs hosting our customers' tasks, with the load of each customer assigned to only a little portion of the whole cluster (look at Majordodo for instance). In this case data locality is fundamental, so it is better to cache data as near as possible to the code processing it.

BlazingCache aims at coordinating many local caches, ensuring that every change to data is propagated to every other node which holds data.

A single BlazingCache coordinator server can handle hundreds of clients with a minimal setup.

What's new in 1.8?


With the 1.8.0 release, BlazingCache introduces the ability to keep direct references to plain local Java objects.
 
In its initial design, BlazingCache only stored byte[] values, and the client always had to serialize/deserialize its own model objects.
The original motivation for storing only byte[] is that this way the client is always forced to clone cached objects, avoiding side effects due to modifications of shared objects.
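The difference between the two modes can be illustrated with plain Java serialization. This is a toy, single-JVM simulation, not BlazingCache's internal code: the byte[]-based mode forces a serialization round trip, so callers always get a private copy; the object mode hands back the very same reference that was stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Toy illustration of the two storage modes: serialized values vs
// direct references to local Java objects.
class TwoModeCache {
    private final Map<String, byte[]> serialized = new HashMap<>();
    private final Map<String, Object> references = new HashMap<>();

    void put(String key, Serializable value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            serialized.put(key, bos.toByteArray());
        } catch (IOException err) {
            throw new RuntimeException(err);
        }
    }

    Object fetch(String key) {
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(serialized.get(key)))) {
            return ois.readObject(); // always a fresh, private copy
        } catch (IOException | ClassNotFoundException err) {
            throw new RuntimeException(err);
        }
    }

    void putObject(String key, Object value) { references.put(key, value); }

    Object fetchObject(String key) { return references.get(key); } // same reference
}
```

The copy returned by fetch() is safe to mutate without side effects on other callers; the reference returned by fetchObject() is faster but shared, so the caller must treat it as read-only.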

The key to this new feature is the pair of new putObject/fetchObject methods:


try (CacheClient client1 = CacheClientBuilder
        .newBuilder()
        .mode(CacheClientBuilder.Mode.LOCAL)
        .build()) {
    client1.start();
    client1.waitForConnection(10000);

    MyBean myBean = new MyBean();
    client1.putObject("key", myBean, 6000);

    MyBean myBean2 = client1.fetchObject("key");

    assertSame(myBean, myBean2);
}


Check out documentation at https://blazingcache.readme.io/


mercoledì 2 marzo 2016

Majordodo - a Distributed Resource Manager built on top of Apache BookKeeper

At Diennea we offer a Software-as-a-Service platform to build complex direct digital marketing applications, especially for delivering email and text messages.

One of our primary business requirements is the ability to provide access to shared resources to many users while giving each one a configurable amount of resources (multitenancy), in terms of CPU, RAM, SQL/HBase databases and (distributed) filesystem-like storage.

Other existing projects like Apache Hadoop YARN or Apache Mesos do not provide a fine-grained way to allocate resources to tenants. Majordodo has been designed to deal with thousands of users requesting the execution of micro tasks; it is just like having a very big distributed thread pool with complex resource allocation facilities which can be reconfigured at runtime. Guaranteed service levels can be changed at runtime even for tasks which have already been submitted to the system.

Majordodo tasks can be very simple, such as sending a single email message, or long-running batch operations which continue for hours.

When a task is assigned to a specific machine (a 'Worker' in Majordodo terms), the Broker follows its execution, monitors it and, in case of machine failure, fails the execution over to another Worker.

Majordodo has been designed to deal with Worker machines which can fail at any time, a fundamental aspect in elastic deployments: to this end, tasks simply get resubmitted to other machines in a transparent way, according to the service level configuration.

Majordodo clients submit tasks to a Broker service using a simple HTTP JSON-based API supporting transactions and the 'slots' facility.

Workers discover the actual leader Broker and keep one, and only one, active TCP connection to it. The Broker-to-Worker and Broker-to-Broker protocols have been designed to support asynchronous messaging, and one connection per Worker is enough for task state management. The networking stack scales well up to hundreds of Workers with minimal overhead on the Broker (thanks to Netty).

Majordodo is built upon Apache BookKeeper and Apache Zookeeper, leveraging these powerful systems to implement replication and face all the usual distributed computing issues.

Majordodo and ZooKeeper

Majordodo clients use ZooKeeper to discover the active Brokers on the network.

On the Broker side Majordodo uses ZooKeeper in many situations: directly, for leader election, to advertise the presence of services on the network, and to keep metadata about BookKeeper ledgers. BookKeeper in turn uses ZooKeeper for Bookie discovery and for ledger metadata storage.

Among all the Brokers one is elected as 'leader': clients can connect to any of the Brokers, but only the leader can change the 'status' of the system, for instance by accepting task submissions and handling Worker connections.
ZooKeeper is used to manage a shared view of the list of BookKeeper ledgers: the leader Broker creates new ledgers and drops unused ones, keeping the list of current ledgers on ZooKeeper.
ZooKeeper allows the Broker to manage this kind of metadata in a safe manner, using CAS (compare-and-set) operations: upon updating the ledger list, the Broker can issue a conditional modification operation, requesting it to fail if another Broker has taken control.
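The version-based compare-and-set pattern, which ZooKeeper exposes through the expected-version argument of its data update operations, can be sketched with a toy in-memory model (this is not the ZooKeeper client API, just an illustration of the idea):

```java
import java.util.List;

// Toy sketch of versioned CAS on shared metadata: the update succeeds
// only if nobody else modified the data since we read it.
class VersionedLedgerList {
    private List<Long> ledgers = List.of();
    private int version = 0;

    synchronized int getVersion() { return version; }

    synchronized List<Long> read() { return ledgers; }

    // returns true if the CAS succeeded, false if another broker won the race
    synchronized boolean compareAndSet(List<Long> newLedgers, int expectedVersion) {
        if (version != expectedVersion) {
            return false; // someone else (e.g. a new leader) updated the list first
        }
        ledgers = List.copyOf(newLedgers);
        version++;
        return true;
    }
}
```

A Broker that lost leadership while preparing its update sees the CAS fail and knows it must not touch the metadata any more.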

Majordodo and BookKeeper

Apache BookKeeper is a replicated log service which allows Majordodo to implement a distributed commit log with a shared-nothing architecture: no shared disk or database is needed to make all the Brokers share the same view of the global status of the system.
The basic unit of work is the ledger, an ordered sequence of log entries, each entry being identified by a sequence number.
BookKeeper is ideal for replicating the state of the Brokers: the leader Broker keeps a global view of the status of the system in memory and logs every change to a ledger.
BookKeeper is used as a write-ahead commit log, that is, every change to the status is first written to the log and then applied to the in-memory status. The other Brokers (we call them 'followers') tail the log and apply each change to their own copy of the status.
A very good explanation on how this can be done is provided in the BookKeeper tutorial.
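The log-then-apply pattern used by the Brokers can be sketched with a toy model (the names are ours, not Majordodo's actual classes): the leader appends each change to a shared log before applying it in memory, and a follower that replays the same log converges to exactly the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of a replicated state machine over a shared log.
class Broker {
    final Map<String, String> state = new HashMap<>(); // in-memory status
    int applied = 0;                                   // next log offset to replay

    void apply(String[] entry) { state.put(entry[0], entry[1]); }
}

class ReplicatedLog {
    final List<String[]> entries = new ArrayList<>(); // stands in for BookKeeper

    void leaderWrite(Broker leader, String key, String value) {
        String[] entry = {key, value};
        entries.add(entry);  // 1. durable log append
        leader.apply(entry); // 2. then update the in-memory status
        leader.applied = entries.size();
    }

    void tail(Broker follower) {
        while (follower.applied < entries.size()) {
            follower.apply(entries.get(follower.applied++));
        }
    }
}
```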


Another interesting feature of BookKeeper is that ledgers can only be written once; if another client opens the ledger for reading, it can automatically 'fence' the writer so as to allow no more writes on that ledger.
In case of a leadership change, for instance due to temporary network failures, the 'old' leader Broker is no longer able to log entries and thus cannot 'change' the global status of the system in memory.

A shared-nothing architecture

The only structures shared between Brokers are the ZooKeeper filesystem and the BookKeeper ledgers, but logs cannot be retained forever; accordingly, each Broker must periodically take a snapshot of its own in-memory view of the status and persist it to disk, both to recover quickly and to let BookKeeper release space and resources.

When a Broker boots, it loads a consistent snapshot of the status of the system at a given time and then starts to replay the log from the point (ledger offset) at which the snapshot was taken. If no local snapshot is available, the booting Broker discovers an active Broker on the network and downloads a valid snapshot from it.
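Snapshot-plus-replay recovery can be sketched as follows (a toy model with hypothetical names): a snapshot is a (state, offset) pair, and on boot only the log entries appended after that offset need to be replayed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of snapshot-based recovery: restore the latest snapshot,
// then replay only the log entries written after the snapshot offset.
class Snapshotting {
    static Map<String, String> recover(Map<String, String> snapshot,
                                       int snapshotOffset,
                                       List<String[]> log) {
        Map<String, String> state = new HashMap<>(snapshot); // restore the snapshot
        for (int i = snapshotOffset; i < log.size(); i++) {   // replay the tail of the log
            state.put(log.get(i)[0], log.get(i)[1]);
        }
        return state;
    }
}
```

The same mechanism is what allows old ledgers to be deleted: everything up to the snapshot offset is already captured by the snapshot, so only the tail of the log is needed for recovery.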

As in Majordodo there is no shared disk or storage service, the deletion of ledgers must be coordinated in some way.
We delete old ledgers after a configurable amount of time, for instance when a ledger is unused and was created more than 48 hours in the past. When a 'follower' Broker remains offline for more than 48 hours, at boot time it needs to find another Broker on the network and download a snapshot, otherwise the boot will fail.