Every day the ingestion and processing of Near Real Time (NRT) data streams becomes more necessary. Business requirements are becoming more demanding in terms of processing times and availability of the latest data and this article aims to address this issue.
Using the AWS cloud and a serverless approach, this article will deploy an application capable of ingesting data streams and processing them in NRT, writing their result in a
LakeHouse in such a way that ACID (Atomic, Consistent, Isolated and Durable) operations can be performed on them. An architecture will be deployed in which data is ingested with Locust, processed with Flink and finally written in Hudi and JSON formats.
Locust is a Python framework to perform Load Testing in an easy and scalable way. The advantages offered by Locust are the ability to define this user behavior with a general purpose language and its ease of scalability.
Flink has become a reference framework in the field of distributed processing on data streams. It is characterized by its stream processing orientation (although it can also execute batch processes), its processing speed and its memory efficiency. There are other popular frameworks in the industry, such as Spark Streaming and Storm, the architecture section will discuss why Flink was ultimately chosen.
Finally, Hudi is a transactional file format that provides the capabilities of a database and DataWarehouse to the Data Lake. Hudi gives the ability to leave behind the concepts of batching and replace it with an incremental processing perspective. Like the other technologies used in this article, it is described in detail below.
All the code used in this article, both IaC and Python, can be found in our repository[1] on Github.
Multiple articles will use this one as a basis for discussing the following topics:
Below you can see the high-level architecture that will be deployed:
As you can see, Locust is used as a Load Testing tool to send synthetic data to our application. These will be ingested through a Kinesis Stream provisioned in On Demand mode, so the stream will scale automatically. The alternative to the On Demand mode is the Provisioned mode, where we must specify the number of shards (component in which the stream is divided), with which we want to provision the stream. The differences and particularities of these two modes will be explained in more detail in the Kinesis section.
The input stream is read by two Kinesis Analytics Flink applications. As mentioned in the next steps section, the reason to have two independent applications writing in Hudi and JSON respectively is to make a comparison in future articles in terms of efficiency. Finally the data will be hosted in S3, the AWS object storage service.
The particularity of the Kinesis Analytics Flink application is that it is serverless, that is, it abstracts the developer from the complexity of configuring and deploying a Flink cluster. This application must be assigned KPUs or Kinesis Processing Units and a jar with the Flink library and the necessary connectors to be able to deploy it correctly. All these concepts will be explained in the following sections.
The alternative to this serverless perspective with a managed service on AWS is the complete administration of the application by the developer, who can use tools such as Kubernetes or EKS (Kubernetes managed on AWS) to deploy this Flink application in a cluster. The advantages of this second alternative would be to be able to configure both the cluster (number of nodes, memory, CPU, hard disk, etc…) and the Flink application (disaster recovery management, metadata management, etc…) with a much greater degree of detail. In this article, the first alternative was chosen because of its simplicity and ease of use when learning about the Flink framework.
The first piece in the data ingestion pipeline is the Locust component written in Python. Unlike other frameworks available on the market such as JMeter, Locust gives us the ability to write simple code with Python instead of using a domain-specific language or user interface.
In addition, Locust is event-driven and uses greenlet[2], which gives it the ability to manage the capacity of several thousand users with a single processor thread. For example, in the case of JMeter, one thread is needed for each user, which poses a scalability problem for cases where a high number of users are needed.
Locust has several possibilities when it comes to running and scaling, being able to run locally for less data-intensive applications or deploy to a Kubernetes cluster by creating a Docker image from Locust code.
As for clients and systems to send data to, Locust provides a built-in HTTP client. In case you want to send events to other systems, like the one in this article, you can always write a custom client thanks to the advantage of being a Python framework.
In addition, Locust also provides a web interface so that you can check the progress of your data submission in real time. For all these reasons it has been decided to use this technology in this article.
For data ingestion, Kinesis Data Streams, a fully managed and serverless data streaming service offered by AWS, will be used. A Kinesis Stream consists of a logical grouping of shards, which represent the fundamental unit of capacity of a stream and are processed in parallel. Each shard provides the stream with 1 MB/s or 1,000 events per second write and 2 MB/s read. The events will be distributed among the stream shards according to their partitioning key, so it is important that the partitioning is homogeneous to avoid a bias in the distribution and occurrence of hot shards. There are two modes of capacity provisioning:
For simplicity, and because it is suitable for our use case, we will opt for the On Demand mode. This will automatically accommodate the number of shards to the amount of data generated by our Locust application.
To read and process the data ingested through Kinesis Data Streams, another service of the Kinesis family, Kinesis Data Analytics (KDA), will be used. This service is offered in two flavors:
A Flink application consists of a series of parallel processing tasks, also known as operators, which are connected in a Directed Acyclic Graph (DAG). The data stream is processed by this DAG, with each operator performing a specific operation on the data.
KDA allocates computing power for our application in the form of Kinesis Processing (KPUs), each equivalent to 1 vCPU and 4GB of RAM. The number of KPUs for the application is determined by specifying two parameters:
The total number of KPUs of the application is given by Parallelism / ParallelismPerKPU. It is possible to deploy this service with automatic autoscaling, which will automatically adjust the number of KPUs based on CPU consumption to accommodate demand.
The costs[3] of Amazon Kinesis Analytics are based on a pay-per-use model, based on the Kinesis Processing Units consumed. In addition, a cost is assumed for the storage used by the application and its backups.
Delving deeper into the Flink application, one of the most important features is the ability to be resilient to failures. To this end, Flink incorporates a checkpointing system whereby a snapshot of the application and its state is taken and stored in remote storage in case the application needs to be recovered.
The checkpointing process of a Flink application is designed to be resilient and efficient. Flink can make use of different backends to store the state of the application. The simplest would be the Java Virtual Machine’s own memory, and while this offers low latency and simpler management, scaling and capacity issues can quickly arise that make it undesirable for production environments. This is why it is common to use RocksDB as a backend for Flink, a high-performance, scalable and fault-tolerant key-value database. Additionally KDA stores these snapshots in S3 for an extra layer of durability.
For the purpose of this blog, a simple application has been developed for real-time data ingestion and subsequent saving to S3. Flink offers two APIs through which you can develop an application:
For this use case the Table API will be used for its simplicity, but it is equally compatible with the use of the DataStream API.
Deploying the application with Kinesis Data Analytics requires only to define the entry point of the application code and provide an uber jar with all the application dependencies. It is fitting to explain the dependencies used for this application, as it is usually one of the major pain points when developing a Flink application:
The application is prepared to write data both in JSON format and in Hudi MoR or CoW tables (which will be explained in detail in the next section). Both the application code and the infrastructure are available in the repository.
Hudi is presented as a source of Open Source storage at the data format level. Like other solutions such as Iceberg or Delta Lake, it offers some of their existing properties such as ACID (Atomicity, Consistency, Isolation and Durability) transaction support, processes focused on optimizing read/write tasks, incremental data updates and others that will be explained below. It is important to highlight that these could not be achieved by means of Avro and Parquet format files.
Hudi’s features are as follows:
When analyzing how Hudi proceeds to store the ingested events, these are grouped by partitions and these in turn are grouped into groups of files. The latter are assigned a unique file_id for each group in which the base file is found, in parquet format, which arises after an action, either a commit or compaction, and the log file which is where all the updates are registered (event version tracking).
Hudi offers 2 types of tables depending on the business need, this has an impact in terms of performance and limitation of certain functionalities as we will see in more detail:
A storage system whereby the tasks of updating, deleting or recording new data are performed directly on the log file (delta file) and a new snapshot is created that includes a complete copy of the updated dataset, including a new version of the base file and a delta file containing the changes made in that operation.
It is not until data compacting (scheduled or upon reaching a defined data size) that the delta files are combined with the most recent version of the complete dataset, creating a new complete file where the delta files that are no longer needed are removed and the index file is updated so that it can access the data in the compacted file.
This storage system is especially recommended for use cases where read tasks are more frequent than write tasks as it does not require additional data transformations when reading data.
The Timeline of the main files is shown below when the various writing tasks are performed:
task | NEW Base File | Delta File | Index File | Snapshot |
---|---|---|---|---|
In this case, separate delta files are not used as in the Copy-on-Write (COW) model. Instead, changes are written directly to the existing data files (base files). In tasks where record updates are performed, these new records are added to the base file, and in the case of deletions, these are marked as such in the base file, in both cases these changes are recorded in the index file, until compaction is performed. It is in this operation that all updates are applied to the records in the corresponding base file and deletes the previous versions of the updated records.
This alternative is specialized in performing queries of versioned historical data and NRT transformations and analysis of large volumes, since it is possible to do so without having to copy the data to another location on disk. In addition to being optimal for use cases where write tasks are concurrent as it is more efficient since it is not necessary to perform additional data transformations during the write, although it has a lower tolerance to failure since in case the log file is corrupted it can generate loss of data versions.
The Timeline of the main files is shown below when the various writing tasks are performed:
Task | NEW Base File | Delta File | Index File | Snapshot |
---|---|---|---|---|
As a summary, a comparison of the main performance metrics between Copy on-Write and Merge on-Read is made:
COW | MOR | |
---|---|---|
Higher | Lower | |
Higher | Lower | |
Lower | Slower before compaction Igual tras compactación |
Hudi not only offers different forms of storage, but also different ways of querying the stored information, again depending on both the business cases and the type of storage chosen:
In this article we have described how to deploy an application that ingests events in real time and forms a LakeHouse with a serverless architecture. With this we have sought an intermediate level of abstraction so that it is a simple application but with enough power to be able to be used in real production environments.
Deploying applications based on the combination of technologies such as Apache Flink and Hudi provides the ability to process large volumes of data in real time and in a scalable manner. This, combined with the guarantee provided by ACID transactions, makes the combination of Apache Flink and Apache Hudi a solid solution for data ingestion and processing in critical environments.
In spite of all the advantages described above, it is worth mentioning some drawbacks that have been detected in the development of this architecture. The biggest problem encountered has been the resolution of dependencies between Flink libraries and the necessary connectors, such as Hudi. The lack of community that exists today, although this will grow over time, was a considerable initial problem to be able to form the final package with all the necessary dependencies without conflicts between them. In addition, it is worth noting that less community has been perceived for the Python language than for Java or Scala. In this article Python was chosen as there was a stronger internal knowledge but in the case that the technology stack is closer to languages supported by the JVM (Java Virtual Machine) it would be advisable to use Scala or Java.
In the next articles we will go into more detail on the particularities that both Hudi and Flink have in order to customize and adjust the behavior of this application depending on the needs of our use case.
AWS Cloud Engineer
Empecé mi carrera laboral con el desarrollo, mantenimiento y administración de bases de datos multidimensionales y Data Lakes. A partir de ahí comencé a estar interesado en plataformas de datos y arquitecturas cloud, estando certificado 3 veces en AWS y 2 con Hashicorp.
Actualmente me encuentro trabajando como un Cloud Engineer desarrollando Data Lakes y DataWarehouses con AWS para un cliente relacionado con la organización de eventos deportivos a nivel mundial.
AWS Cloud Engineer
Comencé mi carrera como Data Scientist en distintos sectores (banca, consultoría,…) enfocado en la automatización de procesos y desarrollo de modelos. En los últimos años aposté por Bluetab motivado por el interés en especializarme como Data Engineer y comenzar a trabajar con los principales proveedores Cloud (AWS, GPC y Azure) en clientes como Olympics, específicamente en la optimización del procesamiento y almacenamiento del dato.
Colaborando activamente con el grupo de Práctica Cloud en investigaciones y desarrollo de blogs de tecnologías punteras e innovadoras tales como esta, fomentando así el continuo aprendizaje.
AWS Cloud Engineer
Dedicado al aprendizaje constante de nuevas tecnologías y su aplicación, disfrutando de utilizarlas en la resolución de desafíos tecnológicos. Desarrollo mi carrera como Cloud Engineer diseñando, implementando y manteniendo infraestructura en AWS.
Colaboro activamente en la Práctica Cloud, donde investigamos y experimentamos con nuevas tecnologías, buscando soluciones para los retos que enfrentan nuestros clientes.
Patron
Sponsor
© 2024 Bluetab Solutions Group, SL. All rights reserved.