Big data comprises datasets that are too massive, varied, and complex to be handled with traditional data processing tools. It can include both structured and unstructured data and is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
The evolution of data management architectures from warehouses to lakes and now to lakehouses represents a significant shift in how businesses handle large datasets. The data lakehouse model combines the best of both worlds, offering the cost-effectiveness and flexibility of data lakes with the robust functionality of data warehouses. This is achieved through innovative table formats that provide a metadata layer, enabling more intelligent interaction between storage and compute resources.

How Did We Get to Open Table Formats?

Hive: The Original Table Format

Running analytics on Hadoop data lakes initially required complex Java jobs using the MapReduce framework, which was not user-friendly for many analysts. To address this, Facebook developed Hive in 2009, allowing users to write SQL instead of MapReduce jobs. Hive converts SQL statements into executable MapReduce jobs. It introduced the Hive table format and Hive Metastore to track tables. A table is defined as all files within a specified directory (or prefixes for object storage), with partitions as subdirectories. The Hive Metastore tracks these directory paths, enabling query engines to locate the relevant data.

Benefits of the Hive Table Format

Efficient queries: Techniques like partitioning and bucketing enabled faster queries by avoiding full table scans.
File format agnostic: Supported various file formats (e.g., Apache Parquet, Avro, CSV/TSV) without requiring data transformation.
Atomic changes: Allowed atomic changes to individual partitions via the Hive Metastore.
Standardization: Became the de facto standard, compatible with most data tools.
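The directory-based layout described above can be illustrated with a small sketch. It is not Hive code: the file listing, the `warehouse/sales` table root, and the helper names `partition_values` and `prune` are all hypothetical, and the sketch only shows how key=value subdirectories let an engine skip files that cannot match a filter.

```python
from pathlib import PurePosixPath

# Hypothetical file listing for a Hive-style table rooted at "warehouse/sales".
FILES = [
    "warehouse/sales/event_year=2023/event_month=12/part-0000.parquet",
    "warehouse/sales/event_year=2024/event_month=01/part-0000.parquet",
    "warehouse/sales/event_year=2024/event_month=01/part-0001.parquet",
    "warehouse/sales/event_year=2024/event_month=02/part-0000.parquet",
]


def partition_values(path, table_root):
    """Parse the key=value directory names between the table root and the file."""
    relative = PurePosixPath(path).relative_to(table_root)
    # Every path component except the file name is a partition directory.
    return dict(part.split("=", 1) for part in relative.parts[:-1])


def prune(files, table_root, **wanted):
    """Keep only files whose partition directories match every requested value."""
    return [
        f
        for f in files
        if all(partition_values(f, table_root).get(k) == v for k, v in wanted.items())
    ]


# Only the January 2024 subdirectory has to be read for this filter.
selected = prune(FILES, "warehouse/sales", event_year="2024", event_month="01")
print(selected)
```

The pruning only works when the query filters on the partition columns themselves, which is the root of the derived-partition-column limitation discussed in the next section.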
Limitations of the Hive Table Format

Inefficient file-level changes: No mechanism for atomic file swaps, only partition-level updates.
Lack of multi-partition transactions: No support for atomic updates across multiple partitions, leading to potential data inconsistencies.
Concurrent updates: Limited support for concurrent updates, especially with non-Hive tools.
Slow query performance: Time-consuming file and directory listings slowed down queries.
Partitioning challenges: Derived partition columns could lead to full table scans if not properly filtered.
Inconsistent table statistics: Asynchronous jobs often result in outdated or unavailable table statistics, hindering query optimization.
Object storage throttling: Performance issues with large numbers of files in a single partition due to object storage throttling.

As datasets and use cases grew, these limitations highlighted the need for newer table formats. Modern table formats offer key improvements over the Hive table format:

ACID transactions: Ensure transactions are fully completed or canceled, unlike legacy formats.
Concurrent writers: Safely handle multiple writers, maintaining data consistency.
Enhanced statistics: Provide better table statistics and metadata, enabling more efficient query planning and reduced file scanning.

With that context, this document explores the popular Open Table Format: Apache Iceberg.

What Is Apache Iceberg?

Apache Iceberg is a table format created in 2017 by Netflix to address performance and consistency issues with the Hive table format. It became open source in 2018 and is now supported by many organizations, including Apple, AWS, and LinkedIn. Netflix identified that tracking tables as directories limited consistency and concurrency.
They developed Iceberg with goals of:

Consistency: Ensuring atomic updates across partitions.
Performance: Reducing query planning time by avoiding excessive file listings.
Ease of use: Providing intuitive partitioning without requiring knowledge of physical table structure.
Evolvability: Allowing safe schema and partitioning updates without rewriting the entire table.
Scalability: Supporting petabyte-scale data.

Iceberg defines tables as a canonical list of files, not directories, and includes support libraries for integration with compute engines like Apache Spark and Apache Flink.

Metadata Tree Components in Apache Iceberg

Manifest file: Lists data files with their locations and key metadata for efficient execution plans.
Manifest list: Defines a table snapshot as a list of manifest files with statistics for efficient execution plans.
Metadata file: Defines the table’s structure, including schema, partitioning, and snapshots.
Catalog: Tracks the table location, mapping table names to the latest metadata file, similar to the Hive Metastore. Various tools, including the Hive Metastore, can serve as a catalog.

Key Features

ACID Transactions

Apache Iceberg uses optimistic concurrency control to ensure ACID guarantees, even with multiple readers and writers. This approach assumes transactions won’t conflict and checks for conflicts only when necessary, minimizing locking and improving performance. Transactions either commit fully or fail, with no partial states. Concurrency guarantees are managed by the catalog, which has built-in ACID guarantees, ensuring atomic transactions and data correctness. Without this, conflicting updates from different systems could lead to data loss. A pessimistic concurrency model, which uses locks to prevent conflicts, may be added in the future.

Partition Evolution

Before Apache Iceberg, changing a table’s partitioning often required rewriting the entire table, which was costly at scale.
Alternatively, sticking with the existing partitioning sacrificed performance improvements. With Apache Iceberg, you can update the table’s partitioning without rewriting the data. Since partitioning is metadata-driven, changes are quick and inexpensive. For example, a table initially partitioned by month can evolve to day partitions, with new data written in day partitions and queries planned accordingly.

Hidden Partitioning

Users often don’t know or need to know how a table is physically partitioned. For example, querying by a timestamp field might seem intuitive, but if the table is partitioned by event_year, event_month, and event_day, it can lead to a full table scan. Apache Iceberg solves this by allowing partitioning based on a column and an optional transform (e.g., bucket, truncate, year, month, day, hour). This eliminates the need for extra partitioning columns, making queries more intuitive and efficient. In the figure below, assuming the table uses day partitioning, the query would result in a full table scan in Hive due to a separate “day” column for partitioning. In Iceberg, the metadata tracks the partitioning as “the transformed value of CURRENT_DATE,” allowing the query to use the partitioning when filtering by CURRENT_DATE.

Time Travel

Apache Iceberg offers immutable snapshots, enabling queries on the table’s historical state, known as time travel. This is useful for tasks like end-of-quarter reporting or reproducing ML model outputs at a specific point in time, without duplicating data.

Version Rollback

Iceberg’s snapshot isolation allows querying data as it is and reverting the table to any previous snapshot, making it easy to undo mistakes.

Schema Evolution

Iceberg supports robust schema evolution, enabling changes like adding/removing columns, renaming columns, or changing data types (e.g., updating an int column to a long column).

Adoption

One of the best things about Iceberg is its vast adoption by many different engines.
In the diagram below, you can see that many different technologies can work with the same set of data as long as they use the open-source Iceberg API. The integration work that each engine has invested is a strong indicator of the popularity and usefulness of this technology.

Conclusion

This post covered the evolution of data management towards data lakehouses, the key issues addressed by open table formats, and an introduction to the high-level architecture of Apache Iceberg, a leading open table format.
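To make the ACID Transactions and Time Travel sections above more concrete, here is a minimal toy sketch of optimistic concurrency with immutable snapshots. It is not the Iceberg API: `InMemoryCatalog`, `compare_and_swap`, and `commit_append` are hypothetical names, and a tuple of file sets stands in for the metadata file that a real catalog would point to.

```python
import threading


class InMemoryCatalog:
    """Toy catalog mapping a table name to its snapshot history (hypothetical API)."""

    def __init__(self):
        self._lock = threading.Lock()  # stands in for the catalog's own atomicity
        self._current = {}             # table name -> tuple of immutable snapshots

    def load(self, table):
        return self._current.get(table, ())

    def compare_and_swap(self, table, expected, new):
        """Replace the table pointer only if nobody committed in the meantime."""
        with self._lock:
            if self._current.get(table, ()) != expected:
                return False
            self._current[table] = new
            return True


def commit_append(catalog, table, new_files, max_retries=3):
    """Optimistic commit: build a new snapshot, retry if the base has changed."""
    for _ in range(max_retries):
        base = catalog.load(table)
        latest_files = base[-1] if base else frozenset()
        snapshot = latest_files | frozenset(new_files)  # a new, immutable file list
        if catalog.compare_and_swap(table, base, base + (snapshot,)):
            return len(base)  # index of the snapshot that was just committed
    raise RuntimeError("commit failed after retries")


catalog = InMemoryCatalog()
first = commit_append(catalog, "db.events", ["a.parquet"])
second = commit_append(catalog, "db.events", ["b.parquet"])

history = catalog.load("db.events")
print(sorted(history[second]))  # current table state
print(sorted(history[first]))   # "time travel" to the earlier snapshot
```

A writer that prepared its commit against a stale base simply fails the swap and retries against the latest state, so no locks are held while the new snapshot is being built. Because old snapshots are never modified, reading or rolling back to an earlier one is just a matter of picking a different entry in the history.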
Efficient data processing is crucial for businesses and organizations that rely on big data analytics to make informed decisions. One key factor that significantly affects the performance of data processing is the storage format of the data. This article explores the impact of different storage formats, specifically Parquet, Avro, and ORC, on query performance and costs in big data environments on Google Cloud Platform (GCP). It provides benchmarks, discusses cost implications, and offers recommendations on selecting the appropriate format based on specific use cases.

Introduction to Storage Formats in Big Data

Data storage formats are the backbone of any big data processing environment. They define how data is stored, read, and written, directly impacting storage efficiency, query performance, and data retrieval speeds. In the big data ecosystem, columnar formats like Parquet and ORC and row-based formats like Avro are widely used due to their optimized performance for specific types of queries and processing tasks.

Parquet: Parquet is a columnar storage format optimized for read-heavy operations and analytics. It is highly efficient in terms of compression and encoding, making it ideal for scenarios where read performance and storage efficiency are prioritized.
Avro: Avro is a row-based storage format designed for data serialization. It is known for its schema evolution capabilities and is often used for write-heavy operations where data needs to be serialized and deserialized quickly.
ORC (Optimized Row Columnar): ORC is a columnar storage format similar to Parquet but optimized for both read and write operations. ORC is highly efficient in terms of compression, which reduces storage costs and speeds up data retrieval.

Research Objective

The primary objective of this research is to assess how different storage formats (Parquet, Avro, ORC) affect query performance and costs in big data environments.
This article aims to provide benchmarks based on various query types and data volumes to help data engineers and architects choose the most suitable format for their specific use cases.

Experimental Setup

To conduct this research, we used a standardized setup on Google Cloud Platform (GCP) with Google Cloud Storage as the data repository and Google Cloud Dataproc for running Hive and Spark-SQL jobs. The data used in the experiments was a mix of structured and semi-structured datasets to mimic real-world scenarios.

Key Components

Google Cloud Storage: Used to store the datasets in different formats (Parquet, Avro, ORC).
Google Cloud Dataproc: A managed Apache Hadoop and Apache Spark service used to run Hive and Spark-SQL jobs.
Datasets: Three datasets of varying sizes (10GB, 50GB, 100GB) with mixed data types.

Python
    # Initialize PySpark and set up Google Cloud Storage as the file system
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("BigDataQueryPerformance") \
        .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5") \
        .getOrCreate()

    # Configure access to Google Cloud Storage
    spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    spark.conf.set("fs.gs.auth.service.account.enable", "true")
    spark.conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/your-service-account-file.json")

Test Queries

Simple SELECT queries: Basic retrieval of all columns from a table.
Filter queries: SELECT queries with WHERE clauses to filter specific rows.
Aggregation queries: Queries involving GROUP BY and aggregate functions like SUM, AVG, etc.
Join queries: Queries joining two or more tables on a common key.

Results and Analysis

1. Simple SELECT Queries

Parquet: It performed exceptionally well due to its columnar storage format, which allowed for fast scanning of specific columns.
Parquet files are highly compressed, reducing the amount of data read from disk, which resulted in faster query execution times.

Python
    # Simple SELECT query on Parquet file
    # The read path is a placeholder; point it at your own Parquet dataset
    parquet_df = spark.read.parquet("gs://your-bucket/dataset.parquet")
    parquet_df.select("column1", "column2").show()

Avro: Avro performed moderately well. Being a row-based format, Avro required reading the entire row, even when only specific columns were needed. This increases the I/O operations, leading to slower query performance compared to Parquet and ORC.

SQL
    -- Simple SELECT query on Avro file in Hive
    -- Note: LOCATION should point to the directory holding the Avro files, and
    -- Hive also needs column definitions or an Avro schema for the table.
    CREATE EXTERNAL TABLE avro_table
    STORED AS AVRO
    LOCATION 'gs://your-bucket/dataset.avro';

    SELECT column1, column2 FROM avro_table;

ORC: ORC showed similar performance to Parquet, with slightly better compression and optimized storage techniques that enhanced read speeds. ORC files are also columnar, making them suitable for SELECT queries that only retrieve specific columns.

Python
    # Simple SELECT query on ORC file
    # The read path is a placeholder; point it at your own ORC dataset
    orc_df = spark.read.orc("gs://your-bucket/dataset.orc")
    orc_df.select("column1", "column2").show()

2. Filter Queries

Parquet: Parquet maintained its performance advantage due to its columnar nature and the ability to skip irrelevant columns quickly. However, performance was slightly impacted by the need to scan more rows to apply filters.

Python
    # Filter query on Parquet file
    filtered_parquet_df = parquet_df.filter(parquet_df.column1 == 'some_value')
    filtered_parquet_df.show()

Avro: The performance decreased further due to the need to read entire rows and apply filters across all columns, increasing processing time.

SQL
    -- Filter query on Avro file in Hive
    SELECT * FROM avro_table WHERE column1 = 'some_value';

ORC: This outperformed Parquet slightly in filter queries due to its predicate pushdown feature, which allows filtering directly at the storage level before the data is loaded into memory.

Python
    # Filter query on ORC file
    filtered_orc_df = orc_df.filter(orc_df.column1 == 'some_value')
    filtered_orc_df.show()

3. Aggregation Queries

Parquet: Parquet performed well but was slightly less efficient than ORC. The columnar format benefits aggregation operations by quickly accessing required columns, but Parquet lacks some of the built-in optimizations that ORC offers.

Python
    # Aggregation query on Parquet file
    agg_parquet_df = parquet_df.groupBy("column1").agg({"column2": "sum", "column3": "avg"})
    agg_parquet_df.show()

Avro: Avro lagged behind due to its row-based storage, which required scanning and processing all columns for each row, increasing the computational overhead.

SQL
    -- Aggregation query on Avro file in Hive
    SELECT column1, SUM(column2), AVG(column3) FROM avro_table GROUP BY column1;

ORC: ORC outperformed both Parquet and Avro in aggregation queries. ORC's advanced indexing and built-in compression algorithms enabled faster data access and reduced I/O operations, making it highly suitable for aggregation tasks.

Python
    # Aggregation query on ORC file
    agg_orc_df = orc_df.groupBy("column1").agg({"column2": "sum", "column3": "avg"})
    agg_orc_df.show()

4. Join Queries

Parquet: Parquet performed well, but not as efficiently as ORC in join operations due to its less optimized data reading for join conditions.

Python
    # Join query between Parquet and ORC files
    joined_df = parquet_df.join(orc_df, parquet_df.key == orc_df.key)
    joined_df.show()

ORC: ORC excelled in join queries, benefitting from advanced indexing and predicate pushdown capabilities, which minimized data scanned and processed during join operations.

Python
    # Join query between two ORC files
    joined_orc_df = orc_df.join(other_orc_df, orc_df.key == other_orc_df.key)
    joined_orc_df.show()

Avro: Avro struggled significantly with join operations, primarily due to the high overhead of reading full rows and the lack of columnar optimizations for join keys.
SQL
    -- Join query between Parquet and Avro files in Hive
    SELECT a.column1, b.column2
    FROM parquet_table a
    JOIN avro_table b ON a.key = b.key;

Impact of Storage Format on Costs

1. Storage Efficiency and Cost

Parquet and ORC (columnar formats)

Compression and storage cost: Both Parquet and ORC are columnar storage formats that offer high compression ratios, especially for datasets with many repetitive or similar values within columns. This high compression reduces the overall data size, which in turn lowers storage costs, particularly in cloud environments where storage is billed per GB.
Optimal for analytics workloads: Due to their columnar nature, these formats are ideal for analytical workloads where only specific columns are frequently queried. This means less data is read from storage, reducing both I/O operations and associated costs.

Avro (row-based format)

Compression and storage cost: Avro typically provides lower compression ratios than columnar formats like Parquet and ORC because it stores data row by row. This can lead to higher storage costs, especially for large datasets with many columns, as all data in a row must be read, even if only a few columns are needed.
Better for write-heavy workloads: While Avro might result in higher storage costs due to lower compression, it is better suited for write-heavy workloads where data is continuously being written or appended. The cost associated with storage may be offset by the efficiency gains in data serialization and deserialization.

2. Data Processing Performance and Cost

Parquet and ORC (columnar formats)

Reduced processing costs: These formats are optimized for read-heavy operations, which makes them highly efficient for querying large datasets. Because they allow reading only the relevant columns needed for a query, they reduce the amount of data processed.
This leads to lower CPU usage and faster query execution times, which can significantly reduce computational costs in a cloud environment where compute resources are billed based on usage.
Advanced features for cost optimization: ORC, in particular, includes features like predicate pushdown and built-in statistics, which enable the query engine to skip reading unnecessary data. This further reduces I/O operations and speeds up query performance, optimizing costs.

Avro (row-based format)

Higher processing costs: Since Avro is a row-based format, it generally requires more I/O operations to read entire rows even when only a few columns are needed. This can lead to increased computational costs due to higher CPU usage and longer query execution times, especially in read-heavy environments.
Efficient for streaming and serialization: Despite higher processing costs for queries, Avro is well suited for streaming and serialization tasks where fast write speeds and schema evolution are more critical.

3. Cost Analysis With Pricing Details

To quantify the cost impact of each storage format, we conducted an experiment using GCP. We calculated the costs associated with both storage and data processing for each format based on GCP's pricing models.

Google Cloud Storage costs

Storage cost: This is calculated based on the amount of data stored in each format. GCP charges per GB per month for data stored in Google Cloud Storage. Compression ratios achieved by each format directly impact these costs.
Columnar formats like Parquet and ORC typically have better compression ratios than row-based formats like Avro, resulting in lower storage costs.

Here is a summary of how the formats compared on storage costs:

Parquet: High compression resulted in reduced data size, lowering storage costs.
ORC: Similar to Parquet, ORC's advanced compression also reduced storage costs effectively.
Avro: Lower compression efficiency led to higher storage costs compared to Parquet and ORC.

Python
    # Example of how to save data back to Google Cloud Storage in different formats

    # Save DataFrame as Parquet
    parquet_df.write.parquet("gs://your-bucket/output_parquet")

    # Save DataFrame as Avro (requires the external spark-avro package on the classpath)
    avro_df.write.format("avro").save("gs://your-bucket/output_avro")

    # Save DataFrame as ORC
    orc_df.write.orc("gs://your-bucket/output_orc")

Data processing costs

Data processing costs were calculated based on the compute resources required to perform various queries using Dataproc on GCP. GCP charges for Dataproc usage based on the size of the cluster and the duration for which the resources are used.

Compute costs:

Parquet and ORC: Due to their efficient columnar storage, these formats reduced the amount of data read and processed, leading to lower compute costs. Faster query execution times also contributed to cost savings, especially for complex queries involving large datasets.
Avro: Avro required more compute resources due to its row-based format, which increased the amount of data read and processed. This led to higher costs, particularly for read-heavy operations.

Conclusion

The choice of storage format in big data environments significantly impacts both query performance and cost. The above research and experiment demonstrate the following key points:

Parquet and ORC: These columnar formats provide excellent compression, which reduces storage costs. Their ability to efficiently read only the necessary columns greatly enhances query performance and reduces data processing costs.
ORC slightly outperforms Parquet in certain query types due to its advanced indexing and optimization features, making it an excellent choice for mixed workloads that require both high read and write performance.
Avro: While Avro is not as efficient in terms of compression and query performance as Parquet and ORC, it excels in use cases requiring fast write operations and schema evolution. This format is ideal for scenarios involving data serialization and streaming where write performance and flexibility are prioritized over read efficiency.
Cost efficiency: In a cloud environment like GCP, where costs are closely tied to storage and compute usage, choosing the right format can lead to significant cost savings. For analytics workloads that are predominantly read-heavy, Parquet and ORC are the most cost-effective options. For applications that require rapid data ingestion and flexible schema management, Avro is a suitable choice despite its higher storage and compute costs.

Recommendations

Based on our analysis, we recommend the following:

For read-heavy analytical workloads: Use Parquet or ORC. These formats provide superior performance and cost efficiency due to their high compression and optimized query performance.
For write-heavy workloads and serialization: Use Avro. It is better suited for scenarios where fast writes and schema evolution are critical, such as data streaming and messaging systems.
For mixed workloads: ORC offers balanced performance for both read and write operations, making it an ideal choice for environments where data workloads vary.

Final Thoughts

Selecting the right storage format for big data environments is crucial for optimizing both performance and cost. Understanding the strengths and weaknesses of each format allows data engineers to tailor their data architecture to specific use cases, maximizing efficiency and minimizing expenses.
As data volumes continue to grow, making informed decisions about storage formats will become increasingly important for maintaining scalable and cost-effective data solutions. By carefully evaluating the performance benchmarks and cost implications presented in this article, organizations can choose the storage format that best aligns with their operational needs and financial goals.
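The cost reasoning in this article comes down to two pieces of arithmetic: stored size times a per-GB monthly price, and cluster size times runtime times a per-node hourly price. The sketch below shows that arithmetic only. The function names are hypothetical, and every number in the example calls is a placeholder chosen for illustration, not a GCP list price and not a measurement from the experiments above.

```python
def monthly_storage_cost(raw_gb, compression_ratio, price_per_gb_month):
    """Storage cost per month for a dataset after compression.

    compression_ratio is raw size divided by stored size (4 means 4:1).
    """
    stored_gb = raw_gb / compression_ratio
    return stored_gb * price_per_gb_month


def query_compute_cost(runtime_seconds, nodes, price_per_node_hour):
    """Compute cost of one job billed by cluster size and duration."""
    return runtime_seconds / 3600 * nodes * price_per_node_hour


# Placeholder inputs only: substitute your own measured sizes, runtimes, and prices.
storage = monthly_storage_cost(raw_gb=100, compression_ratio=4, price_per_gb_month=0.02)
compute = query_compute_cost(runtime_seconds=1800, nodes=4, price_per_node_hour=0.5)
print(f"storage per month: {storage:.2f}, compute per run: {compute:.2f}")
```

Plugging in the compression ratio and query runtime you observe for each format makes the trade-off explicit: a format that compresses better lowers the first term, and a format that scans less data shortens the runtime in the second.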
Snowflake is a leading cloud-native data warehouse. Integration patterns include batch data integration, Zero ETL, and near real-time data ingestion with Apache Kafka. This blog post explores the different approaches and discusses their trade-offs. In line with industry recommendations, it suggests avoiding anti-patterns like Reverse ETL and instead using data streaming to enhance the flexibility, scalability, and maintainability of the enterprise architecture.

Blog Series: Snowflake and Apache Kafka

Snowflake is a leading cloud-native data warehouse. Its usability and scalability made it a prevalent data platform in thousands of companies. This blog series explores different data integration and ingestion options, including traditional ETL/iPaaS and data streaming with Apache Kafka. The discussion covers why point-to-point Zero-ETL is only a short-term win, why Reverse ETL is an anti-pattern for real-time use cases, and when a Kappa Architecture and shifting data processing “to the left” into the streaming layer helps to build transactional and analytical real-time and batch use cases in a reliable and cost-efficient way.

Snowflake: Transitioning from a Cloud-Native Data Warehouse to a Data Cloud for Everything

Snowflake is a leading cloud-based data warehousing platform (CDW) that allows organizations to store and analyze large volumes of data in a scalable and efficient manner. It works with cloud providers such as Amazon Web Services (AWS), Microsoft Azure, and Google Cloud Platform (GCP). Snowflake provides a fully managed, multi-cluster, multi-tenant architecture, making it easy for users to scale and manage their data storage and processing needs.

The Origin: A Cloud Data Warehouse

Snowflake provides a flexible and scalable solution for managing and analyzing large datasets in a cloud environment. It has gained popularity for its ease of use, performance, and ability to handle diverse workloads with its separation of compute and storage.
Source: Snowflake

Reporting and analytics are the major use cases. Snowflake earns its reputation for simplicity and ease of use. It uses SQL for querying, making it familiar to users with SQL skills. The platform abstracts many of the complexities of traditional data warehousing, reducing the learning curve.

The Future: One 'Data Cloud' for Everything?

Snowflake is much more than a data warehouse. Product innovation and several acquisitions strengthen the product portfolio. Several acquired companies focus on different topics related to the data management space, including search, privacy, data engineering, generative AI, and more. The company transitions into a "Data Cloud" (that's Snowflake’s current marketing term).

Quote from Snowflake's website: "The Data Cloud is a global network that connects organizations to the data and applications most critical to their business. The Data Cloud enables a wide range of possibilities, from breaking down silos within an organization to collaborating over content with partners and customers and even integrating external data and applications for fresh insights. Powering the Data Cloud is Snowflake’s single platform. Its unique architecture connects businesses globally, at practically any scale to bring data and workloads together."

Source: Snowflake

Well, we will see what the future brings. Today, Snowflake's main use case is Cloud Data Warehouse, similar to SAP focusing on ERP or Databricks on data lake and ML/AI. I am always skeptical when a company tries to solve every problem and use case within a single platform. A technology has sweet spots for some use cases but brings trade-offs for other use cases from a technical and cost perspective.
Snowflake Trade-Offs: Cloud-Only, Cost, and More

While Snowflake is a powerful and widely used cloud-native data platform, it's important to consider some potential disadvantages:

Cost: While Snowflake's architecture allows for scalability and flexibility, it can also result in costs that may be higher than anticipated. Users should carefully manage and monitor their resource consumption to avoid unexpected expenses. "DBT'ing" all the data sets at rest again and again increases the TCO significantly.
Cloud-only: On-premise and hybrid architectures are not possible. As a cloud-based service, Snowflake relies on a stable and fast internet connection. In situations where internet connectivity is unreliable or slow, users may experience difficulties in accessing and working with their data.
Data at rest: Moving large volumes of data around and processing it repeatedly is time-consuming, bandwidth-intensive, and costly. This is sometimes referred to as the "data gravity" problem, where it becomes challenging to move large datasets quickly because of physical constraints.
Analytics: Snowflake initially started as a cloud data warehouse. It was never built for operational use cases. Choose the right tool for the job regarding SLAs, latency, scalability, and features. There is no single all-rounder.
Customization limitations: While Snowflake offers a wide range of features, there may be cases where users require highly specialized or custom configurations that are not easily achievable within the platform.
Third-party tool integration: Although Snowflake supports various data integration tools and provides its own marketplace, there may be instances where specific third-party tools or applications are not fully integrated or at least not optimized for use with Snowflake.

These trade-offs show why many enterprises (have to) combine Snowflake with other technologies and SaaS to build a scalable but also cost-efficient enterprise architecture.
While all of the above trade-offs are obvious, cost concerns about growing data sets and analytical queries are the clear number one issue I hear from customers these days.

Snowflake Integration Patterns

Every middleware provides a Snowflake connector today because of its market presence. Let's explore the different integration options:

Traditional data integration with ETL, ESB, or iPaaS
ELT within the data warehouse
Reverse ETL with purpose-built products
Data streaming (usually via the industry standard Apache Kafka)
Zero ETL via direct configurable point-to-point connections

1. Traditional Data Integration: ETL, ESB, iPaaS

ETL is the way most people think about integrating with a data warehouse. Enterprises started adopting Informatica and Teradata decades ago. The approach is still the same today:

ETL meant batch processing in the past.
An ESB (Enterprise Service Bus) often allows near real-time integration (if the data warehouse is capable of this), but it has scalability issues because of the underlying API (= HTTP/REST) or message broker infrastructure.
iPaaS (Integration Platform as a Service) is very similar to an ESB, often from the same vendors, but provides a fully managed service in the public cloud. It is often not cloud-native, but just deployed on Amazon EC2 instances (so-called cloud washing of legacy middleware).

2. ELT: Data Processing Within the Data Warehouse

Many Snowflake users actually only ingest the raw data sets and do all the transformations and processing in the data warehouse. DBT is the favorite tool of most data engineers. The simple tool enables the straightforward execution of simple SQL queries to re-process data at rest again and again. While the ELT approach is very intuitive for the data engineers, it is very costly for the business unit that pays the Snowflake bill.

3. Reverse ETL: "Real Time Batch" (What?!)

As the name says, Reverse ETL turns the story from ETL around.
It means moving data from a cloud data warehouse into third-party systems to “make data operational”, as the marketing of these solutions says.

Unfortunately, Reverse ETL is a huge ANTI-PATTERN to build real-time use cases. And it is NOT cost-efficient. If you store data in a data warehouse or data lake, you cannot process it in real time anymore as it is already stored at rest. These data stores are built for indexing, search, batch processing, reporting, model training, and other use cases that make sense in the storage system. But you cannot consume the data in real-time in motion from storage at rest.

Instead, think about only feeding (the right) data into the data warehouse for reporting and analytics. Real-time use cases should run ONLY in a real-time platform like an ESB or a data streaming platform.

4. Data Streaming: Apache Kafka for Real-Time and Batch With Data Consistency

Data streaming is a relatively new software category. It combines:

Real-time messaging at scale for analytics and operational workloads.
An event store for long-term persistence, true decoupling of producers and consumers, and replayability of historical data in a guaranteed order.
Data integration in real-time at scale.
Stream processing for stateless or stateful data correlation of real-time and historical data.
Data governance for end-to-end visibility and observability across the entire data flow.

The de facto standard of data streaming is Apache Kafka. Apache Flink is becoming the de facto standard for stream processing, but Kafka Streams is another excellent and widely adopted Kafka-native library.

In December 2023, the research company Forrester published “The Forrester Wave™: Streaming Data Platforms, Q4 2023." The report explores what Confluent and other vendors like AWS, Microsoft, Google, Oracle, and Cloudera provide. Similarly, in April 2024, IDC published the IDC MarketScape for Worldwide Analytic Stream Processing 2024.
Data streaming enables real-time data processing where it is appropriate from a technical perspective or where it adds business value versus batch processing. But data streaming also connects to non-real-time systems like Snowflake for reporting and batch analytics. Kafka Connect is part of open-source Kafka. It provides data integration capabilities in real-time at scale with no additional ETL tool. Native connectors to streaming systems (like IoT or other message brokers) and Change Data Capture (CDC) connectors that consume from databases like Oracle or Salesforce CRM push changes as events in real-time into Kafka. 5. Zero ETL: Point-To-Point Integrations and Spaghetti Architecture Zero ETL refers to an approach in data processing. ETL processes are minimized or eliminated. Traditional ETL processes — as discussed in the above sections — involve extracting data from various sources, transforming it into a usable format, and loading it into a data warehouse or data lake. In a Zero ETL approach, data is ingested in its raw form directly from a data source into a data lake without the need for extensive transformation upfront. This raw data is then made available for analysis and processing in its native format, allowing organizations to perform transformations and analytics on-demand or in real-time as needed. By eliminating or minimizing the traditional ETL pipeline, organizations can reduce data processing latency, simplify data integration, and enable faster insights and decision-making. Zero ETL From Salesforce CRM to Snowflake A concrete Snowflake example is the bi-directional integration and data sharing with Salesforce. The feature GA'ed recently enables "zero-ETL data sharing innovation that reduces friction and empowers organizations to quickly surface powerful insights across sales, service, marketing, and commerce applications". So far, the theory. Why did I put this integration pattern last and not first on my list if it sounds so amazing? 
Spaghetti Architecture: Integration and Data Mess

For decades, it has been possible to build point-to-point integrations with CORBA, SOAP, REST/HTTP, and many other technologies. The consequence is a spaghetti architecture:

[Figure: spaghetti architecture. Source: Confluent]

In a spaghetti architecture, code dependencies are often tangled and interconnected in a way that makes it challenging to make changes or add new features without unintended consequences. This can result from poor design practices, lack of documentation, or gradual accumulation of technical debt. The consequences of a spaghetti architecture include:

- Maintenance challenges: It becomes difficult for developers to understand and modify the codebase without introducing errors or unintended side effects.
- Scalability issues: The architecture may struggle to accommodate growth or changes in requirements, leading to performance bottlenecks or instability.
- Lack of agility: Changes to the system become slow and cumbersome, inhibiting the ability of the organization to respond quickly to changing business needs or market demands.
- Higher risk: The complexity and fragility of the architecture increase the risk of software bugs, system failures, and security vulnerabilities.

Therefore, please do NOT build zero-code point-to-point spaghetti architectures if you care about the mid-term and long-term success of your company regarding data consistency, time-to-market, and cost efficiency.

Short-Term and Long-Term Impact of Snowflake and Integration Patterns With(Out) Kafka

Zero ETL using Snowflake sounds compelling. But it is compelling only if all you need is a single point-to-point connection. Most information is relevant in many applications. Data Streaming with Apache Kafka enables true decoupling. Ingest events only once and consume from multiple downstream applications independently with different communication patterns (real-time, batch, request-response). This has been a common pattern for years in legacy integration, for instance, mainframe offloading.
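The "ingest once, consume many times" idea can be illustrated with a minimal in-memory sketch. It is not Kafka code: `EventLog` and `Consumer` are hypothetical names, the consumer names are made up, and the log stands in for a single ordered topic partition. The point is only that each consumer keeps its own offset, so a real-time reader and a batch loader can read the same events independently.

```python
from dataclasses import dataclass, field


@dataclass
class EventLog:
    """Append-only, ordered log standing in for one topic partition (illustrative)."""
    events: list = field(default_factory=list)

    def append(self, event: dict) -> int:
        """Store the event once and return its offset."""
        self.events.append(event)
        return len(self.events) - 1


@dataclass
class Consumer:
    """Reads at its own pace by tracking its own offset into the log."""
    name: str
    offset: int = 0

    def poll(self, log: EventLog, max_events: int) -> list:
        """Return up to max_events unread events and advance only this consumer."""
        batch = log.events[self.offset:self.offset + max_events]
        self.offset += len(batch)
        return batch


log = EventLog()
for order_id in range(5):
    log.append({"order_id": order_id, "amount": 10 * order_id})

# Hypothetical downstream applications with different consumption patterns.
realtime_app = Consumer("fraud-detection")
warehouse_loader = Consumer("snowflake-batch-loader")

first = realtime_app.poll(log, max_events=1)          # event-at-a-time
batch = warehouse_loader.poll(log, max_events=100)    # everything so far, in order

# A consumer added later can replay the full history from offset 0.
late_joiner = Consumer("new-analytics-app")
replayed = late_joiner.poll(log, max_events=100)
```

Because no consumer mutates the log, adding another downstream system does not require a new point-to-point pipeline from the source.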
Snowflake is rarely the only endpoint of your data. Reverse ETL is a pattern only needed if you ingest data into a single data warehouse or data lake like Snowflake with a dumb pipeline (Kafka, ETL tool, Zero ETL, or any other code). Apache Kafka allows you to avoid Reverse ETL. It makes the architecture more performant, scalable, and flexible. Sometimes Reverse ETL cannot be avoided for organizational or historical reasons. That's fine. But don't design an enterprise architecture where you ingest data just to reverse it later. Most times, Reverse ETL is an anti-pattern.

What is your point of view on integration patterns for Snowflake? How do you integrate it into an enterprise architecture? What are your experiences and opinions? Let’s connect on LinkedIn and discuss it! Stay informed about new blog posts by subscribing to my newsletter.
Over the past few years, Apache Kafka has emerged as the top event streaming platform for streaming data/event ingestion. However, in earlier versions of Apache Kafka (before 3.5), ZooKeeper was an additional and mandatory component for managing and coordinating the Kafka cluster. Relying on ZooKeeper in an operational multi-node Kafka cluster introduced complexity and could be a single point of failure. ZooKeeper is a completely separate system with its own configuration file syntax, management tools, and deployment patterns. In-depth skills and experience are necessary to manage and deploy two individual distributed systems and eventually get a Kafka cluster up and running. Expertise in Kafka administration alone, without ZooKeeper knowledge, is not enough to recover from a crisis, especially in production environments where ZooKeeper runs in a completely isolated environment (cloud).

Kafka's reliance on ZooKeeper for metadata management was eliminated by introducing the Apache Kafka Raft (KRaft) consensus protocol. This removes the need to run and configure two distinct systems (ZooKeeper and Kafka) and significantly simplifies Kafka's architecture by moving metadata management into Kafka itself. Apache Kafka officially deprecated ZooKeeper in version 3.5, and the latest version, 3.8, improved the KRaft metadata version-related messages.

Ingested events deliver no business value unless we consume them from the Kafka topic and process them further. RisingWave makes processing streaming data easy, dependable, and efficient once events flow to it from the Kafka topic. Notably, RisingWave excels in delivering consistently updated materialized views, which are persistent data structures reflecting the outcomes of stream processing with incremental updates.
In this article, I am going to explain step by step how to install and configure the latest version of Apache Kafka, version 3.8, on a single-node cluster running on Ubuntu 22.04, and subsequently integrate it with RisingWave, which is installed and configured on the same node.

Assumptions

- OpenJDK version 17.0.12 has been installed and configured, including setting JAVA_HOME in the ~/.bashrc file.
- SSH has been installed and configured. Later, this node could be joined to a multi-node on-prem cluster.
- PostgreSQL client version 14.12 (not the PostgreSQL server) has been installed and configured. This is mandatory to connect via psql to the RisingWave streaming database. psql is a command-line interface for interacting with PostgreSQL databases that is included in the PostgreSQL package. Since RisingWave is wire-compatible with PostgreSQL, we will use psql to connect to RisingWave so that we can issue SQL queries and manage database objects. You can refer here to install and configure psql on Ubuntu.

Installation and Configuration of Apache Kafka-3.8 With KRaft

- The binary version of Kafka 3.8, which is the latest, can be downloaded here.
- Extract the tarball, and after extraction, move the entire directory, “kafka_2.13-3.8.0”, to /usr/local/kafka. Make sure you have “root” privileges.
- Create a directory named "kafka-logs" under /usr/local where Kafka logs will be stored. Make sure the created directory has read-write permissions.
- As a configuration step, navigate to the “kraft” directory inside “/usr/local/kafka_2.13-3.8.0/config” and open server.properties in the vi editor to update the key-value pairs. The following keys should have the corresponding values.
- In KRaft mode, each Kafka server can be configured as a controller, a broker, or both using the process.roles property. Since it is a single-node cluster, I am setting both broker and controller.
process.roles=broker,controller And, subsequently, node.id=1, num.partitions=5, and delete.topic.enable=true. Start and Verify the cluster The unique cluster ID generation and other required properties can be created by using the built-in script kafka-storage.sh available inside the bin directory. Make sure the files bootstrap.checkpoint and meta.properties were generated inside the created directory kafka-logs. A unique cluster ID is available inside meta.properties file. Start the broker using the following command from the terminal. Make sure the following should be displayed on the terminal. Topic Creation Using Apache Kafka’s built-in script, kafka-topics.sh, available inside the bin directory, I can create a topic on the running Kafka broker using the terminal. Create one topic named UPIStream with the number of partitions 3. Make RisingWave functional as a single instance in standalone mode. As said above, RisingWave in the standalone mode has been installed and configured on the same node where Kafka 3.8 on KRaft mode is operational. The RisingWave in standalone mode leverages the embedded SQLite database to store metadata and data in the file system. Before that, we need to install and configure the PostgreSQL client as mentioned in the assumptions. Open a terminal and execute the following curl command: $ curl https://risingwave.com/sh | sh We can start a RisingWave instance by running the following command on the terminal: $./risingwave Open a terminal to connect to RisingWave using the following command: $ psql -h 127.0.0.1 -p 4566 -d dev -U root Connecting Kafka Broker With RisingWave Here, I am going to connect RisingWave with the Kafka broker that I want to receive events from the created topic UPIStream. I need to create a source in RisingWave using the CREATE SOURCE command. 
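As a rough sketch of what such a CREATE SOURCE statement could look like, the helper below assembles one as a string that can be pasted into the psql session. Several things here are assumptions rather than the author's actual statement: the broker address `localhost:9092` (Kafka's default port), the `scan.startup.mode` setting, the choice to declare every field as VARCHAR (the sample event later in this article carries all values as JSON strings), and the quoting of column names to preserve their case. The WITH options and the `FORMAT PLAIN ENCODE JSON` clause follow RisingWave's Kafka source syntax as I understand it, so check them against the RisingWave documentation for your version.

```python
def build_create_source_sql(source_name, topic, bootstrap_server, columns):
    """Assemble a RisingWave CREATE SOURCE statement for a JSON-encoded Kafka topic.

    columns is a list of (field_name, sql_type) pairs; names are double-quoted
    so that mixed-case JSON keys such as upiID keep their case (an assumption).
    """
    column_defs = ",\n".join(f'    "{name}" {sql_type}' for name, sql_type in columns)
    return (
        f"CREATE SOURCE IF NOT EXISTS {source_name} (\n"
        f"{column_defs}\n"
        ") WITH (\n"
        "    connector = 'kafka',\n"
        f"    topic = '{topic}',\n"
        f"    properties.bootstrap.server = '{bootstrap_server}',\n"
        "    scan.startup.mode = 'earliest'\n"
        ") FORMAT PLAIN ENCODE JSON;"
    )


# Field names taken from the sample UPI event shown later in the article.
UPI_FIELDS = [
    "timestamp", "upiID", "name", "note", "amount", "currency",
    "Latitude", "Longitude", "deviceOS", "targetApp",
    "merchantTransactionId", "merchantUserId",
]

sql = build_create_source_sql(
    source_name="UPI_Transaction_Stream",   # source name used in this article
    topic="UPIStream",                      # topic created above
    bootstrap_server="localhost:9092",      # assumed single-node broker address
    columns=[(field, "VARCHAR") for field in UPI_FIELDS],
)
print(sql)
```

Swapping `CREATE SOURCE` for `CREATE TABLE` in the generated text is the variant that persists the consumed events in RisingWave.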
When creating a source, I can choose to persist the data from the Kafka topic in RisingWave by using the CREATE TABLE command and specifying the connection settings and data format. There are more additional parameters available while connecting to the Kafka broker. You can refer here to learn more. Adding the following to simply connect the topic UPIStream on the psql terminal. Continuous Pushing of Events From Kafka Topic to RisingWave Using a developed simulator in Java, I have published a stream of UPI transaction events at an interval of 0.5 seconds in the following JSON format to the created topic UPIStream. Here is the one stream of events. JSON {"timestamp":"2024-08-20 22:39:20.866","upiID":"9902480505@pnb","name":"Brahma Gupta Sr.","note":" ","amount":"2779.00","currency":"INR","Latitude":"22.5348319","Longitude":"15.1863628","deviceOS":"iOS","targetApp":"GPAY","merchantTransactionId":"3619d3c01f5ad14f521b320100d46318b9","merchantUserId":"11185368866533@sbi"} Verify and Analyze Events on RisingWave Move to the psql terminal that is already connected with the RisingWave single instance consuming all the published events from the Kafka topic UPIStream and storing on the source UPI_Transaction_Stream. On the other side, the Java simulator is running and continuously publishing individual events with different data to the topic UPIStream at an interval of 0.5 seconds, and subsequently, each event is getting ingested to the RisingWave instance for further processing/analyzing. After processing/modifying the events using the Materialized views, I could sink or send those events back to the different Kafka topics so that downstream applications can consume those for further analytics. I’ll articulate this in my upcoming blog, so please stay tuned. 
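For readers who do not have the Java simulator, events of the same shape can be approximated with a small generator. This is a sketch, not the author's simulator: the value ranges, the placeholder name, and the "Android" option are made up for illustration, and the publish step is deliberately left out so that no particular Kafka client API is assumed. Each yielded string would be handed to whatever producer you use for the UPIStream topic.

```python
import json
import random
import time
from datetime import datetime

# Key order mirrors the sample event shown above.
EVENT_KEYS = [
    "timestamp", "upiID", "name", "note", "amount", "currency",
    "Latitude", "Longitude", "deviceOS", "targetApp",
    "merchantTransactionId", "merchantUserId",
]


def make_event(rng: random.Random, now: datetime) -> dict:
    """Build one synthetic event; every value is a string, as in the sample."""
    return {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],  # millisecond precision
        "upiID": f"{rng.randrange(10**9, 10**10)}@pnb",
        "name": "Test User",                                     # placeholder value
        "note": " ",
        "amount": f"{rng.uniform(1, 5000):.2f}",
        "currency": "INR",
        "Latitude": f"{rng.uniform(-90, 90):.7f}",
        "Longitude": f"{rng.uniform(-180, 180):.7f}",
        "deviceOS": rng.choice(["iOS", "Android"]),              # "Android" is hypothetical
        "targetApp": "GPAY",
        "merchantTransactionId": f"{rng.getrandbits(128):032x}",
        "merchantUserId": f"{rng.randrange(10**13, 10**14)}@sbi",
    }


def event_stream(count, interval_seconds=0.5, sleep=time.sleep, seed=None):
    """Yield count JSON payloads, pausing interval_seconds after each one."""
    rng = random.Random(seed)
    for _ in range(count):
        yield json.dumps(make_event(rng, datetime.now()))
        sleep(interval_seconds)


# Passing a no-op sleep keeps this demonstration instant.
payloads = list(event_stream(3, sleep=lambda seconds: None, seed=1))
```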
Since I have not yet done any processing, modification, or computation on the ingested events in the running RisingWave instance, I created a simple materialized view to observe a few fields in the events and confirm that the integration of Apache Kafka in KRaft mode with RisingWave is working. And the answer is a big YES.

Final Note

Especially for on-premises deployments of a multi-node Kafka cluster, Apache Kafka 3.8 is an excellent release in which we completely bypass the ZooKeeper dependency. Besides, it's easy to set up a development environment for those who want to explore more about event streaming platforms like Apache Kafka. On the other hand, RisingWave functions as a streaming database that uses materialized views to power continuous analytics and data transformations for time-sensitive applications like alerting, monitoring, and trading. Ultimately, it's becoming a game-changer as Apache Kafka joins forces with RisingWave to unlock business value from real-time stream processing.

I hope you enjoyed reading this. If you found this article valuable, please consider liking and sharing it.
Previous Articles on Snowflake

- Tour of Snowflake ingestion using CockroachDB and Redpanda Connect
- Integrating Snowflake with Trino

Previous Articles on CockroachDB CDC

- Emitting Protocol Buffers with CockroachDB CDC Queries
- Using CockroachDB CDC with Apache Pulsar
- Using CockroachDB CDC with Azure Event Hubs
- SaaS Galore: Integrating CockroachDB with Confluent Kafka, FiveTran, and Snowflake
- Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
- CockroachDB CDC using Minio as cloud storage sink
- CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink

Motivation

This article builds upon the previous discussion in "Tour of Snowflake ingestion using CockroachDB and Redpanda Connect," where we investigated the process of streaming changefeeds from CockroachDB to Snowflake using Redpanda Connect and Snowpipe in batch mode. Here, we will shift our focus to Kafka Connect and demonstrate how both batch and streaming modes can be utilized for data ingestion into Snowflake.

Overview

- Deploy a CockroachDB cluster with enterprise changefeeds
- Deploy Snowflake
- Deploy Kafka Connect
- Verify
- Conclusion

Detailed Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start by either launching a CockroachDB instance or utilizing a managed service. To enable CDC, execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;

Verify that changefeeds are enabled:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, update it to true.
Create a source table:

CREATE TABLE cockroachdb (
    id INT PRIMARY KEY,
    value STRING DEFAULT md5(random()::text),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT NULL);

Insert random data:

INSERT INTO cockroachdb SELECT (generate_series(1, 10000));

Update a row:

UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 1;

Create a changefeed job pointing to a local instance of Redpanda:

CREATE CHANGEFEED FOR TABLE cockroachdb INTO 'kafka://redpanda:29092';

Inspect the data:

SELECT * FROM cockroachdb LIMIT 5;

  id |              value               |          created_at           |          updated_at
-----+----------------------------------+-------------------------------+--------------------------------
   1 | UPDATED                          | 2024-09-09 13:17:57.837984+00 | 2024-09-09 13:17:57.917108+00
   2 | 27a41183599c44251506e2971ba78426 | 2024-09-09 13:17:57.837984+00 | NULL
   3 | 3bf8bc26a750a15691ec4d7ddbb7f5e5 | 2024-09-09 13:17:57.837984+00 | NULL
   4 | b8c5786e8651ddfb3a68eabeadb52f2e | 2024-09-09 13:17:57.837984+00 | NULL
   5 | 3a24df165773639ce89d0d877e7103b7 | 2024-09-09 13:17:57.837984+00 | NULL
(5 rows)

The next step is to set up the Snowflake Kafka connector.

Deploy Snowflake

Create a database and schema for outputting changefeed data:

USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE KAFKADB;
CREATE OR REPLACE SCHEMA kafka_schema;

Follow the Snowflake documentation to configure the Kafka connector. Create the necessary tables:

create or replace table kafkatb_batch(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

create or replace table kafkatb_streaming(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

Set up roles and permissions:

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE OR REPLACE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafkadb TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch TO ROLE kafka_connector_role_1;
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_streaming TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER username;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER username SET DEFAULT_ROLE = kafka_connector_role_1;

Ensure you follow the documentation for setting up key pair authentication for the Snowflake Kafka connector.

Deploy Kafka Connect

Run Redpanda using Docker Compose.

docker compose -f compose-redpandadata.yaml up -d

Once up, navigate to the Redpanda Console.
Click into the cockroachdb topic.

Install the Snowflake Kafka connector:

confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest

Use the following configuration for Kafka Connect in distributed mode, saved as connect-distributed.properties:

bootstrap.servers=172.18.0.3:29092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors

Deploy Kafka Connect in distributed mode:

./kafka-connect/bin/connect-distributed.sh connect-distributed.properties

Register the Snowflake connector with the following configuration, saved as snowflake-sink-batch.json:

{
  "name":"snowflake-sink-batch",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"cockroachdb",
    "snowflake.topic2table.map": "cockroachdb:kafkatb_batch",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"account-name:443",
    "snowflake.user.name":"username",
    "snowflake.private.key":"private-key",
    "snowflake.private.key.passphrase":"",
    "snowflake.database.name":"kafkadb",
    "snowflake.schema.name":"kafka_schema",
    "snowflake.role.name":"kafka_connector_role_1",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
  }
}

Publish the connector configuration:

curl -d @"snowflake-sink-batch.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
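If you prefer to script the registration instead of calling curl by hand, the same POST to the Kafka Connect REST endpoint can be sketched with Python's standard library. The helper names (`build_sink_payload`, `build_request`, `register_connector`) are hypothetical, and the Snowflake account, user, and key values are the same placeholders used in the JSON above. The network call is left as a function you invoke yourself, so the snippet only builds and inspects the request.

```python
import json
import urllib.request


def build_sink_payload(name, topic, table, flush_seconds):
    """Return the connector definition as a dict mirroring the JSON file above."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": "8",
            "topics": topic,
            "snowflake.topic2table.map": f"{topic}:{table}",
            "buffer.count.records": "10000",
            "buffer.flush.time": str(flush_seconds),
            "buffer.size.bytes": "5000000",
            "snowflake.url.name": "account-name:443",    # placeholder
            "snowflake.user.name": "username",           # placeholder
            "snowflake.private.key": "private-key",      # placeholder
            "snowflake.private.key.passphrase": "",
            "snowflake.database.name": "kafkadb",
            "snowflake.schema.name": "kafka_schema",
            "snowflake.role.name": "kafka_connector_role_1",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
        },
    }


def build_request(connect_url, payload):
    """Prepare the POST /connectors request without sending it."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(connect_url, payload):
    """Send the request; requires a reachable Kafka Connect worker."""
    with urllib.request.urlopen(build_request(connect_url, payload)) as response:
        return json.loads(response.read())


payload = build_sink_payload("snowflake-sink-batch", "cockroachdb", "kafkatb_batch", 60)
request = build_request("http://kafka-connect:8083", payload)
# register_connector("http://kafka-connect:8083", payload)  # uncomment against a live worker
```

Building the payload from a function makes it easy to derive variants that differ only in the connector name, target table, or flush interval.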
Verify the connector in the Kafka Connect UI and in the Kafka Connect section of the Redpanda Console. If you click on the snowflake-sink-batch sink, you can see additional information. The comprehensive steps needed to set this up are thoroughly outlined in the tutorial. Data will now flow into Snowflake in batch mode, with updates occurring every 60 seconds as determined by the buffer.flush.time parameter. You can now query the data in Snowflake: select * from kafkatb_batch limit 5; If everything is configured correctly, the data from CockroachDB should be available in Snowflake in real-time or in batches, depending on your configuration. record_metadata: { "CreateTime": 1725887877966, "key": "[3]", "offset": 30007, "partition": 0, "topic": "cockroachdb" } record_content: { "after": { "created_at": "2024-09-09T13:17:57.837984Z", "id": 1, "updated_at": "2024-09-09T13:17:57.917108Z", "value": "UPDATED" } } The next step is to configure the connector in streaming mode. First, stop the current connector with the following command: curl -X DELETE http://localhost:8083/connectors/snowflake-sink-batch The updated connector configuration will appear as follows: { "name":"snowflake-sink-streaming", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"cockroachdb", "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming", "buffer.count.records":"10000", "buffer.flush.time":"10", "buffer.size.bytes":"5000000", "snowflake.url.name":"<snowflake-account>:443", "snowflake.user.name":"username", "snowflake.private.key":"private-key", "snowflake.private.key.passphrase":"", "snowflake.database.name":"kafkadb", "snowflake.schema.name":"kafka_schema", "snowflake.role.name":"kafka_connector_role_1", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "snowflake.ingestion.method": "SNOWPIPE_STREAMING", 
"errors.log.enable":"true", "schemas.enable":"false" } } Take note of the snowflake.ingestion.method parameter. This feature removes the need to wait 60 seconds to push data to Snowflake, allowing us to reduce the buffer.flush.time to 10 seconds. To deploy the connector, use the following command: curl -d @"snowflake-sink-streaming.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors Shortly after deployment, the data will be available in the Snowflake table. The previous examples demonstrated how data was ingested into predefined Snowflake tables. The following method will automatically infer the schema from the Kafka messages: { "name":"snowflake-sink-streaming-schematized", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"cockroachdb", "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming_schematized", "buffer.count.records":"10000", "buffer.flush.time":"10", "buffer.size.bytes":"5000000", "snowflake.url.name":"<snowflake-account>:443", "snowflake.user.name":"username", "snowflake.private.key":"private-key", "snowflake.private.key.passphrase":"", "snowflake.database.name":"kafkadb", "snowflake.schema.name":"kafka_schema", "snowflake.role.name":"kafka_connector_role_1", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "snowflake.ingestion.method": "SNOWPIPE_STREAMING", "errors.log.enable":"true", "schemas.enable":"false", "snowflake.enable.schematization": "TRUE" } } Save this as snowflake-sink-streaming-schematized.json and deploy it using: curl -d @"snowflake-sink-streaming-schematized.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors Upon deployment, a new table will be created in Snowflake with the following schema: create or replace TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED ( 
RECORD_METADATA VARIANT COMMENT 'created by automatic table creation from Snowflake Kafka Connector', AFTER VARIANT COMMENT 'column created by schema evolution from Snowflake Kafka Connector' ); To inspect the table, use the following query: SELECT after AS record FROM kafkatb_streaming_schematized LIMIT 5; Sample result: { "created_at": "2024-09-09T16:39:34.993226Z", "id": 18712, "updated_at": null, "value": "0d6bd8a4a790aab95c97a084d17bd820" } Verify We can flatten the data for easier manipulation using the following query: USE ROLE securityadmin; GRANT CREATE VIEW ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1; USE ROLE kafka_connector_role_1; USE DATABASE KAFKADB; USE SCHEMA KAFKA_SCHEMA; CREATE VIEW v_kafkatb_batch_flattened AS SELECT PARSE_JSON(record_content:after):id AS ID, PARSE_JSON(record_content:after):value AS VALUE, PARSE_JSON(record_content:after):created_at AS CREATED_AT, PARSE_JSON(record_content:after):updated_at AS UPDATED_AT FROM kafkatb_batch; SELECT * FROM v_kafkatb_batch_flattened limit 1; ID VALUE CREATED_AT UPDATED_AT 1 "UPDATED" "2024-09-09T13:17:57.837984Z" "2024-09-09T13:17:57.917108Z" Alternatively, for the schematized table, the view creation statement would be: CREATE VIEW v_kafkatb_streaming_schematized_flattened AS SELECT PARSE_JSON(after):id AS ID, PARSE_JSON(after):value AS VALUE, PARSE_JSON(after):created_at AS CREATED_AT, PARSE_JSON(after):updated_at AS UPDATED_AT FROM kafkatb_streaming_schematized; To verify the data flow, make an update in CockroachDB and check for the changes in Snowflake: UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 20000; In Snowflake, execute the following query to confirm the update: SELECT * FROM v_kafkatb_streaming_schematized_flattened where VALUE = 'UPDATED'; Sample result: ID VALUE CREATED_AT UPDATED_AT 20000 "UPDATED" "2024-09-09T18:15:13.460078Z" "2024-09-09T18:16:56.550778Z" 19999 "UPDATED" "2024-09-09T18:15:13.460078Z" "2024-09-09T18:15:27.365272Z" The 
architectural diagram is below: Conclusion In this process, we explored Kafka Connect as a solution to stream changefeeds into Snowflake. This approach provides greater control over how messages are delivered to Snowflake, leveraging the Snowflake Kafka Connector with Snowpipe Streaming for real-time, reliable data ingestion.
Stream processing has existed for decades. However, it really took off in the 2020s thanks to the adoption of open-source frameworks like Apache Kafka and Flink. Fully managed cloud services make it easy to configure and deploy stream processing in a cloud-native way, even without the need to write any code. This blog post explores the past, present, and future of stream processing. The discussion includes various technologies and cloud services, low code/no code trade-offs, outlooks into the support of machine learning and GenAI, streaming databases, and the integration between data streaming and data lakes with Apache Iceberg.

In December 2023, the research company Forrester recognized data streaming as a new software category and not just yet another integration or data platform when it published “The Forrester Wave™: Streaming Data Platforms, Q4 2023”. Get free access to the report here. The leaders are Microsoft, Google, and Confluent, followed by Oracle, Amazon, Cloudera, and a few others. It is a great time to review the past, present, and future of stream processing as a key component in a data streaming architecture.

The Past of Stream Processing: The Move from Batch to Real-Time

The evolution of stream processing began as industries sought more timely insights from their data. Initially, batch processing was the norm. Data was collected over a period, stored, and processed at intervals. This method, while effective for historical analysis, proved inefficient for real-time decision-making.

In parallel to batch processing, message queues were created to provide real-time communication for transactional data. Message brokers like IBM MQ or TIBCO EMS were a common way to decouple applications. Applications send and receive data in an event-driven architecture without worrying about whether the recipient is ready, how to handle backpressure, etc. The stream processing journey began.

Stream Processing Is a Journey Over Decades...

...
and we are still in a very early stage at most enterprises. Here is an excellent timeline of TimePlus about the journey of stream processing open source frameworks, proprietary platforms, and SaaS cloud services: Source: TimePlus The stream processing journey started decades ago with research and the first purpose-built proprietary products for specific use cases like stock trading. Open source stream processing frameworks emerged during the big data and Hadoop era to make at least the ingestion layer a bit more real-time. Today, most enterprises at least get started understanding the value of stream processing for analytical and transactional use cases across industries. The cloud is a fundamental change as you can start streaming and processing data with a button click leveraging fully managed SaaS and simple UIs (if you don't want to operate infrastructure or write low-level source code). TIBCO StreamBase, Software AG Apama, IBM Streams The advent of message queue technologies like IBM MQ and TIBCO EMS moved many critical applications to real-time message brokers. Real-time messaging enables the consumption of data in real-time to store it in a database, mainframe, or application for further processing. However, only true stream processing capabilities included in tools like TIBCO StreamBase, Software AG Apama, or IBM (InfoSphere) Streams marked a significant shift towards real-time data processing. These products enable businesses to react to information as it arrives by processing and correlating the data in motion. Visual coding in tools like StreamBase or Apama represents an innovative approach to developing stream processing solutions. These tools provide a graphical interface that allows developers and analysts to design, build, and test applications by connecting various components and logic blocks visually, rather than writing code manually. Under the hood, the code generation worked with a Streaming SQL language. 
Here is a screenshot of the TIBCO StreamBase IDE for visual drag & drop of streaming pipelines:

[Screenshot: TIBCO StreamBase IDE]

Some drawbacks of these early stream processing solutions include high cost, vendor lock-in, no flexibility regarding tools or APIs, and missing communities. These platforms are monolithic and were built long before cloud-native elasticity and scalability became a requirement for most RFIs and RFPs when evaluating vendors.

Open Source Event Streaming With Apache Kafka

The actual significant change for stream processing came with the introduction of Apache Kafka, a distributed streaming platform that allowed for high-throughput, fault-tolerant handling of real-time data feeds. Kafka, alongside other technologies like Apache Flink, revolutionized the landscape by providing the tools necessary to move from batch to real-time stream processing seamlessly.

The adoption of open-source technologies changed all industries. Openness, flexibility, and community-driven development enabled easier influence on the features and faster innovation. Over 100,000 organizations use Apache Kafka. The massive adoption came from a unique combination of capabilities: messaging, storage, data integration, and stream processing, all in one scalable and distributed infrastructure.

Various open-source stream processing engines emerged. Kafka Streams was added to the Apache Kafka project. Other examples include Apache Storm, Spark Streaming, and Apache Flink.

The Present of Stream Processing: Architectural Evolution and Mass Adoption

The fundamental change to processing data in motion has enabled the development of data products and data mesh. Decentralizing data ownership and management with domain-driven design and technology-independent microservices promotes a more collaborative and flexible approach to data architecture. Each business unit can choose its own technology, API, cloud service, and communication paradigms like real-time, batch, or request-response.
From Lambda Architecture to Kappa Architecture Today, stream processing is at the heart of modern data architecture, thanks in part to the emergence of the Kappa architecture. This model simplifies the traditional Lambda Architecture by using a single stream processing system to handle both real-time and historical data analysis, reducing complexity and increasing efficiency. Lambda architecture with separate real-time and batch layers: Kappa architecture with a single pipeline for real-time and batch processing: For more details about the pros and cons of Kappa vs. Lambda, check out my "Kappa Architecture is Mainstream Replacing Lambda". It explores case studies from Uber, Twitter, Disney and Shopify. Kafka Streams and Apache Flink Become Mainstream Apache Kafka has become synonymous with building scalable and fault-tolerant streaming data pipelines. Kafka facilitating true decoupling of domains and applications makes it integral to microservices and data mesh architectures. Plenty of stream processing frameworks, products, and cloud services emerged in the past years. This includes open-source frameworks like Kafka Streams, Apache Storm, Samza, Flume, Apex, Flink, and Spark Streaming, and cloud services like Amazon Kinesis, Google Cloud Dataflow, and Azure Stream Analytics. The "Data Streaming Landscape 2024" gives an overview of relevant technologies and vendors. Apache Flink seems to become the de facto standard for many enterprises (and vendors). The adoption is like Kafka four years ago: Source: Confluent This does not mean other frameworks and solutions are bad. For instance, Kafka Streams is complementary to Apache Flink, as it suits different use cases. No matter what technology enterprises choose, the mass adoption of stream processing is in progress right now. This includes modernizing existing batch processes AND building innovative new business models that only work in real-time. 
As a concrete example, think about ride-hailing apps like Uber, Lyft, FREENOW, and Grab. They are only possible because events are processed and correlated in real time. Otherwise, everyone would still prefer a traditional taxi.

Stateless and Stateful Stream Processing

In data streaming, stateless and stateful stream processing are two approaches that define how data is handled and processed over time. The choice between stateless and stateful processing depends on the specific requirements of the application, including the nature of the data, the complexity of the processing needed, and the performance and scalability requirements.

Stateless Stream Processing

Stateless stream processing refers to the handling of each data point or event independently from others. In this model, the processing of an event does not depend on the outcomes of previous events and does not require keeping track of state between events. Each event is processed based on the information it contains, without the need for historical context or future data points. This approach is simpler and can be highly efficient for tasks that don't require knowledge beyond the current event being processed. The implementation could be a stream processor (like Kafka Streams or Flink), functionality in a connector (like Kafka Connect Single Message Transforms), or a WebAssembly (Wasm) module embedded into a streaming platform.

Stateful Stream Processing

Stateful stream processing involves keeping track of information (state) across multiple events to perform computations that depend on data beyond the current event. This model allows for more complex operations like windowing (aggregating events over a specific time frame), joining streams of data based on keys, and tracking sequences of events or patterns over time. Stateful processing is essential for scenarios where the outcome depends on accumulated knowledge or trends derived from a series of data points, not just on a single input.
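To make the difference tangible, here is a minimal sketch in plain Python rather than in a real engine such as Kafka Streams or Flink. The ride events, the field layout, and both function names are assumptions made up for illustration. The filter looks at one event at a time, while the windowed aggregation has to remember totals between events.

```python
from collections import defaultdict

# Hypothetical ride events: (timestamp_seconds, rider_id, fare)
events = [
    (0, "alice", 12.0),
    (20, "bob", 7.5),
    (45, "alice", 30.0),
    (70, "alice", 5.0),
    (95, "bob", 22.0),
]


def stateless_filter(stream, min_fare):
    """Stateless: each event is judged only on its own fields."""
    for ts, rider, fare in stream:
        if fare >= min_fare:
            yield ts, rider, fare


def tumbling_window_totals(stream, window_seconds):
    """Stateful: keep a running total per (window, rider) across events."""
    totals = defaultdict(float)  # state that survives between events
    for ts, rider, fare in stream:
        # Align the timestamp to the start of its fixed-size window
        window_start = (ts // window_seconds) * window_seconds
        totals[(window_start, rider)] += fare
    return dict(totals)


expensive = list(stateless_filter(events, min_fare=10.0))
per_window = tumbling_window_totals(events, window_seconds=60)
print(expensive)
print(per_window)
```

In a distributed engine, the `totals` dictionary corresponds to managed state that has to be checkpointed and repartitioned, which is where most of the operational complexity comes from.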
The implementation is much more complex and challenging than stateless stream processing. A dedicated stream processing implementation is required. Dedicated distributed engines (like Apache Flink) handle stateful computations, memory usage, and scalability better than Kafka-native tools like Kafka Streams or KSQL (because the latter are bound to Kafka Topics).

Low Code, No Code, AND A Lot of Code!

No-code and low-code tools are software platforms that enable users to develop applications quickly and with minimal coding knowledge. These tools provide graphical user interfaces with drag-and-drop capabilities, allowing users to assemble and configure applications visually rather than writing extensive lines of code. Common features and benefits of visual coding:

Rapid development: Both types of platforms significantly reduce development time, enabling faster delivery of applications.
User-friendly interface: The graphical interface and drag-and-drop functionality make it easy for users to design, build, and iterate on applications.
Cost reduction: By enabling quicker development with fewer resources, these platforms can lower the cost of software creation and maintenance.
Accessibility: They make application development accessible to a broader range of people, reducing the dependency on skilled developers for every project.

So far, the theory.

Disadvantages of Visual Coding Tools

Actually, StreamBase, Apama, et al., had great visual coding offerings. However, no-code/low-code tools have many drawbacks and disadvantages, too:

Limited customization and flexibility: While these platforms can speed up development for standard applications, they may lack the flexibility needed for highly customized solutions. Developers might find it challenging to implement specific functionalities that aren't supported out of the box.
Dependency on vendors: Using no-code/low-code platforms often means relying on third-party vendors for the platform's stability, updates, and security.
This dependency can lead to potential issues if the vendor cannot maintain the platform or goes out of business. And often the platform team is the bottleneck for implementing new business or integration logic.
Performance concerns: Applications built with no-code/low-code platforms may not be as optimized as those developed with traditional coding, potentially leading to lower performance or inefficiencies, especially for complex applications.
Scalability issues: As businesses grow, applications might need to scale up to support increased loads. No-code/low-code platforms might not always support this level of scalability or might require significant workarounds, affecting performance and user experience.
Over-reliance on non-technical users: While empowering citizen developers is a key advantage of these platforms, it can also lead to governance challenges. Without proper oversight, non-technical users might create inefficient workflows or data structures, leading to technical debt and maintenance issues.
Cost over time: Initially, no-code/low-code platforms can reduce development costs. However, as applications grow and evolve, the ongoing subscription costs or fees for additional features and scalability can become significant.

Flexibility Is King: Stream Processing for Everyone!

Microservices, domain-driven design, data mesh... All these modern design approaches taught us to provide flexible enterprise architectures. Each business unit and persona should be able to choose its own technology, API, or SaaS, no matter whether the communication is real-time, near real-time, batch, or request-response. Apache Kafka provides true decoupling out of the box. Therefore, low-code or no-code tools are an option. However, a data scientist, data engineer, software developer, or citizen integrator can choose their own technology for stream processing.
The past, present, and future of stream processing show different frameworks, visual coding tools, and even applied generative AI. One solution does NOT replace but complements the other alternatives.

The Future of Stream Processing: Serverless SaaS, GenAI, and Streaming Databases

Stream processing is set to grow exponentially in the future, thanks to advancements in cloud computing, SaaS, and AI. Let's explore the future of stream processing and look at the expected short-, mid-, and long-term developments.

SHORT TERM: Fully Managed Serverless SaaS for Stream Processing

The cloud's scalability and flexibility offer an ideal environment for stream processing applications, reducing the overhead and resources required for on-premise solutions. As SaaS models continue to evolve, stream processing capabilities will become more accessible to a broader range of businesses, democratizing real-time data analytics. For instance, look at the serverless Flink Actions in Confluent Cloud. You can configure and deploy stream processing for use cases like deduplication or masking without any code (source: Confluent).

MIDTERM: Automated Tooling and the Help of GenAI

Integrating AI and machine learning with stream processing will enable more sophisticated predictive analytics. This opens new frontiers for automated decision-making and intelligent applications while continuously processing incoming event streams. The full potential of embedding AI into stream processing has to be learned and implemented in the upcoming years. For instance, automated data profiling is one stream processing task that GenAI can support significantly. Software tools analyze and understand the quality, structure, and content of a dataset without manual intervention as the events flow through the data pipeline in real time. This process typically involves examining the data to identify patterns, anomalies, missing values, and inconsistencies. A perfect fit for stream processing!
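As a rough illustration of the profiling step itself (not of any GenAI component), the sketch below updates per-field statistics one event at a time, the way a stream processor would. The `StreamProfiler` class, its fields, and the sensor events are hypothetical; the running mean and standard deviation use Welford's online method so that no history has to be stored.

```python
import math


class StreamProfiler:
    """Keeps per-field statistics that are updated one event at a time."""

    def __init__(self):
        self.count = 0
        self.missing = {}   # field -> number of None values seen
        self.types = {}     # field -> {type name -> occurrences}
        self.numeric = {}   # field -> (n, mean, m2) for Welford's method

    def update(self, event):
        self.count += 1
        for field, value in event.items():
            if value is None:
                self.missing[field] = self.missing.get(field, 0) + 1
                continue
            by_type = self.types.setdefault(field, {})
            name = type(value).__name__
            by_type[name] = by_type.get(name, 0) + 1
            # Track mean and variance incrementally for numeric values only
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                n, mean, m2 = self.numeric.get(field, (0, 0.0, 0.0))
                n += 1
                delta = value - mean
                mean += delta / n
                m2 += delta * (value - mean)
                self.numeric[field] = (n, mean, m2)

    def mean(self, field):
        return self.numeric[field][1]

    def stddev(self, field):
        n, _, m2 = self.numeric[field]
        return math.sqrt(m2 / n) if n else 0.0


profiler = StreamProfiler()
for event in [
    {"sensor": "a", "temp": 20.0},
    {"sensor": "b", "temp": None},
    {"sensor": "a", "temp": 22.0},
    {"sensor": "c", "temp": "n/a"},
]:
    profiler.update(event)

print(profiler.missing, profiler.types["temp"], profiler.mean("temp"))
```

The mixed `float` and `str` counts for `temp` are exactly the kind of inconsistency a profiler would surface for a downstream data quality check.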
Automated data profiling in the stream processor can provide insights into data types, frequency distributions, relationships between columns, and other metadata information crucial for data quality assessment, governance, and preparation for further analysis or processing. MIDTERM: Streaming Storage and Analytics With Apache Iceberg Apache Iceberg is an open-source table format for huge analytic datasets that provides powerful capabilities in managing large-scale data in data lakes. Its integration with streaming data sources like Apache Kafka and analytics platforms, such as Snowflake, Starburst, Dremio, AWS Athena, or Databricks, can significantly enhance data management and analytics workflows. Integration Between Streaming Data From Kafka and Analytics on Databricks or Snowflake Using Apache Iceberg Supporting the Apache Iceberg table format might be a crucial strategic move by streaming and analytics frameworks, vendors, and cloud services. Here are some key benefits from the enterprise architecture perspective: Unified batch and stream processing: Iceberg tables can serve as a bridge between streaming data ingestion from Kafka and downstream analytic processing. By treating streaming data as an extension of a batch-based table, Iceberg enables a seamless transition from real-time to batch analytics, allowing organizations to analyze data with minimal latency.Schema evolution: Iceberg supports schema evolution without breaking downstream systems. This is useful when dealing with streaming data from Kafka, where the schema might evolve. Consumers can continue reading data using the schema they understand, ensuring compatibility and reducing the need for data pipeline modifications.Time travel and snapshot isolation: Iceberg's time travel feature allows analytics on data as it looked at any point in time, providing snapshot isolation for consistent reads. 
This is crucial for reproducible reporting and debugging, especially when dealing with continuously updating streaming data from Kafka.
Cross-platform compatibility: Iceberg provides a unified data layer accessible by different compute engines, including those used by Databricks and Snowflake. This enables organizations to maintain a single copy of their data that is queryable across different platforms, facilitating a multi-tool analytics ecosystem without data silos.

LONG-TERM: Transactional + Analytics = Streaming Database?

Streaming databases, like RisingWave or Materialize, are designed to handle real-time data processing and analytics. They offer a way to manage and query data that is continuously generated from sources like IoT devices, online transactions, and application logs. Traditional databases are optimized for static data stored on disk. Streaming databases, in contrast, are built to process and analyze data in motion. They provide insights almost instantaneously as the data flows through the system. Streaming databases offer the ability to perform complex queries and analytics on streaming data, further empowering organizations to harness real-time insights. The ongoing innovation in streaming databases will probably lead to more advanced, efficient, and user-friendly solutions, facilitating broader adoption and more creative applications of stream processing technologies. Having said this, we are still in the very early stages. It is not clear yet when you really need a streaming database instead of a mature and scalable stream processor like Apache Flink. The future will show us that competition is great for innovation.

The Future of Stream Processing is Open Source and Cloud

The journey from batch to real-time processing has transformed how businesses interact with their data. The continued evolution couples technologies like Apache Kafka, Kafka Streams, and Apache Flink with the growth of cloud computing and SaaS.
Stream processing will redefine the future of data analytics and decision-making. As we look ahead, the future possibilities for stream processing are boundless, promising more agile, intelligent, and real-time insights into the ever-increasing streams of data. If you want to learn more, listen to the on-demand webinar about the past, present, and future of stream processing, where I am joined by two streaming industry veterans: Richard Tibbets (founder of StreamBase) and Michael Benjamin (TimePlus). I had the pleasure of working with them for a few years at TIBCO, where we deployed StreamBase at many financial services companies for stock trading and similar use cases. What does your stream processing journey look like? In which decade did you join? Or are you just learning with the latest open-source frameworks or cloud services? Let’s connect on LinkedIn and discuss it! Stay informed about new blog posts by subscribing to my newsletter.
Advanced SQL is an indispensable tool for retrieving, analyzing, and manipulating substantial datasets in a structured and efficient manner. It is extensively utilized in data analysis and business intelligence, as well as in various domains such as software development, finance, and marketing. Mastering advanced SQL can empower you to: Efficiently retrieve and analyze large datasets from databases.Create intricate reports and visualizations to derive meaningful insights from your data.Write optimized queries to enhance the performance of your database.Utilize advanced features such as window functions, common table expressions, and recursive queries.Understand and fine-tune the performance of your database.Explore, analyze, and derive insights from data more effectively.Provide data-driven insights and make decisions based on solid evidence. In today's data-driven landscape, the ability to handle and interpret big data is increasingly vital. Proficiency in advanced SQL can render you a valuable asset to any organization that manages substantial amounts of data. Below are some examples of advanced SQL queries that illustrate the utilization of complex and powerful SQL features: Using Subqueries in the SELECT Clause SQL SELECT customers.name, (SELECT SUM(amount) FROM orders WHERE orders.customer_id = customers.id) AS total_spent FROM customers ORDER BY total_spent DESC; This query employs a subquery in the SELECT clause to compute the total amount spent by each customer, returning a list of customers along with their total spending, ordered in descending order. 
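To try the subquery above without a database server, here is a small sketch that runs it against an in-memory SQLite database through Python's sqlite3 module. The table contents are invented sample data. It also shows a detail worth knowing: a customer without orders gets NULL from the subquery, so wrapping it in COALESCE is one way to report 0 instead.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, amount REAL);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Grace'), (3, 'Linus');
    INSERT INTO orders VALUES (1, 1, 50.0), (2, 1, 25.0), (3, 2, 100.0);
""")

# The query from the article: customers without orders yield NULL (None)
with_nulls = conn.execute("""
    SELECT customers.name,
           (SELECT SUM(amount)
            FROM orders
            WHERE orders.customer_id = customers.id) AS total_spent
    FROM customers
    ORDER BY total_spent DESC
""").fetchall()

# Variant with COALESCE so that missing totals are reported as 0
totals = conn.execute("""
    SELECT customers.name,
           COALESCE((SELECT SUM(amount)
                     FROM orders
                     WHERE orders.customer_id = customers.id), 0) AS total_spent
    FROM customers
    ORDER BY total_spent DESC
""").fetchall()

print(with_nulls)
print(totals)
```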
Using the WITH Clause for Common Table Expressions (CTEs)

SQL
WITH top_customers AS (
    SELECT customer_id, SUM(amount) AS total_spent
    FROM orders
    GROUP BY customer_id
    ORDER BY total_spent DESC
    LIMIT 10
),
customer_info AS (
    SELECT id, name, email
    FROM customers
)
SELECT customer_info.name, customer_info.email, top_customers.total_spent
FROM top_customers
JOIN customer_info ON top_customers.customer_id = customer_info.id
ORDER BY top_customers.total_spent DESC;

This query uses the WITH clause to define two CTEs, "top_customers" and "customer_info", which simplifies and modularizes the query. The first CTE identifies the top 10 customers based on their total spending, and the second CTE retrieves customer information. The final result is obtained by joining these two CTEs. The outer ORDER BY is needed because the ordering inside a CTE is not guaranteed to carry over to the final result.

Using Window Functions To Calculate Running Totals

SQL
SELECT name, amount,
       SUM(amount) OVER (PARTITION BY name ORDER BY date) AS running_total
FROM transactions
ORDER BY name, date;

This query utilizes a window function, `SUM(amount) OVER (PARTITION BY name ORDER BY date)`, to calculate the running total of transactions for each name. It returns all transactions along with the running total for each name, ordered by name and date.

Using Self-Join

SQL
SELECT e1.name AS employee, e2.name AS manager
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id;

This query employs a self-join to link a table to itself, illustrating the relationship between employees and their managers. It returns every employee who has a manager along with that manager; because this is an inner join, employees without a manager are left out, and a LEFT JOIN would be needed to include them.

Using JOIN, GROUP BY, HAVING

SQL
SELECT order_items.product_id, products.name, SUM(order_items.quantity) AS product_sold
FROM orders
JOIN order_items ON orders.id = order_items.order_id
JOIN products ON products.id = order_items.product_id
GROUP BY order_items.product_id, products.name
HAVING SUM(order_items.quantity) > 100;

This query uses JOIN to combine the orders and order_items tables on the order_id column, and joins with the products table on the product_id column.
It then uses the GROUP BY clause to group results by product_id and the HAVING clause to filter products with more than 100 units sold. The SELECT clause lists the product_id, total quantity sold, and product name. Using COUNT() and GROUP BY SQL SELECT department, COUNT(employee_id) AS total_employees FROM employees GROUP BY department ORDER BY total_employees DESC; This query uses the COUNT() function to tally the number of employees in each department and the GROUP BY clause to group results by department. The SELECT clause lists the department name and total number of employees, ordered by total employees in descending order. Using UNION and ORDER BY SQL (SELECT id, name, 'customer' AS type FROM customers) UNION (SELECT id, name, 'employee' AS type FROM employees) ORDER BY name; This query uses the UNION operator to combine the results of two separate SELECT statements—one for customers and one for employees — and orders the final result set by name. The UNION operator removes duplicates if present. Recursive Queries A recursive query employs a self-referencing mechanism to perform tasks, such as traversing a hierarchical data structure like a tree or graph. Example: SQL WITH RECURSIVE ancestors (id, parent_id, name) AS ( -- Anchor query to select the starting node SELECT id, parent_id, name FROM nodes WHERE id = 5 UNION -- Recursive query to select the parent of each node SELECT nodes.id, nodes.parent_id, nodes.name FROM nodes JOIN ancestors ON nodes.id = ancestors.parent_id ) SELECT * FROM ancestors; This query uses a CTE called "ancestors" to define the recursive query with columns: id, parent_id, and name. The anchor query selects the starting node (id = 5), and the recursive query selects each node's parent, joining it with the "ancestors" CTE on the parent_id column. This process continues until the root of the tree is reached or the maximum recursion level is attained. The final query retrieves all identified ancestors. 
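The recursive query above can be tried out with Python's built-in sqlite3 module. The sketch below uses an invented three-node tree and adds two things that are not in the original query: a `depth` column and a hypothetical safety limit of 10 levels. Both are assumptions for illustration. Note that once a depth column is added, UNION no longer removes repeated nodes in cyclic data (the depth differs on each visit), so the explicit limit is what guarantees termination.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE nodes (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT);
    INSERT INTO nodes VALUES
        (1, NULL, 'root'),
        (2, 1,    'branch'),
        (5, 2,    'leaf');
""")

rows = conn.execute("""
    WITH RECURSIVE ancestors (id, parent_id, name, depth) AS (
        -- Anchor: the starting node
        SELECT id, parent_id, name, 0 FROM nodes WHERE id = 5
        UNION
        -- Recursive step: the parent of each node found so far
        SELECT nodes.id, nodes.parent_id, nodes.name, ancestors.depth + 1
        FROM nodes
        JOIN ancestors ON nodes.id = ancestors.parent_id
        WHERE ancestors.depth < 10   -- hypothetical safety limit
    )
    SELECT name, depth FROM ancestors ORDER BY depth
""").fetchall()

print(rows)
```

The recursion stops on its own at the root because its parent_id is NULL, which never matches a node id in the join.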
While recursive queries are potent, they can be resource-intensive; therefore, they should be used judiciously to avoid performance issues. Ensure proper recursion termination and consider the maximum recursion level permitted by your DBMS. Not all SQL implementations support recursion, but major relational databases such as PostgreSQL, Oracle, SQL Server, and SQLite do support recursive CTEs. The exact syntax varies: PostgreSQL and SQLite use the WITH RECURSIVE keyword shown above, whereas SQL Server expresses recursion with a plain WITH clause. These examples showcase just a few of SQL's powerful capabilities and the diverse types of queries you can construct. The specific details of the queries will depend on your database structure and the information you seek to retrieve, but these examples should provide a foundational understanding of what is achievable with advanced SQL.
At first, data tiering was a tactic used by storage systems to reduce data storage costs. This involved grouping data that was not accessed as often into more affordable, if less effective, storage array choices. Data that has been idle for a year or more, for example, may be moved from an expensive Flash tier to a more affordable SATA disk tier. Even though they are quite costly, SSDs and flash can be categorized as high-performance storage classes. Smaller datasets that are actively used and require the maximum performance are usually stored in Flash. Cloud data tiering has gained popularity as customers seek alternative options for tiering or archiving data to a public cloud. Public clouds presently offer a mix of object and file storage options. Object storage classes such as Amazon S3 and Azure Blob (Azure Storage) deliver significant cost efficiency and all the benefits of object storage without the complexities of setup and management. The term “hot” data as well as "cold" data can be viewed differently from a multi-node Kafka cluster perspective. The data ingested into a Kafka topic and reaching the downstream applications for quick retrieval as the final output after passing through various data pipelines can be termed "hot” data. For example, IoT sensor events from various critical equipment used in oil refineries. Similarly, the ingested data into the Kafka topic that is less frequently accessed by the downstream application can be termed “cold” data. As an example of “cold” data, we can consider inventory updates in e-commerce applications by ingesting product quantities, etc. from third-party warehouse systems. The cold data can be moved out from the cluster into a cost-effective storage solution. After the classification of data that is ingested into a Kafka topic based on the requirements of the downstream application, we can designate data tiers as hot tiers for hot data and cold tiers for cold data in the Kafka cluster. 
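The classification step described above can be sketched as a simple rule. Everything in this example is an assumption for illustration: the `TopicStats` fields, the thresholds, and the topic names (loosely modeled on the refinery and inventory examples). In practice, the inputs would come from consumer metrics and the requirements of the downstream applications.

```python
from dataclasses import dataclass


@dataclass
class TopicStats:
    name: str
    reads_per_day: float   # average consumer fetches per day (hypothetical metric)
    max_latency_ms: int    # latency the downstream application tolerates


def classify(stats, min_reads_per_day=1000, latency_cutoff_ms=500):
    """Return 'hot' when data is read often or must be served quickly."""
    if (stats.reads_per_day >= min_reads_per_day
            or stats.max_latency_ms <= latency_cutoff_ms):
        return "hot"
    return "cold"


topics = [
    TopicStats("refinery-sensor-events", reads_per_day=250_000, max_latency_ms=100),
    TopicStats("warehouse-inventory-updates", reads_per_day=40, max_latency_ms=60_000),
]

tiers = {t.name: classify(t) for t in topics}
print(tiers)
```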
High-performance storage options like NVMe (Non-Volatile Memory Express) or SSDs (Solid State Drives) can be leveraged for the hot data tier, as quick retrieval of data is desired. Similarly, scalable cloud storage services like Amazon S3 can be used for the cold tier. Historical and less frequently accessed data that is identified as cold data is ideal for the cold tier. Of course, the volume of data being ingested into the Kafka topic, as well as the retention period, are also deciding factors for selecting cloud storage.

Basic Execution Procedure at Kafka’s Topic

Hot Data Tier

As mentioned above, SSD or NVMe is for the hot data tier and scalable cloud storage for the cold data tier; both can be configured in Kafka’s server.properties file. Topic configurations have a default setting in the server.properties file, with an option to override it on a per-topic basis. If no specific value is provided for a topic, the parameters from the server.properties file will be used. However, using the --config option when creating or altering a topic, we can override these broker defaults for that topic. In this scenario, we want the created topic to store the hot tier data in a directory located on a storage device that offers high-speed access, such as SSDs or NVMe devices.

As a first step, we should disable automatic topic creation in the server.properties file. By default, Kafka automatically creates topics if they do not exist. However, in a tiered storage scenario, it may be preferable to disable automatic topic creation to maintain greater control over topic configurations. We need to add the following key-value pair to the server.properties file.

#Disable Automatic Topic Creation
auto.create.topics.enable=false

In the second step, update the log.dirs property with a location on a storage device that offers high-speed access.
log.dirs=/path/to/SSD or NVMe devices for hot tier

Keep in mind that log.dirs is a broker-wide setting and is not accepted as a per-topic override through the --config option. Every topic hosted on this broker, including the topic created for the hot tier, therefore keeps its log segments in this location. We might need to tweak other key-value pairs in the server.properties file for the hot tier depending on our unique use case and requirements, such as log.retention.hours, default.replication.factor, and log.segment.bytes.

Cold Data Tier

As said, scalable cloud storage services like Amazon S3 can be used for the cold tier. There are two options to configure the cold tier in Kafka. One is using Confluent’s Amazon S3 Sink connector, and the other is configuring an Amazon S3 bucket as remote storage through Kafka’s server.properties file.

The Amazon S3 Sink connector exports data from Apache Kafka® topics to S3 objects in either Avro, JSON, or Bytes formats. It periodically polls data from Kafka and, in turn, uploads it to S3. After consuming records from the designated topics and organizing them into various partitions, the Amazon S3 Sink connector sends batches of records from each partition to a file, which is subsequently uploaded to the S3 bucket. We can install this connector by using the confluent connect plugin install command or by manually downloading the ZIP file. Either way, the connector must be installed on every machine in the cluster where Connect will run.

Besides the above, we could configure Kafka’s server.properties file and create a topic for the cold data tier that leverages the S3 bucket. Note that log.dirs must point to a local file system path, so it cannot reference an s3:// location, and kafka-topics.sh does not accept log.dirs as a topic-level --config. In Kafka releases that ship tiered storage (KIP-405), the steps look like this instead:

Enable remote storage on the brokers and configure a RemoteStorageManager plugin that can write to S3. Apache Kafka itself does not bundle an S3 implementation, so the plugin class and its bucket settings depend on the plugin chosen. We need to make sure that all necessary AWS credentials and permissions are set up for Kafka to write to the specified S3 bucket.

remote.log.storage.system.enable=true
remote.log.storage.manager.class.name=<class name of the chosen RemoteStorageManager plugin>

We can then create a topic that will use the cold tier (S3) using the built-in script kafka-topics.sh. Here we need to enable remote storage for that specific topic and, optionally, limit how long segments stay on the local disks with local.retention.ms (one hour in this example).

bin/kafka-topics.sh --create --topic our_s3_cold_topic --partitions 5 --replication-factor 3 --config remote.storage.enable=true --config local.retention.ms=3600000 --bootstrap-server <<IP address of broker>>:9092

According to our requirements and the characteristics of S3 storage, we could adjust the Kafka configurations specific to the cold tier, like modifying the value of log.retention.hours in server.properties.

Final Note

As a final note, by partitioning the hot and cold data tiers in the Apache Kafka cluster, we can optimize storage resources based on data characteristics. Scalability and cost-effectiveness of storage become critical as more and more enterprises adopt real-time data streaming for their business growth. They can achieve optimal performance and effective cost management of storage by implementing high-performance and cost-effective storage tiers wisely. Hope you have enjoyed this read. Please like and share if you feel this composition is valuable. Thank you for reading this tutorial.
In this article, I am going to show you how to build a message publisher using Apache Kafka and Spring Boot. First, we will talk about what Apache Kafka is. Apache Kafka is an open-source, distributed streaming platform designed for real-time event processing. It provides a reliable, scalable, and fault-tolerant way to handle large volumes of data streams. Kafka allows you to publish and subscribe to data topics, making it ideal for building event-driven applications, log aggregation, and data pipelines.

Prerequisites

Apache Kafka
Java
Apache Maven
Any IDE (IntelliJ, STS, or Eclipse)

Project Structure

In this project, we will expose an endpoint to create a user, and we will publish a UserCreatedEvent to a Kafka topic.

application.yml file

YAML
spring:
  application:
    name: message-publisher
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
app:
  topic_name: users-topic
server:
  port: 8089

spring.application.name is used to define the application name.
bootstrap-servers specifies the hostname and port number of the Kafka broker.
key-serializer specifies which serializer is used to convert the message key to bytes before sending it to Kafka. Based on the key type, we can use StringSerializer or IntegerSerializer (example: org.apache.kafka.common.serialization.StringSerializer). Keys matter when records with the same key should go to the same partition.
value-serializer specifies which serializer is used to convert Java objects to bytes before sending them to Kafka. If we are using a custom Java class as the value, then we can use JsonSerializer as the value-serializer.
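To illustrate what the two serializers roughly do and why records with the same key end up in the same partition, here is a minimal Python sketch. It is not Kafka's implementation: the function names are hypothetical, and CRC32 is used as a stand-in hash, whereas Kafka's default partitioner hashes the serialized key with murmur2. The principle is the same: hash the key bytes and take the result modulo the partition count.

```python
import json
import zlib


def serialize_key(key: str) -> bytes:
    """Rough counterpart of StringSerializer: UTF-8 bytes of the key."""
    return key.encode("utf-8")


def serialize_value(value: dict) -> bytes:
    """Rough counterpart of JsonSerializer: the object rendered as JSON bytes."""
    return json.dumps(value, sort_keys=True).encode("utf-8")


def choose_partition(key_bytes: bytes, num_partitions: int) -> int:
    """Stand-in partitioner: CRC32 of the key modulo the partition count.
    Kafka's default partitioner uses a murmur2 hash instead."""
    return zlib.crc32(key_bytes) % num_partitions


payload = serialize_value({"eventType": "USER_CREATED_EVENT"})
first = choose_partition(serialize_key("user-42"), num_partitions=3)
second = choose_partition(serialize_key("user-42"), num_partitions=3)
print(payload, first, second)
```

Because the partition is derived only from the key bytes, every event for `user-42` lands in the same partition, which is what preserves per-key ordering.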
pom.xml XML <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.3.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.lights5.com</groupId> <artifactId>message-publisher</artifactId> <version>0.0.1-SNAPSHOT</version> <name>message-publisher</name> <description>Demo project for Kafka Producer using Spring Boot</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project> spring web, spring kafka are required dependencies. ApplicationConfiguration class Java package com.lights5.com.message.publisher; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Getter @Setter @Configuration @ConfigurationProperties(prefix = "app") public class AppConfig { private String topicName; } This class is used to bind configuration values from application.yml file to the respective fields. 
Application class

Java
    package com.lights5.com.message.publisher;

    import lombok.RequiredArgsConstructor;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;

    @SpringBootApplication
    @RequiredArgsConstructor
    public class Application {

        private final AppConfig appConfig;

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        // Creates the topic on startup if it does not exist yet.
        @Bean
        NewTopic usersTopic() {
            return TopicBuilder.name(appConfig.getTopicName())
                    .partitions(3)
                    .replicas(2)
                    .build();
        }
    }

The NewTopic bean creates the topic if it doesn't already exist on the Kafka broker. We can configure the number of partitions and replicas as needed.

Model Classes

User class

Java
    package com.lights5.com.message.publisher;

    import java.time.LocalDateTime;

    record User (
            String firstName,
            String lastName,
            String email,
            Long phoneNumber,
            Address address,
            LocalDateTime createdAt) {

        record Address (
                String city,
                String country,
                String zipcode) {
        }
    }

EventType enum

Java
    package com.lights5.com.message.publisher;

    enum EventType {
        USER_CREATED_EVENT;
    }

EventPayload class

Java
    package com.lights5.com.message.publisher;

    record EventPayload (
            EventType eventType,
            String payload) {
    }

Endpoint to Create User (UsersController class)

Java
    package com.lights5.com.message.publisher;

    import lombok.RequiredArgsConstructor;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.*;

    import static com.lights5.com.message.publisher.EventType.USER_CREATED_EVENT;

    @RestController
    @RequiredArgsConstructor
    @RequestMapping("/v1/users")
    class UsersController {

        private final UsersService usersService;

        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public void createUser(@RequestBody User user) {
            usersService.publishMessage(user, USER_CREATED_EVENT);
        }
    }

The UsersController class exposes a POST endpoint to create a user, which in turn calls a method in the UsersService class.

UsersService class

Java
    package com.lights5.com.message.publisher;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Slf4j
    @Service
    @RequiredArgsConstructor
    class UsersService {

        private final AppConfig appConfig;
        private final ObjectMapper objectMapper;
        private final KafkaTemplate<String, EventPayload> kafkaTemplate;

        public void publishMessage(User user, EventType eventType) {
            try {
                // Serialize the user to JSON and wrap it in the event envelope.
                var userCreatedEventPayload = objectMapper.writeValueAsString(user);
                var eventPayload = new EventPayload(eventType, userCreatedEventPayload);
                kafkaTemplate.send(appConfig.getTopicName(), eventPayload);
            }
            catch (JsonProcessingException ex) {
                log.error("Exception occurred in processing JSON {}", ex.getMessage());
            }
        }
    }

KafkaTemplate is used to send messages to Kafka. Spring Boot autoconfigures KafkaTemplate and injects it wherever it is required. KafkaTemplate<K, V> is generic: K is the key type and V is the value type. In our case, the key is a String and the value is an EventPayload, so we need StringSerializer for the key and JsonSerializer for the value (EventPayload is a custom Java class). The kafkaTemplate.send() method takes the topic name as its first argument and the data to be published as its second.

Running Kafka Locally

To run this application locally, we first need to run Kafka locally and then start the Spring Boot application. Please use this docker-compose file to run Kafka locally.
YAML
    version: '2.1'

    services:
      zoo1:
        image: confluentinc/cp-zookeeper:7.3.2
        hostname: zoo1
        container_name: zoo1
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_SERVERS: zoo1:2888:3888

      kafka1:
        image: confluentinc/cp-kafka:7.3.2
        hostname: kafka1
        container_name: kafka1
        ports:
          - "9092:9092"
          - "29092:29092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 5
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
        depends_on:
          - zoo1

      kafka2:
        image: confluentinc/cp-kafka:7.3.2
        hostname: kafka2
        container_name: kafka2
        ports:
          - "9093:9093"
          - "29093:29093"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 6
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
        depends_on:
          - zoo1

      kafka3:
        image: confluentinc/cp-kafka:7.3.2
        hostname: kafka3
        container_name: kafka3
        ports:
          - "9094:9094"
          - "29094:29094"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 7
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
        depends_on:
          - zoo1

    docker-compose -f <compose-file> up

Run this command in the directory where the compose file is located, replacing <compose-file> with the name of that file. The command starts Kafka locally.

Testing Using Postman

Endpoint: /v1/users (POST method)

Payload

JSON
    {
      "firstName": "John",
      "lastName": "Albert",
      "email": "johnalbert@gmail.com",
      "phoneNumber": "9999999999",
      "address": {
        "city": "NewYork",
        "country": "USA",
        "zipcode": "111111"
      },
      "createdAt": "2024-06-06T16:46:00"
    }

You can verify whether the data was published by using the kafka-console-consumer command.

Conclusion

Spring Boot integrates easily with Kafka and helps us build pub-sub applications with minimal configuration. With Spring Boot and Kafka, we can readily develop event-driven microservices.
Have you ever wondered how some of your favorite apps handle real-time updates? Live sports scores, stock market tickers, and even social media notifications all rely on event-driven architecture (EDA) to process data instantly. EDA is like having a conversation where every new piece of information triggers an immediate response. It's what makes an application more interactive and responsive.

In this walkthrough, we'll guide you through building a simple event-driven application using Apache Kafka on Heroku. We'll cover:

- Setting up a Kafka cluster on Heroku
- Building a Node.js application that produces and consumes events
- Deploying your application to Heroku

Apache Kafka is a powerful tool for building EDA systems. It's an open-source platform designed for handling real-time data feeds. Apache Kafka on Heroku is a Heroku add-on that provides Kafka as a service. Heroku makes it pretty easy to deploy and manage applications, and I've been using it more in my projects recently. Combining Kafka with Heroku simplifies the setup process when you want to run an event-driven application.

By the end of this guide, you'll have a running application that demonstrates the power of EDA with Apache Kafka on Heroku. Let's get started!

Getting Started

Before we dive into the code, let's quickly review some core concepts. Once you understand these, following along will be easier.

- Events are pieces of data that signify some occurrence in the system, like a temperature reading from a sensor.
- Topics are categories or channels where events are published. Think of them as the subjects you subscribe to in a newsletter.
- Producers are the entities that create and send events to topics. In our demo EDA application, our producers will be a set of weather sensors.
- Consumers are the entities that read and process events from topics. Our application will have a consumer that listens for weather data events and logs them.
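To make these four terms concrete before any real Kafka is involved, here is a minimal in-memory sketch. The ToyBroker class and its subscribe and publish methods are hypothetical names invented for illustration; they are not part of Kafka or KafkaJS, and the sketch leaves out everything that makes Kafka useful in practice (persistence, partitions, offsets, consumer groups).

```javascript
// A toy, in-memory stand-in for a broker (hypothetical; not a Kafka API).
class ToyBroker {
  constructor() {
    // topic name -> list of consumer callbacks subscribed to that topic
    this.topics = new Map();
  }

  // A consumer registers interest in a topic.
  subscribe(topic, handler) {
    if (!this.topics.has(topic)) this.topics.set(topic, []);
    this.topics.get(topic).push(handler);
  }

  // A producer publishes an event; each subscriber of the topic receives it.
  // Returns how many consumers the event was delivered to.
  publish(topic, event) {
    const handlers = this.topics.get(topic) || [];
    handlers.forEach((handler) => handler(event));
    return handlers.length;
  }
}

const broker = new ToyBroker();
const received = [];

// Consumer: collects weather events as they arrive.
broker.subscribe('weather-data', (event) => received.push(event));

// Producer: a sensor emits one event on the topic.
const delivered = broker.publish('weather-data', {
  key: 'sensor01',
  value: { reading: 'temperature', value: 72.5 },
});

console.log(delivered, received);
```

The producer never calls the consumer directly; it only knows the topic name. That indirection is the decoupling the rest of this walkthrough relies on.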
Introduction to Our Application

We'll build a Node.js application using the KafkaJS library. Here's a quick overview of how our application will work:

- Our weather sensors (the producers) will periodically generate data (such as temperature, humidity, and barometric pressure) and send these events to Apache Kafka. For demo purposes, the data will be randomly generated.
- We'll have a consumer listening to the topics. When a new event is received, it will write the data to a log.
- We'll deploy the entire setup to Heroku and use Heroku logs to monitor the events as they occur.

Prerequisites

Before we start, make sure you have the following:

- A Heroku account: If you don't have one, sign up at Heroku.
- Heroku CLI: Download and install the Heroku CLI.
- Node.js installed on your local machine for development. On my machine, I'm using Node (v20.9.0) and npm (10.4.0).

The codebase for this entire project is available in this GitHub repository. Feel free to clone the code and follow along throughout this post.

Now that we've covered the basics, let's set up our Kafka cluster on Heroku and start building.

Setting up a Kafka Cluster on Heroku

Let's get everything set up on Heroku. It's a pretty quick and easy process.

Step 1: Log in via the Heroku CLI

Shell
    ~/project$ heroku login

Step 2: Create a Heroku App

Shell
    ~/project$ heroku create weather-eda

(I've named my Heroku app weather-eda, but you can choose a unique name for your app.)

Step 3: Add the Apache Kafka on Heroku Add-On

Shell
    ~/project$ heroku addons:create heroku-kafka:basic-0
    Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month)
    The cluster should be available in a few minutes.
    Run `heroku kafka:wait` to wait until the cluster is ready.
    You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka
    kafka-adjacent-07560 is being created in the background. The app will restart when complete...
    Use heroku addons:info kafka-adjacent-07560 to check creation progress
    Use heroku addons:docs heroku-kafka to view documentation

You can find more information about the Apache Kafka on Heroku add-on in the Heroku Dev Center. For our demo, I'm adding the Basic 0 tier of the add-on. The cost of the add-on is $0.139/hour. As I went through building this demo application, I used the add-on for less than an hour, and then I spun it down.

It takes a few minutes for Heroku to get Kafka spun up and ready for you. Pretty soon, this is what you'll see:

Shell
    ~/project$ heroku addons:info kafka-adjacent-07560
    === kafka-adjacent-07560
    Attachments: weather-eda::KAFKA
    Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time)
    Max Price: $100/month
    Owning app: weather-eda
    Plan: heroku-kafka:basic-0
    Price: ~$0.139/hour
    State: created

Step 4: Get Kafka Credentials and Configurations

With our Kafka cluster spun up, we will need to get credentials and other configurations. Heroku creates several config vars for our application, populating them with information from the Kafka cluster that was just created. We can see all of these config vars by running the following:

Shell
    ~/project$ heroku config
    === weather-eda Config Vars
    KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE-----
    MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
    ...
    -----END CERTIFICATE-----
    KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY-----
    MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3
    ...
    -----END RSA PRIVATE KEY-----
    KAFKA_PREFIX: columbia-68051.
    KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE-----
    MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
    ...
    F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4=
    -----END CERTIFICATE-----
    KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096

As you can see, we have several config variables.
We'll want a file in our project root folder called .env with all of these config var values. To do this, we simply run the following command:

Shell
    ~/project$ heroku config --shell > .env

Our .env file looks like this:

Shell
    KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----"
    KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY-----
    ...
    -----END RSA PRIVATE KEY-----"
    KAFKA_PREFIX="columbia-68051."
    KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE-----
    ...
    -----END CERTIFICATE-----"
    KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"

Also, we make sure to add .env to our .gitignore file. We wouldn't want to commit this sensitive data to our repository.

Step 5: Install the Kafka Plugin Into the Heroku CLI

The Heroku CLI doesn't come with Kafka-related commands right out of the box. Since we're using Kafka, we'll need to install the CLI plugin.

Shell
    ~/project$ heroku plugins:install heroku-kafka
    Installing plugin heroku-kafka... installed v2.12.0

Now, we can manage our Kafka cluster from the CLI.

Shell
    ~/project$ heroku kafka:info
    === KAFKA_URL
    Plan: heroku-kafka:basic-0
    Status: available
    Version: 2.8.2
    Created: 2024-05-27T18:44:38.023+00:00
    Topics: [··········] 0 / 40 topics, see heroku kafka:topics
    Prefix: columbia-68051.
    Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor)
    Messages: 0 messages/s
    Traffic: 0 bytes/s in / 0 bytes/s out
    Data Size: [··········] 0 bytes / 4.00 GB (0.00%)
    Add-on: kafka-adjacent-07560

    ~/project$ heroku kafka:topics
    === Kafka Topics on KAFKA_URL
    No topics found on this Kafka cluster.
    Use heroku kafka:topics:create to create a topic (limit 40)

Step 6: Test Out Interacting With the Cluster

Just as a sanity check, let's play around with our Kafka cluster. We start by creating a topic.
Shell
    ~/project$ heroku kafka:topics:create test-topic-01
    Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done
    Use `heroku kafka:topics:info test-topic-01` to monitor your topic.
    Your topic is using the prefix columbia-68051..

    ~/project$ heroku kafka:topics:info test-topic-01
    ▸ topic test-topic-01 is not available yet

Within a minute or so, our topic becomes available.

Shell
    ~/project$ heroku kafka:topics:info test-topic-01
    === kafka-adjacent-07560 :: test-topic-01
    Topic Prefix: columbia-68051.
    Producers: 0 messages/second (0 bytes/second) total
    Consumers: 0 bytes/second total
    Partitions: 8 partitions
    Replication Factor: 3
    Compaction: Compaction is disabled for test-topic-01
    Retention: 24 hours

Next, in this terminal window, we'll act as a consumer, listening to this topic by tailing it.

Shell
    ~/project$ heroku kafka:topics:tail test-topic-01

From here, the terminal simply waits for any events published on the topic.

In a separate terminal window, we'll act as a producer, and we'll publish some messages on the topic.

Shell
    ~/project$ heroku kafka:topics:write test-topic-01 "hello world!"

Back in our consumer's terminal window, this is what we see:

Shell
    ~/project$ heroku kafka:topics:tail test-topic-01
    test-topic-01 0 0 12 hello world!

Excellent! We have successfully produced an event to a topic in our Kafka cluster and consumed it. We're ready to move on to our Node.js application. Let's destroy this test topic to keep our playground tidy.

Shell
    ~/project$ heroku kafka:topics:destroy test-topic-01
    ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda
    ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda
    > weather-eda
    Deleting topic test-topic-01... done
    Your topic has been marked for deletion, and will be removed from the cluster shortly

    ~/project$ heroku kafka:topics
    === Kafka Topics on KAFKA_URL
    No topics found on this Kafka cluster.
    Use heroku kafka:topics:create to create a topic (limit 40).

Step 7: Prepare Kafka for Our Application

To prepare for our application to use Kafka, we will need to create two things: a topic and a consumer group.

Let's create the topic that our application will use.

Shell
    ~/project$ heroku kafka:topics:create weather-data

Next, we'll create the consumer group that our application's consumer will be a part of:

Shell
    ~/project$ heroku kafka:consumer-groups:create weather-consumers

We're ready to build our Node.js application!

Build the Application

Let's initialize a new project and install our dependencies.

Shell
    ~/project$ npm init -y
    ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty

Our project will have two processes running:

- consumer.js, which is subscribed to the topic and logs any events that are published.
- producer.js, which will publish some randomized weather data on the topic every few seconds.

Both of these processes will need to use KafkaJS to connect to our Kafka cluster, so we will modularize our code to make it reusable.

Working With the Kafka Client

In the project src folder, we create a file called kafka.js.
It looks like this:

JavaScript
    const { Kafka } = require('kafkajs');

    const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' ))
    const TOPIC = `${process.env.KAFKA_PREFIX}weather-data`
    const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers`

    const kafka = new Kafka({
      clientId: 'weather-eda-app-nodejs-client',
      brokers: BROKER_URLS,
      ssl: {
        rejectUnauthorized: false,
        ca: process.env.KAFKA_TRUSTED_CERT,
        key: process.env.KAFKA_CLIENT_CERT_KEY,
        cert: process.env.KAFKA_CLIENT_CERT,
      },
    })

    const producer = async () => {
      const p = kafka.producer()
      await p.connect()
      return p;
    }

    const consumer = async () => {
      const c = kafka.consumer({
        groupId: CONSUMER_GROUP,
        sessionTimeout: 30000
      })
      await c.connect()
      await c.subscribe({ topics: [TOPIC] });
      return c;
    }

    module.exports = {
      producer,
      consumer,
      topic: TOPIC,
      groupId: CONSUMER_GROUP
    };

In this file, we start by creating a new Kafka client. This requires URLs for the Kafka brokers, which we are able to parse from the KAFKA_URL variable in our .env file (which originally came from calling heroku config). To authenticate the connection attempt, we need to provide KAFKA_TRUSTED_CERT, KAFKA_CLIENT_CERT_KEY, and KAFKA_CLIENT_CERT.

Then, from our Kafka client, we create a producer and a consumer, making sure to subscribe our consumer to the weather-data topic.

Clarification on the Kafka Prefix

Notice in kafka.js that we prepend KAFKA_PREFIX to our topic and consumer group name. We're using the Basic 0 plan for Apache Kafka on Heroku, which is a multi-tenant Kafka plan. This means we work with a KAFKA_PREFIX. Even though we named our topic weather-data and our consumer group weather-consumers, their actual names in our multi-tenant Kafka cluster must have the KAFKA_PREFIX prepended to them (to ensure they are unique).

So, technically, for our demo, the actual topic name is columbia-68051.weather-data, not weather-data. (Likewise for the consumer group name.)
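The two string transformations in kafka.js (stripping the kafka+ssl:// scheme from each broker URL and prepending the prefix) can be tried on their own, without a cluster. The sketch below is only an illustration: parseBrokers and withPrefix are hypothetical helper names that do not appear in the project, and the broker hostnames are made-up placeholders rather than real Heroku values.

```javascript
// Hypothetical helpers mirroring the string handling in kafka.js.

// Turn the comma-separated KAFKA_URL value into the host:port list KafkaJS expects.
const parseBrokers = (kafkaUrl) =>
  kafkaUrl.split(',').map((uri) => uri.replace('kafka+ssl://', ''));

// Build the real (prefixed) name of a topic or consumer group on a multi-tenant plan.
const withPrefix = (prefix, name) => `${prefix}${name}`;

// Sample values shaped like the config vars above; the hostnames are placeholders.
const env = {
  KAFKA_URL:
    'kafka+ssl://broker-1.example.com:9096,kafka+ssl://broker-2.example.com:9096',
  KAFKA_PREFIX: 'columbia-68051.',
};

const brokers = parseBrokers(env.KAFKA_URL);
const topic = withPrefix(env.KAFKA_PREFIX, 'weather-data');
const groupId = withPrefix(env.KAFKA_PREFIX, 'weather-consumers');

console.log(brokers); // the two host:port strings, scheme removed
console.log(topic);   // columbia-68051.weather-data
console.log(groupId); // columbia-68051.weather-consumers
```

Note that the prefix already ends with a dot, so no separator is added when joining it to the name.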
The Producer Process

Now, let's create our background process, which will act as our weather sensor producers. In our project root folder, we have a file called producer.js. It looks like this:

JavaScript
    require('dotenv').config();
    const kafka = require('./src/kafka.js');
    const { faker } = require('@faker-js/faker');

    const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05'];
    const MAX_DELAY_MS = 20000;
    const READINGS = ['temperature','humidity','barometric_pressure'];
    const MAX_TEMP = 130;
    const MIN_PRESSURE = 2910;
    const PRESSURE_RANGE = 160;

    // Pick a random element from an array.
    const getRandom = (arr) => arr[faker.number.int(arr.length - 1)];

    // One generator per reading type, each producing a plausible value.
    const getRandomReading = {
      temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100),
      humidity: () => faker.number.int(100) / 100,
      barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100
    };

    const sleep = (ms) => {
      return new Promise((resolve) => {
        setTimeout(resolve, ms);
      });
    };

    (async () => {
      const producer = await kafka.producer()

      while(true) {
        const sensor = getRandom(SENSORS)
        const reading = getRandom(READINGS)
        const value = getRandomReading[reading]()
        const data = { reading, value }
        await producer.send({
          topic: kafka.topic,
          messages: [{
            key: sensor,
            value: JSON.stringify(data)
          }]
        })
        await sleep(faker.number.int(MAX_DELAY_MS))
      }
    })()

A lot of the code in the file has to do with generating random values. I'll highlight the important parts:

- We'll simulate having five different weather sensors. Their names are found in SENSORS.
- A sensor will emit (publish) a value for one of three possible readings: temperature, humidity, or barometric_pressure. The getRandomReading object has a function for each of these readings, to generate a reasonable corresponding value.
- The entire process runs as an async function with an infinite while loop.

Within the while loop, we:

- Choose a sensor at random.
- Choose a reading at random.
- Generate a random value for that reading.
- Call producer.send to publish this data to the topic. The sensor serves as the key for the event, while the reading and value will form the event message.
- Then, we wait for up to 20 seconds before our next iteration of the loop.

The Consumer Process

The background process in consumer.js is considerably simpler.

JavaScript
    require('dotenv').config();
    const logger = require('./src/logger.js');
    const kafka = require('./src/kafka.js');

    (async () => {
      const consumer = await kafka.consumer()
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          // The key is the sensor name; the value is the JSON-encoded reading.
          const sensorId = message.key.toString()
          const messageObj = JSON.parse(message.value.toString())
          const logMessage = { sensorId }
          logMessage[messageObj.reading] = messageObj.value
          logger.info(logMessage)
        }
      })
    })()

Our consumer is already subscribed to the weather-data topic. We call consumer.run, and then we set up a handler for eachMessage. Whenever Kafka notifies the consumer of a message, it logs the message. That's all there is to it.

Processes and the Procfile

In the package.json file, we need to add a few scripts which start up our producer and consumer background processes. The file should now include the following:

JSON
    ...
    "scripts": {
      "start": "echo 'do nothing'",
      "start:consumer": "node consumer.js",
      "start:producer": "node producer.js"
    },
    ...

The important ones are start:consumer and start:producer. But we keep start in our file (even though it doesn't do anything meaningful) because the Heroku builder expects it to be there.

Next, we create a Procfile which will tell Heroku how to start up the various workers we need for our Heroku app. In the root folder of our project, the Procfile should look like this:

Shell
    consumer_worker: npm run start:consumer
    producer_worker: npm run start:producer

Pretty simple, right?
We'll have a background process worker called consumer_worker, and another called producer_worker. You'll notice that we don't have a web worker, which is what you would typically see in a Procfile for a web application. For our Heroku app, we just need the two background workers. We don't need web.

Deploy and Test the Application

With that, all of our code is set. We've committed all of our code to the repo, and we're ready to deploy.

Shell
    ~/project$ git push heroku main
    …
    remote: -----> Build succeeded!
    …
    remote: -----> Compressing...
    remote: Done: 48.6M
    remote: -----> Launching...
    …
    remote: Verifying deploy... done

After we've deployed, we want to make sure that we scale our dynos properly. We don't need a dyno for a web process, but we'll need one each for consumer_worker and producer_worker. We run the following command to set these processes based on our needs.

Shell
    ~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1
    Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco

Now, everything should be up and running. Behind the scenes, our producer_worker should connect to the Kafka cluster and then begin publishing weather sensor data every few seconds. Then, our consumer_worker should connect to the Kafka cluster and log any messages that it receives from the topic that it is subscribed to.

To see what our consumer_worker is doing, we can look in our Heroku logs.
Shell
    ~/project$ heroku logs --tail
    …
    heroku[producer_worker.1]: Starting process with command `npm run start:producer`
    heroku[producer_worker.1]: State changed from starting to up
    app[producer_worker.1]:
    app[producer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:producer
    app[producer_worker.1]: > node producer.js
    app[producer_worker.1]:
    …
    heroku[consumer_worker.1]: Starting process with command `npm run start:consumer`
    heroku[consumer_worker.1]: State changed from starting to up
    app[consumer_worker.1]:
    app[consumer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:consumer
    app[consumer_worker.1]: > node consumer.js
    app[consumer_worker.1]:
    app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"}
    app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041}
    app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84}
    app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3}
    app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11}
    app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71}
    app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55}
    app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58}
    app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25}
    app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1}
    app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34}
    app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61}
    app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36}
    app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}

It works! We know that our producer is periodically publishing messages to Kafka because our consumer is receiving them and then logging them.

Of course, in a larger EDA app, every sensor is a producer. They might publish on multiple topics for various purposes, or they might all publish on the same topic. And your consumer can be subscribed to multiple topics. Also, in our demo app, our consumer simply emitted a log on eachMessage; but in an EDA application, a consumer might respond by calling a third-party API, sending an SMS notification, or querying a database.

Now that you have a basic understanding of events, topics, producers, and consumers, and you know how to work with Kafka, you can start to design and build your own EDA applications to satisfy more complex business use cases.

Conclusion

EDA is pretty powerful: you can decouple your systems while enjoying key features like easy scalability and real-time data processing. For EDA, Kafka is a key tool that helps you handle high-throughput data streams with ease. Using Apache Kafka on Heroku helps you get started quickly. Since it's a managed service, you don't need to worry about the complex parts of Kafka cluster management. You can just focus on building your apps.
From here, it’s time for you to experiment and prototype. Identify which use cases fit well with EDA. Dive in, test it out on Heroku, and build something amazing. Happy coding!