11: Stream Processing

Stream Processing #


Stream processing is about handling data that becomes incrementally available over time. The data points, or events, are processed as they happen rather than in fixed time slices.

An event is something that happened at a particular point in time. It is generated by a producer, or publisher, and can be processed by multiple consumers, or subscribers. Related events are usually grouped together into a topic or a stream.

Messaging Systems #

Messaging systems are used to notify consumers about new events. They allow a producer to send a message containing the event, which is then pushed to consumers.

A publish/subscribe model allows multiple producers and/or consumers. In a publish/subscribe model, messaging systems can be differentiated by how they handle the following scenarios:

  1. Backpressure Handling: This refers to how the system responds when consumers can’t process messages as fast as they are produced. Systems can choose to:

    • Drop the message if the consumer cannot keep up with the producer.
    • Buffer the messages for later delivery when the consumer is able to process them.
    • Apply backpressure which slows down the producer when consumers can’t keep up.
  2. Fault Tolerance: This refers to how the system behaves when nodes crash or go temporarily offline, and whether any messages are lost as a result. To prevent message loss, systems need to be durable, which can be achieved through:

    • Disk Writes: Storing messages on disk until they are successfully delivered.
    • Replication: Duplicating messages across multiple nodes to prevent loss in case of a single node failure.
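
The three overload strategies above can be sketched with Python's standard `queue` module. This is only an illustration, assuming an in-process bounded queue stands in for the messaging system; the helper names `offer_or_drop` and `offer_with_backpressure` are hypothetical.

```python
import queue

def offer_or_drop(buf: queue.Queue, msg) -> bool:
    """Drop strategy: discard the message when the buffer is full."""
    try:
        buf.put_nowait(msg)
        return True
    except queue.Full:
        return False

def offer_with_backpressure(buf: queue.Queue, msg, timeout: float) -> bool:
    """Backpressure: block the producer until space frees up (or time out)."""
    try:
        buf.put(msg, timeout=timeout)
        return True
    except queue.Full:
        return False

bounded = queue.Queue(maxsize=2)    # room for two unprocessed messages
accepted = [offer_or_drop(bounded, m) for m in ("a", "b", "c")]

# No consumer is draining the queue, so the producer stalls and gives up.
stalled = not offer_with_backpressure(bounded, "c", timeout=0.05)

unbounded = queue.Queue()           # maxsize=0: buffer without limit
for m in ("a", "b", "c"):
    unbounded.put(m)                # never blocks, but memory use can grow
```

In a real system the blocked producer would resume as soon as a consumer takes a message off the queue; the timeout is only there so the sketch terminates.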

Direct Messaging from Producers to Consumers #

Direct messaging systems require application code to handle possible message loss. They can only tolerate limited faults and typically assume that producers and consumers are constantly online. If a consumer is offline, it may miss messages, and if a producer crashes, it may lose buffered messages.


Message brokers #

An alternative to direct messaging is to send messages via a message broker (or message queue), which is essentially a kind of database optimised for handling message streams. It runs as a server, with producers and consumers connecting to it as clients.

  • Role: Producers write messages to brokers; consumers receive them by reading from brokers.
  • Durability: Some brokers only buffer messages in memory, others store them on disk to prevent data loss during a broker crash.
  • Asynchronicity: Consumers are typically asynchronous; the producer waits only for the broker’s confirmation that it has buffered the message, not for consumers to process it.
  • Commit Protocols: Some brokers can participate in two-phase commit using XA or JTA.
  • Usage Limitations: Brokers are not suitable for long-term storage. They assume a small working set (short queues), and throughput may degrade when many messages are buffered.
  • Topic Subscriptions: They often support subscribing to topics matching certain patterns.
  • Query Limitations: They don’t support arbitrary queries but notify clients when data changes.

This style of message broker is standardised in JMS and AMQP and implemented in products such as RabbitMQ, ActiveMQ, HornetQ, Qpid, TIBCO Enterprise Message Service, IBM MQ, Azure Service Bus, and Google Cloud Pub/Sub.

When multiple consumers read messages from the same topic, two main patterns are used:

  • Load balancing: Each message is delivered to one of the consumers.
  • Fan-out: Each message is delivered to all of the consumers.
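
A minimal sketch of the two delivery patterns, assuming consumers are represented by plain names and each one has an in-memory inbox. Round-robin is just one possible way to pick the consumer for load balancing; the function names are hypothetical.

```python
from itertools import cycle

def load_balance(messages, consumers):
    """Deliver each message to exactly one consumer (round-robin here)."""
    inboxes = {c: [] for c in consumers}
    for msg, consumer in zip(messages, cycle(consumers)):
        inboxes[consumer].append(msg)
    return inboxes

def fan_out(messages, consumers):
    """Deliver every message to every consumer."""
    return {c: list(messages) for c in consumers}

messages = ["m1", "m2", "m3"]
balanced = load_balance(messages, ["c1", "c2"])
broadcast = fan_out(messages, ["c1", "c2"])
```

Load balancing spreads the work of processing a topic across consumers, while fan-out lets independent consumers each see the full stream.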

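Brokers use acknowledgements to ensure that messages are not lost: a client must explicitly tell the broker when it has finished processing a message, so that the broker can remove it from the queue. If the connection closes or times out before an acknowledgement arrives, the broker assumes the message was not processed and delivers it again.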
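
The acknowledgement mechanism can be sketched as a toy in-memory queue. The `AckQueue` class and its method names are hypothetical and do not correspond to any real broker's API; the point is only that a message is removed once it is acknowledged, and requeued otherwise.

```python
from collections import deque

class AckQueue:
    """Toy broker queue: delivered messages stay tracked until acknowledged."""

    def __init__(self):
        self._ready = deque()   # messages waiting for delivery
        self._unacked = {}      # delivered but not yet acknowledged
        self._next_id = 0

    def publish(self, body):
        self._ready.append((self._next_id, body))
        self._next_id += 1

    def deliver(self):
        msg_id, body = self._ready.popleft()
        self._unacked[msg_id] = body
        return msg_id, body

    def ack(self, msg_id):
        # The consumer finished processing: the message can be forgotten.
        del self._unacked[msg_id]

    def requeue_unacked(self):
        # Called when a consumer disappears without acknowledging.
        for msg_id, body in sorted(self._unacked.items()):
            self._ready.append((msg_id, body))
        self._unacked.clear()

q = AckQueue()
q.publish("a")
q.publish("b")
id_a, _ = q.deliver()
id_b, _ = q.deliver()
q.ack(id_b)              # "b" was processed
q.requeue_unacked()      # the consumer holding "a" crashed
redelivered = q.deliver()
```

Here `"a"` is delivered a second time, after `"b"` has already been processed, which illustrates how redelivery can change the order in which messages are handled.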


Log-based Message Brokers #

Log-based message brokers combine the durable storage of databases with the low-latency notification of messaging systems.

A log is a simple, append-only sequence of records. In this model:

  1. Producers send messages by appending them to the end of the log.
  2. Consumers receive messages by reading the log sequentially. If they reach the end, they wait for new messages to be appended.

For improved scalability and fault-tolerance, the log is partitioned (divided among multiple servers) and replicated (copied to multiple servers). A topic can be defined as a group of partitions, all carrying the same type of message.
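
As a rough sketch of a partitioned log, the class below keeps one append-only list per partition and routes each message by a hash of its key. The `PartitionedLog` name and the use of CRC32 for routing are assumptions made for illustration; replication is left out.

```python
import zlib

class PartitionedLog:
    """A topic as a group of append-only partitions (in memory, unreplicated)."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: str, value):
        # Messages with the same key always land in the same partition,
        # so their relative order is preserved.
        p = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1   # (partition, offset)

    def read(self, partition: int, offset: int):
        # Reading is sequential and does not remove anything from the log.
        return self.partitions[partition][offset:]

log = PartitionedLog(num_partitions=4)
p1, o1 = log.append("user-1", "login")
p2, o2 = log.append("user-1", "logout")
```

Ordering is guaranteed only within a partition; messages in different partitions have no defined order relative to each other.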


Offsets #

  • Offsets: Within each partition, every message is assigned a monotonically increasing sequence number (offset). Recording a consumer’s current offset is enough to track which messages it has processed.
  • Consumer Failures: If a consumer fails, another starts processing from the last recorded offset. Messages that were processed after that offset was recorded will be processed a second time.
  • Consumer Control: Consumers control the offset, enabling replay of old messages.
  • Non-Destructive Processing: Unlike JMS/AMQP brokers where message processing leads to deletion, in log-based brokers, it’s akin to file reading, leaving messages intact.
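
The behaviour described above can be sketched with a list standing in for one partition, where the list index is the offset. The `consume` function, its `commit_every` interval, and the simulated `crash_at` parameter are all hypothetical.

```python
def consume(log, start, handler, commit_every=2, crash_at=None):
    """Process records from offset `start`; return the committed offset,
    i.e. the offset of the next record a restarted consumer should read."""
    committed = start
    for offset in range(start, len(log)):
        if offset == crash_at:
            return committed            # crash: progress since last commit is lost
        handler(log[offset])
        if (offset + 1) % commit_every == 0:
            committed = offset + 1      # periodic offset commit
    return len(log)                     # caught up: commit the final position

log = ["e0", "e1", "e2", "e3", "e4"]

seen = []
after_crash = consume(log, 0, seen.append, crash_at=3)   # crashes before e3
final = consume(log, after_crash, seen.append)           # replacement consumer

# Replay: a consumer may start again from any offset, because the log is intact.
replayed = []
consume(log, 0, replayed.append)
```

The record `e2` is handled twice because the crash happened after it was processed but before its offset was committed.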

Use Cases

Log-based brokers like Apache Kafka, Amazon Kinesis Streams, and Twitter's DistributedLog are ideal for high throughput, fast processing, and order-sensitive scenarios. Traditional JMS/AMQP brokers may be better for complex, per-message processing.


Databases and Streams #

A replication log is a stream of database write events: the leader produces it as it processes transactions, and followers apply it to end up with an accurate copy of the database’s state. Change Data Capture (CDC) applies the same idea to keeping other systems in sync.

Change Data Capture (CDC): The process of observing and extracting all data changes written to a database for replication to other systems (e.g., syncing changes to a search index).

Systems that store different views of the same data, such as search indexes, caches, and data warehouses, are referred to as derived data systems. CDC keeps them up to date by applying changes in the same order as they were written to the original database (you can say that the data warehouse is just another view of the log).

There are various ways to implement CDC. One common method is database triggers, but these tend to be fragile and carry significant performance overhead. A more robust alternative is parsing the replication log. CDC is used in tools like:

  • LinkedIn’s Databus
  • Facebook’s Wormhole
  • Yahoo!’s Sherpa
  • Bottled Water (for PostgreSQL)
  • Maxwell and Debezium (for MySQL)
  • Mongoriver (for MongoDB)
  • GoldenGate (for Oracle)

Keeping every change forever would require too much disk space, and replaying it would take too long, so the change log needs to be truncated periodically. A new derived system can then no longer be built by replaying the whole log. Instead, start with a consistent snapshot of the database that corresponds to a known position (offset) in the change log, and apply the changes from that position onward.
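
A minimal sketch of bootstrapping a derived system from a snapshot plus the tail of the change log. The change-event layout (`op`, `key`, `value`) and the function names are assumptions for illustration and do not reflect the format of any particular CDC tool.

```python
def apply_change(state, change):
    """Apply one change event to a key-value view of the database."""
    if change["op"] == "delete":
        state.pop(change["key"], None)
    else:
        state[change["key"]] = change["value"]

def snapshot_at(changelog, position):
    """Consistent snapshot reflecting every change before `position`."""
    state = {}
    for change in changelog[:position]:
        apply_change(state, change)
    return state, position

def catch_up(state, changelog, position):
    """Bring a derived view up to date from a known log position."""
    for change in changelog[position:]:
        apply_change(state, change)
    return state, len(changelog)

changelog = [
    {"op": "put", "key": "a", "value": 1},
    {"op": "put", "key": "b", "value": 2},
    {"op": "put", "key": "a", "value": 3},
    {"op": "delete", "key": "b"},
]

snapshot, pos = snapshot_at(changelog, 2)
index, pos_after = catch_up(dict(snapshot), changelog, pos)
```

Because the snapshot is tied to position 2, the derived index only needs the log entries from that position onward, so earlier entries could be truncated.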

Kafka Connect facilitates the integration of CDC with Kafka. It seamlessly ties CDC tools with Kafka for a broad range of database systems.


Event Sourcing #

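Event sourcing is the practice of storing all changes to the application state as a log of change events. Unlike CDC, which extracts low-level changes from a mutable database, event sourcing works at the application level: events record what happened rather than its effect on the data. This makes it easier to evolve applications over time, helps with debugging, and guards against application bugs.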

In event sourcing:

  • An initial user request is a command, which may fail.
  • Once validated, the command becomes an event, which is durable and immutable.
  • The current state is derived by applying the events in order; the events themselves are never modified.
  • Immutable events provide more information than the current state.
  • Various views can be derived from the same event log.
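
The command/event distinction can be sketched with a toy account ledger. The `Event` type, the event kinds, and the handler name are hypothetical; the sketch only shows a command being validated before it becomes an immutable event, and state being derived by folding over the events.

```python
from dataclasses import dataclass
from functools import reduce

@dataclass(frozen=True)          # frozen: events are immutable facts
class Event:
    kind: str                    # "deposited" or "withdrawn"
    amount: int

def apply_event(balance: int, event: Event) -> int:
    if event.kind == "deposited":
        return balance + event.amount
    return balance - event.amount

def current_balance(events) -> int:
    """Current state = all events applied in order, starting from zero."""
    return reduce(apply_event, events, 0)

def handle_withdraw(events, amount: int):
    """Command handler: a command may be rejected, an event may not."""
    if amount > current_balance(events):
        raise ValueError("insufficient funds")
    return events + [Event("withdrawn", amount)]

events = [Event("deposited", 100)]
events = handle_withdraw(events, 30)

# A second view derived from the same log: number of withdrawals.
withdrawal_count = sum(1 for e in events if e.kind == "withdrawn")
```

Calling `handle_withdraw(events, 100)` at this point raises `ValueError`, and no event is appended, which is the sense in which a command may fail while recorded events are facts.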

However, there are challenges with event sourcing and change data capture:

  • Event log consumers are typically asynchronous.
  • How long an immutable event history can feasibly be kept depends on the amount of churn in the dataset.
  • Sometimes, data deletion or history rewriting might be necessary.

Streams #

State, Streams, and Immutability #

The changes to a database can be thought of as an immutable history of events, with the database contents serving as a cache of the latest values in that log.

  • Rich History & Debugging: Immutable events, as seen in accounting’s append-only ledger, provide a richer history for analytics and aid in debugging.
  • Multiple Views: By explicitly translating event log entries to the database, it’s possible to derive multiple views from the same event log. Separating data writing and reading offers significant flexibility.
  • Concurrency Control: While a challenge of CDC and event sourcing is the potential staleness of read after write due to asynchronous updates, well-designed, self-contained events can eliminate the need for multi-object transactions.
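  • Workloads: Workloads with a high rate of updates and deletes on a comparatively small dataset are challenging: the immutable history can grow prohibitively large, fragmentation may become an issue, and the performance of compaction and garbage collection becomes crucial.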
  • Deletion Needs: Certain circumstances (e.g., privacy rules around account closure) necessitate data deletion, posing a limitation to immutability.

Processing Streams #

Stream processing can:

  1. Store Data: Write data from the stream into databases, caches, or search indices for querying.
  2. Send Notifications: Push events to users, for example as email alerts, push notifications, or real-time dashboard updates.
  3. Generate Derived Streams: Process input streams to create new, derived streams.

Complex Event Processing (CEP) searches event streams for specified patterns and emits a complex event when a match is found. Frameworks like Apache Storm, Spark Streaming, Flink, and Kafka Streams are oriented more toward analytics, that is, aggregations and statistical metrics over many events.


Stream Processing and Time #

  • Processing time vs Event time: Processing time uses the system clock of the processing machine, while event time uses the time the event actually occurred. Confusing the two can lead to inaccurate data.
  • Late Arriving Data (Stragglers): Stragglers are events that arrive after a window has been declared complete. Handling stragglers can be done by either ignoring them or publishing a correction that includes the straggler data.
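  • Time Correction: To account for incorrect device clocks, log three timestamps: the event time (device clock), the event send time (device clock), and the event receive time (server clock). Subtracting the send time from the receive time estimates the offset between the device clock and the server clock, assuming network delay is negligible. Applying that offset to the event time gives an estimate of when the event actually occurred.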
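
The clock correction described above amounts to a small piece of arithmetic. The sketch below assumes timestamps are numbers in seconds and that network delay is negligible; the function name is hypothetical.

```python
def corrected_event_time(event_time_device: float,
                         sent_time_device: float,
                         received_time_server: float) -> float:
    """Estimate when an event really happened, in server-clock terms."""
    # Assumption: network delay is negligible, so the difference between
    # receive time and send time is the clock offset of the device.
    clock_offset = received_time_server - sent_time_device
    return event_time_device + clock_offset

# Device clock runs 100 s behind the server. The event was recorded at
# device time 1000 and uploaded a minute later at device time 1060.
estimate = corrected_event_time(1000, 1060, 1160)
```

In this example the estimated offset is 100 seconds, so the event is placed at server time 1100 rather than 1000.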

Window Types

  • Tumbling Window: Fixed-length, non-overlapping windows.
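  • Hopping Window: Fixed-length windows that overlap, because each window starts a fixed hop after the previous one and the hop is shorter than the window. The overlap provides some smoothing.
  • Sliding Window: Contains all events that occur within some interval of each other, so the window boundaries depend on the event times rather than on fixed clock boundaries.
  • Session Window: No fixed duration. Groups together all events for the same user that occur closely together in time, and ends when the user has been inactive for some time. Commonly used in website analytics.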
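
A sketch of how events might be assigned to tumbling, hopping, and session windows. It assumes integer timestamps in seconds and half-open windows `[start, end)`; the function names and the gap-based session rule are illustrative choices, not taken from any particular framework.

```python
def tumbling_window(ts: int, size: int):
    """The single fixed-length window containing timestamp ts."""
    start = ts - ts % size
    return (start, start + size)

def hopping_windows(ts: int, size: int, hop: int):
    """All windows of length `size`, starting every `hop`, that contain ts."""
    last_start = ts - ts % hop
    starts = range(last_start, ts - size, -hop)   # every start > ts - size
    return [(s, s + size) for s in reversed(starts)]

def session_windows(timestamps, gap: int):
    """Group one user's timestamps; a new session starts after `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions

tumbling = tumbling_window(430, size=300)
hopping = hopping_windows(430, size=300, hop=60)
sessions = session_windows([1, 2, 3, 50, 52, 200], gap=10)
```

With a 5-minute window and a 1-minute hop, each event belongs to five overlapping windows, whereas it belongs to exactly one tumbling window.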

Stream Joins #

The continuous nature of streams can make joins challenging.

  • Stream-Stream Join (Window Join): Joins events from two different streams. A typical approach involves maintaining a state (like a hash index) to facilitate the join.

  • Stream-Table Join (Stream Enrichment): A stream is joined with a table. This often involves keeping a local copy of the table (updated using change data capture), such as a user table.

  • Table-Table Join (Materialized View Maintenance): A materialized view of a join between two tables is updated by streams. Changes in either stream can affect multiple rows from the other table in the join output.

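  • Time Dependence of Joins: If the ordering of events across two streams is undetermined, the join becomes nondeterministic: rerunning the same job on the same input may give a different result. One remedy is to treat the joined table as a slowly changing dimension, giving each version of a record a unique identifier that events refer to. This makes the join deterministic, but log compaction is no longer possible, because all versions of the records need to be retained.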
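
A stream-table join can be sketched as a processor that consumes two interleaved inputs: a changelog of the user table and a stream of activity events. The input tags (`"user_changes"`, `"activity"`) and the record fields are hypothetical.

```python
def stream_table_join(inputs):
    """Enrich activity events with the user's name from a local table copy.

    `inputs` is a sequence of (source, record) pairs in arrival order.
    """
    users = {}      # local copy of the user table, maintained from the changelog
    enriched = []
    for source, record in inputs:
        if source == "user_changes":
            users[record["user_id"]] = record["name"]
        else:
            # Join against whatever the table contains at this moment.
            enriched.append({**record, "name": users.get(record["user_id"])})
    return enriched

inputs = [
    ("user_changes", {"user_id": 1, "name": "Ann"}),
    ("activity", {"user_id": 1, "page": "/home"}),
    ("user_changes", {"user_id": 1, "name": "Anna"}),
    ("activity", {"user_id": 1, "page": "/cart"}),
]
enriched = stream_table_join(inputs)
```

The two activity events are joined with different versions of the same profile, so the result depends on how the two inputs happen to be interleaved. That is the time dependence described in the last bullet.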


Fault Tolerance #

Batch Processing: With batch processing frameworks, fault tolerance is straightforward. If a task in a MapReduce job fails, it can simply be restarted on another machine, and the output of the failed task is discarded. Records can be processed multiple times, but the visible output is as if they had been processed once, which is known as exactly-once semantics.

Stream Processing: Fault tolerance in stream processing is harder because a stream is unbounded, so you cannot wait for a task to finish before making its output visible. Solutions include breaking the stream into small blocks (micro-batching) or generating periodic checkpoints of state (checkpointing). However, once output leaves the stream processor (for example, a database write or a sent email), the framework can no longer discard it, so restarting a failed task repeats the side effect.
  • Atomicity: To give an appearance of exactly-once processing, things either need to happen atomically or not happen at all. Distributed transactions and two-phase commits can be used to maintain consistency. Systems like Google Cloud Dataflow and VoltDB implement this approach.

  • Idempotence: An operation is idempotent if it can be performed multiple times and still have the same effect as if performed once. Non-idempotent operations can be made idempotent with some additional metadata, enabling exactly-once semantics with minimal overhead.

  • State Recovery: Any stream process that requires state must ensure that this state can be recovered after a failure. This can be achieved by keeping the state in a remote datastore and replicating it, or by keeping state local to the stream processor and replicating it periodically. Different systems adopt different approaches to replicate state.
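
One way to add the metadata mentioned under idempotence is to store the offset of the last applied message alongside the state. The sketch below assumes messages come from a single partition in offset order, and that a real store would update the value and the offset atomically; the class name is hypothetical.

```python
class IdempotentCounter:
    """A counter whose updates are safe to redeliver."""

    def __init__(self):
        self.total = 0
        self.last_applied_offset = -1   # metadata stored with the value

    def apply(self, offset: int, amount: int) -> bool:
        # Incrementing is not idempotent by itself; remembering the offset of
        # the last applied message lets us skip anything seen before.
        if offset <= self.last_applied_offset:
            return False
        self.total += amount
        self.last_applied_offset = offset
        return True

counter = IdempotentCounter()
deliveries = [(0, 5), (1, 7), (1, 7), (2, 1)]   # offset 1 is redelivered
applied = [counter.apply(offset, amount) for offset, amount in deliveries]
```

The redelivered message is ignored, so the total is the same as if each message had been processed exactly once.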
