Motivation: Supporting high data ingestion rates and performing frequent aggregate queries
In real-time analytics, a major requirement is to ingest data at high rates while simultaneously computing aggregations over that real-time data. A common use case is ingesting data at high rates and computing KPIs or other metrics over the ingested data. Examples include performance monitoring, IoT, eAdvertisement, smart grids, and industry 4.0. This kind of workload is troublesome for SQL databases because they are not efficient at ingesting data. In addition, the aggregate analytical queries are very expensive because they must traverse large amounts of data very frequently. NoSQL databases (see our blog post on NoSQL), in particular key-value data stores, ingest data more efficiently and can handle that part of the workload. However, they are poor at analytical queries, when they support them at all. As a result, it is very common to find solutions that need:
- To pre-compute aggregations at the application level and persist them later. This approach is arduous to develop and is limited in both durability (a crash causes the loss of all aggregations) and scalability (when the amount of data to aggregate requires more than one server).
- Or to combine different data management technologies to solve this use case. This yields complex architectures that require a lot of talent to be created successfully and are not only very hard to develop, but also very hard to maintain.
WHY ONLINE AGGREGATES
A key differentiating feature of LeanXcale is online aggregates. LeanXcale has developed a new technology based on a brand new semantic multi-version concurrency control (patent pending) that enables it to compute aggregates incrementally and in real time, using aggregate tables. As data are ingested, the relevant aggregate tables are updated, so aggregates are always pre-computed. Thus, the formerly complex developments or architectures are reduced to a basic architecture with almost costless queries that read one or more rows of an aggregate table. Data ingestion becomes slightly more expensive, but the cost of computing the aggregates at query time disappears. In the following example, we motivate the concept of online aggregates by showing that neither a NoSQL data store nor a traditional SQL database can solve the problem. Then we explain how the new technology from LeanXcale succeeds at solving this use case.
USE CASE EXAMPLE: APPLICATION PERFORMANCE MONITORING (APM)
In application performance monitoring, it is very important to detect anomalies in the application behavior as soon as they happen, in order to reduce the Mean-Time-To-Repair (MTTR). Typically, the behavior is modelled by at least two metrics: number of invocations and average response time.
Since these events are asynchronous, they are frequently aggregated by a temporal criterion to simplify their management in dashboards or to define the threshold of what is considered an anomaly. Commonly, it is also necessary to know these metrics at different aggregation levels: customer, application, application server, and endpoint (e.g., a REST API endpoint or any other user-actionable unit).
In this example, we describe how to compute this aggregation hierarchy in real-time in a totally accurate and inexpensive way. We maintain an invocation table for tracking the endpoint invocations in real-time (see Table 1). Each invocation row contains the timestamp of the invocation, the response time, the endpoint, and any other information of interest that we do not detail here.
We need a reference to determine whether the invocation of endpoint 251 at 17:55:03 is behaving correctly. We use a simple model: a response time is anomalous if it exceeds twice the average response time of the previous period. In this case, in the period [17:50–17:55[ the response times recorded were 10 ms and 20 ms, yielding an average of 15 ms. The response time at 17:55:03 is 121 ms, which is greater than 2 x 15 ms = 30 ms, so it is considered anomalous. More complex models take into consideration application, infrastructure, or temporal criteria.
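The threshold rule above can be sketched in a few lines of Python. This is only an illustration of the arithmetic: the function name `is_anomalous` and the `factor` parameter are assumptions made for this example, not part of any LeanXcale API.

```python
from statistics import mean


def is_anomalous(response_time_ms, previous_period_times_ms, factor=2.0):
    """Return True if a response time exceeds `factor` times the
    average response time of the previous period (hypothetical helper)."""
    threshold_ms = factor * mean(previous_period_times_ms)
    return response_time_ms > threshold_ms


# Response times (ms) recorded for endpoint 251 in the period [17:50-17:55[
previous_period = [10, 20]

# The invocation at 17:55:03 took 121 ms; the threshold is 2 x 15 ms = 30 ms
print(is_anomalous(121, previous_period))  # True
```

Note that the comparison is strict, so a response time exactly at the threshold (30 ms here) is not flagged.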
In a SaaS platform, this would mean aggregating several million rows per second. Assume that the tool must be able to aggregate invocations and calculate the average response time for any endpoint.
USING AGGREGATE TABLES
One way to reduce the cost of calculating the aggregation would be to compute aggregates incrementally in aggregate tables, with one aggregate table per aggregation level. In our example, we have the invocation table and the threshold aggregation table, although, as noted above, a real use case needs more aggregate tables for different time periods and for the different aggregation levels of the customer application hierarchy. This aggregation table (see Table 2) would have as key the endpointID (an integer) and the timestamp (day, month, year, hour, minute, second) of the start of each 5-minute interval.
Then, every time an endpoint is invoked, an invocation row is added to the invocation table and, as part of the same transaction, the corresponding row of the EndpointAverageResponseTime aggregate table is updated. In the EndpointAverageResponseTime aggregation table, we would update the row with the associated endpoint and timestamp by adding the new response time to the sumRespTime column and incrementing the invocatCount (number of invocations) column by one. Table 1 depicts an example of a few rows of the table. After getting the metric (251, 20200804 17:50:01, 10), and before getting the metric (251, 20200804 17:52:05, 20), the row for (251, 20200804 17:50:00) in the EndpointAverageResponseTime aggregation table would be the one depicted in Figure 1.
After the metric (251, 20200804 17:52:05, 20) is inserted in the Invocation table, we would update the above row in the EndpointAverageResponseTime aggregation table by adding 20 to the sumRespTime column, yielding 30 (10+20), and incrementing the invocatCount column by one, yielding 2 (1+1). The result is the row shown in Figure 2.
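As a minimal sketch of this incremental update, the following Python code keeps both tables as in-memory structures and replays the two metrics for endpoint 251. The names `bucket_start`, `record_invocation`, `invocations`, and `endpoint_avg` are hypothetical, chosen for this example. The code is single-threaded and does not model transactions or any real data store.

```python
from datetime import datetime

# Hypothetical in-memory stand-ins for the two tables (assumptions for
# illustration only, not a real data store).
invocations = []   # invocation table: (endpointID, timestamp, respTime)
endpoint_avg = {}  # EndpointAverageResponseTime:
                   # (endpointID, interval start) -> [sumRespTime, invocatCount]


def bucket_start(ts):
    """Round a timestamp down to the start of its 5-minute interval."""
    return ts.replace(minute=ts.minute - ts.minute % 5, second=0, microsecond=0)


def record_invocation(endpoint_id, ts, resp_time):
    """Add an invocation row and update the matching aggregate row."""
    invocations.append((endpoint_id, ts, resp_time))
    key = (endpoint_id, bucket_start(ts))
    row = endpoint_avg.setdefault(key, [0, 0])
    row[0] += resp_time  # sumRespTime
    row[1] += 1          # invocatCount
    return key


record_invocation(251, datetime(2020, 8, 4, 17, 50, 1), 10)
key = record_invocation(251, datetime(2020, 8, 4, 17, 52, 5), 20)

sum_resp_time, invocat_count = endpoint_avg[key]
print(sum_resp_time, invocat_count)   # 30 2
print(sum_resp_time / invocat_count)  # 15.0
```

Storing the sum and the count, rather than the average itself, is what makes the update incremental: the average for an interval is derived at read time from a single aggregate row.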
PROBLEMS WHEN COMPUTING AGGREGATE TABLES WITH NOSQL DATA STORES
What happens if we implement the aggregate tables with a NoSQL data store? Let’s see. If there are two concurrent invocations over the same endpoint aggregation row (in fact, there will be thousands to millions of concurrent invocations over the same endpoint), and both are allowed to proceed, one of the updates would be lost. This is called the “Lost Updates” anomaly [Özsu & Valduriez 2020]. To update the aggregation row for endpoint 252 and timestamp 20200804 17:50:00, we would execute a Get using (252, 20200804 17:50:00) as key and then execute a Put with (252, 20200804 17:50:00) as key and with the aggregated value incremented by the response time and the number of invocations.
Let’s consider a concrete example. In the EndpointAverageResponseTime aggregation table, there is an aggregation row every five minutes for endpoint 252, say (252, 20200804 17:50:00, 13, 1). Now, two invocations happen concurrently for endpointID 252 (in fact thousands or millions of concurrent invocations, but we’ll focus on just two) with response times 21 and 16 within the 5-minute interval [20200804 17:50:00, 20200804 17:55:00[. We insert a new row into the invocation table for each of them: (252, 20200804 17:52:04, 21) and (252, 20200804 17:53:21, 16). For each invocation, we have to update the endpoint aggregation table.
Let’s look at what happens in the EndpointAverageResponseTime aggregation table. Assume the following interleaving happens in the update of the (252, 20200804 17:50:00) aggregation row. The two updates do the Get with (252, 20200804 17:50:00) as key. Both will read the row (252, 20200804 17:50:00, 13, 1). One of them will do a Put(252, 20200804 17:50:00, 34, 2) and the other will do a Put(252, 20200804 17:50:00, 29, 2). One of them will be processed first and the other, second. Assume the first one processed is Put(252, 20200804 17:50:00, 34, 2). Now the data store will contain that row.
Then, the data store processes Put(252, 20200804 17:50:00, 29, 2). What happened? The second Put erased the effect of the first Put: the stored row is (252, 20200804 17:50:00, 29, 2) instead of the correct (252, 20200804 17:50:00, 50, 3), where 50 = 13+21+16. The reason is that Puts are blind writes: they write a value independently of what is already stored.
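The interleaving described above can be replayed deterministically in Python. This is a sketch under stated assumptions: the dictionary `store` and the `get` and `put` functions are hypothetical stand-ins for a key-value data store, not a real NoSQL client API, and the interleaving is written out by hand rather than produced by real concurrency.

```python
# Hypothetical in-memory stand-in for a key-value data store.
store = {}


def get(key):
    """Read the current value stored under key."""
    return store[key]


def put(key, value):
    """Blind write: overwrite whatever is stored under key."""
    store[key] = value


key = (252, "20200804 17:50:00")
put(key, (13, 1))  # existing aggregate row: sumRespTime=13, invocatCount=1

# Both updaters read the row before either of them writes it back.
read_a = get(key)  # (13, 1)
read_b = get(key)  # (13, 1)

# Each updater adds its own response time to the value it read.
put(key, (read_a[0] + 21, read_a[1] + 1))  # Put(252, ..., 34, 2)
put(key, (read_b[0] + 16, read_b[1] + 1))  # Put(252, ..., 29, 2)

final_row = store[key]
expected_row = (13 + 21 + 16, 1 + 1 + 1)

print(final_row)     # (29, 2)
print(expected_row)  # (50, 3)
```

The update that added 21 is lost because the second Put was computed from a stale read and overwrote the first one.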