Flink Demystified | Distributed Systems Deep Dives With Ex-Google SWE

By Jordan has no life

Summary

Topics Covered

  • Flink Delivers Exactly-Once via Checkpoints
  • RocksDB Scales Flink State to Disk
  • KeyBy Enables Deterministic Stream Partitioning
  • Collocated Kafka Joins Minimize Flink Shuffles
  • Kafka Transactions Seal Exactly-Once Outputs

Full Transcript

Hello everyone and welcome back to the channel. It is 1:00 in the morning on a Thursday night. The reason I am filming is because I am going to the Indy 500 tomorrow, which as we know is one of the classiest events in all of the world in terms of sporting. So you could call me Sleepy Jordan, cousin of Sleepy Joe, who hopefully is doing okay with his health issues.

Today, after many, many years of glazing Flink, albeit talking about it at a pretty high level, I'm going to at least make an attempt at describing its API a little bit more, albeit not too low level, and give you guys an example of what it is that we're actually doing when we say we're going to use Flink in a systems design video.

So, I was going through some of the Flink docs this week, partially for work, partially because I wanted to look into it a little bit more. And I realized that when I typically cover Flink in one of these systems design interview question videos, I do so at a very high level, right? I'm like, "All right, well, here's this Kafka stream. Here's this Kafka stream. Just use Flink, join them together, blah blah blah." And I'm not going to go insanely deep in this video in terms of how everything works there, but given all of the videos that we made on Kafka consumer groups, Kafka transactions, things like that this past week, I think the groundwork has been laid to talk a little bit about how Flink can take advantage of some of these things in order to get exactly-once semantics when using Kafka as both a source and a sink. So we're going to talk about that a little bit more, plus a couple of options that you can configure when using Flink to get it to do exactly what you want it to do.

So, Flink is a stateful consumer that is fault tolerant, right? You take in data from some sort of streaming input, and if you need to perform computations that rely on keeping data within the Flink ecosystem, it ensures that even if there's a failure within Flink, you can still have that exactly-once, fault-tolerant processing if you want it. So let's imagine that we're taking in some data from Kafka, and every single minute we're going to take the last 5 minutes of that data, maybe based on the timestamps on the Kafka records themselves. Every minute, once we see those timestamps progress sufficiently that we're saying, "Oh, here's a new record from the new minute," we're going to take the average of the last 5 minutes of data and output that to an output Kafka topic. So what Flink has to do in that case is cache all of the events from the last 5 minutes, right?

This is state. It's state because we're not just taking in a Kafka event, doing some processing on it, and putting it out. This stuff actually has to live in Flink for a period of time. And if for whatever reason Flink were to go down, maybe we can process those events again, right? We can read back from Kafka again because that log is durable. But unless our committed input offset is in exactly the right place, we're probably going to output some events twice. And since we need the last 5 minutes of data anyway, we have to make sure to read back a little bit in the log to get the last 5 minutes again and reprocess those only once.
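
To make the "this is state" point concrete, here is a minimal sketch in plain Python (not the Flink API) of a 5-minute window that emits an average each time event time crosses a minute boundary. The class name `SlidingAverage`, its method, and the assumption that records arrive in timestamp order are all made up for illustration.

```python
from collections import deque

WINDOW_SECONDS = 5 * 60   # keep the last 5 minutes of events
SLIDE_SECONDS = 60        # emit one average per minute of event time


class SlidingAverage:
    """Hypothetical stand-in for a windowed operator; not a Flink class."""

    def __init__(self):
        self.events = deque()    # cached (timestamp, value) pairs: this is the state
        self.next_emit = None    # event-time boundary that triggers the next output

    def on_event(self, timestamp, value):
        """Process one in-order record; return a list of (boundary, average)."""
        outputs = []
        if self.next_emit is None:
            # First boundary is the end of the minute containing the first event.
            self.next_emit = (timestamp // SLIDE_SECONDS + 1) * SLIDE_SECONDS
        # A record from a later minute means earlier windows can be closed.
        while timestamp >= self.next_emit:
            boundary = self.next_emit
            # Drop events older than the window that ends at this boundary.
            while self.events and self.events[0][0] < boundary - WINDOW_SECONDS:
                self.events.popleft()
            in_window = [v for t, v in self.events if t < boundary]
            if in_window:
                outputs.append((boundary, sum(in_window) / len(in_window)))
            self.next_emit += SLIDE_SECONDS
        self.events.append((timestamp, value))
        return outputs
```

Everything in `self.events` would be lost if the process died, which is exactly the state that a checkpoint has to capture.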

So the point is, it's not trivial to get exactly-once semantics in a stream-based processing setup, because things can fail, and if they do, any of the state that you cache is prone to getting lost, and that's going to be problematic. So what Flink does is take the occasional checkpoint, right? We've spoken about this ad nauseam on this channel, but the idea is that it's occasionally checkpointing this internal state over here to S3, as well as things like your input offsets and what you've written to your Kafka output. What that allows us to do is ensure that we're processing messages exactly once. I'm not going to touch upon the checkpointing algorithm too much in this video because I've done it in multiple other videos, and I care a lot more here about how you might use Flink as a high-level user.

So, a couple things when it comes to Flink. One question that I get pretty often in systems design videos is, "Hey Jordan, you're really suggesting caching a lot of state in Flink. Can it actually handle all that?" And the answer, at a high level, is yes. State can be stored as a key-value store in memory, basically just using some sort of hashmap. But if you need a little bit more space and you're willing to sacrifice some performance to get it, you can use something known as RocksDB. You can think of RocksDB as basically an LSM tree plus SSTables, meaning that you're effectively using the hard drive to build a key-value store. That allows us to get a lot more storage on an individual Flink node, and on top of that, Flink jobs can be parallelized across many different virtual machines or physical machines within a cluster.

The unit of computation that is run within a Flink job is known as a task. Across the machines that we're parallelizing these jobs on, you can have multiple task slots, and tasks get assigned to those task slots. Typically every single Flink job is internally represented as a DAG, or directed acyclic graph. And like a lot of the other batch or stream processing technologies that we've seen on this channel, the operators within the DAG are broken up so that operators that can be chained together are considered part of one task. So look at this graph down here. If I have a map and then I have a filter, I know that I don't need any shuffle phase between those. I'm taking the records, transforming them in my map, and then filtering some of them out in my filter step. That being said, if some operator like a group by comes along, so maybe group by a certain key value, we're potentially going to have to do a shuffle, because the records in this operator instance over here might be different from the ones down here, and we have to shuffle in order to get everything with the same key together.
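
As a rough sketch of the chaining idea (plain Python, not how Flink's planner is actually implemented), the function below walks a linear pipeline and starts a new task whenever it hits an operator that needs a shuffle. The operator labels and the `SHUFFLE_OPERATORS` set are hypothetical names chosen for this example.

```python
# Hypothetical labels for operators that force records to be repartitioned.
SHUFFLE_OPERATORS = {"group_by", "key_by"}


def chain_operators(operators):
    """Group a linear pipeline into tasks, cutting the chain at each shuffle."""
    tasks, current = [], []
    for op in operators:
        if op in SHUFFLE_OPERATORS:
            # Everything before the shuffle forms one chained task.
            if current:
                tasks.append(current)
                current = []
        else:
            # Operators such as map and filter can run back to back in one task.
            current.append(op)
    if current:
        tasks.append(current)
    return tasks


pipeline = ["source", "map", "filter", "group_by", "sum", "sink"]
tasks = chain_operators(pipeline)
```

Here `map` and `filter` end up in the same task as the source, while `sum` and `sink` run in a second task on the far side of the shuffle.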

So this is how we make sure we're parallelizing our work sufficiently within Flink, and again, you can do that over multiple different machines. Because you can put data on disk and also parallelize it over multiple computers, Flink jobs can basically scale as much as they need to.

Okay, let's go ahead and talk about how tasks communicate with one another, right? Because we mentioned that you've got these chained-together operators right here, where it's trivial to communicate with one another, but then you've also got this shuffle phase stuff right here. So what's the mechanism through which that happens? Well, there are a few different types of streams in Flink, as in Flink-internal streams between operators. For starters, you can have this concept of a global stream, which is just one partition. Can't really scale it out.

There is this concept of a non-partitioned stream. Every single task in Flink is going to be taking in some input data, but sometimes that input data could be kind of arbitrary, right? It could just be coming from a random partition of Kafka, or a few different partitions. And it might not be because you said these particular records belong to this particular task.

It just happened to get those records. The main thing to note there is that it's nondeterministic, right? If we were to add a bunch more tasks to our Flink job, all of a sudden which records land on which task could change a lot. By contrast, you can also do deterministic partitioning within Flink, and that is what's called a keyed stream. If you have an input source like Kafka, you can use this operator called keyBy and pass in a function that extracts a particular key from every single record. Based on that key, you create a bunch of different partitions of your input stream, so that you're processing every single partition individually. That gives you one logical partition per unique key, and depending on how the input data is initially coming in, it may invoke a shuffle phase. If every task is already receiving only the records for its own set of keys, then you're fine and you don't need to shuffle. But if any key can show up in any task from the original input and then you do a keyBy, you might need to do a shuffle.

And then there's also this concept of a broadcast stream. If I have one stream and I want to send it to a bunch of other operators, maybe because the output of that stream is particularly small and it's easier if every operator knows about all the entries in that stream, you can broadcast it. The concept is very similar to a broadcast hash join, right? Where you're joining a really big data set with a really small one. So you broadcast the really small one to all partitions of the really big one, because it's so small that it makes the join much more efficient.
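
The broadcast hash join idea can be sketched in a few lines of plain Python (a toy model, not Flink's broadcast state API). The function name and the sample data are invented for illustration: each partition of the big data set gets its own full copy of the small one, so no record of the big data set has to move.

```python
def broadcast_join(big_partitions, small_stream):
    """Join each partition of the large data set against a full copy of the small one."""
    results = []
    for partition in big_partitions:
        # Every task builds its own complete lookup table from the small stream.
        lookup = dict(small_stream)
        for key, value in partition:
            if key in lookup:
                results.append((key, value, lookup[key]))
    return results


# Hypothetical data: two partitions of orders joined with a tiny currency table.
orders = [[("us", 1), ("fr", 2)], [("us", 3), ("de", 4)]]
currencies = [("us", "USD"), ("fr", "EUR")]
joined = broadcast_join(orders, currencies)
```

The cost is one copy of the small side per partition, which is only a good trade when that side really is small.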

So it's the same idea right here. If you're joining two different streams together and one of them has very few elements, you can broadcast the stream with very few elements all the way over to all partitions of the more populous stream.

Okay. So now we're going to talk about Flink and Kafka's interactions a little bit, because that's where I tend to focus in the systems design videos that I create.

We often take in Flink source data from Kafka because Kafka is replayable, right? That means if a Flink node goes down and needs to replay some messages to rebuild state, it can do so. That's in contrast to certain memory-based queues where you acknowledge a message and it gets flushed out of memory. So Kafka is kind of an ideal data source for Flink, and it's probably what a lot of people are using the majority of the time. Now, one important thing to note about Kafka is how a topic gets partitioned. You assign every single message a key, and you assign the topic a number of partitions. Then, for the most part, to determine which partition a message belongs in, you take the hash of the key modulo the number of partitions. That's how we determine which partition a Kafka record belongs to.

Okay. So what we're going to do is come up with a little example here, and hopefully it's going to make a little bit more sense how Flink and Kafka interact with one another.

What Flink is going to do in this case is take in input from two Kafka topics, which themselves are partitioned. Flink itself will also be parallelized, or partitioned, and then it's going to output the results of joining those two topics on a common key. We also want to make sure that we're getting exactly-once semantics, so that every single joined record is only produced once, and that we have sufficient space to actually cache all the data that we need in order to perform this join. I'll explain that more right now.

Let's imagine that we've got our Kafka partitions on the left here. We've got two topics, topic one and topic two, and each one has two partitions. Now, because of the way Kafka does its partitioning internally, keep in mind that if these two topics are partitioned the same way, so they have the same number of partitions and they're using the join key as their message key, then any record in topic one that can be joined with a record in topic two must be in the partition with the same number. So I know that it's not possible for any records in this guy to be joined with any records in this guy, because of how we're using the join key and setting things up properly in Kafka. This is the idea behind the collocated join that we discussed a little bit in the consumer groups video.

Now, in addition, we've got two Flink tasks. We can configure this ourselves by setting the parallelism within Flink. It's worth noting that in the actual Flink Kafka source code itself, it's not super easy to do a partition assignment like this, where you explicitly do some sort of range partition assignment. You may have to write a little bit of extra code yourself to make that happen. The point is, if we've got Flink task one and task two and we want maximum collocation within these joins so that we're shuffling a minimum number of records, we want task one reading these guys over here and task two reading these guys over here, because we know that records in partition one are only going to be joining with records in partition one, and same thing for partition two.
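
The assignment being described could look something like the sketch below. This is not the Flink Kafka connector's own assignment logic (as noted above, getting this layout may take custom code); `assign_partitions` and the topic names are hypothetical, and the rule is simply "partition i of every topic goes to task i modulo the parallelism".

```python
def assign_partitions(topics, num_partitions, parallelism):
    """Give each task the same-numbered partitions of every topic."""
    assignment = {task: [] for task in range(parallelism)}
    for topic in topics:
        for partition in range(num_partitions):
            # Same partition number always lands on the same task, for every topic.
            assignment[partition % parallelism].append((topic, partition))
    return assignment


assignment = assign_partitions(["topic1", "topic2"], num_partitions=2, parallelism=2)
```

With this layout, each task holds both sides of every join it could ever need to perform, so no joined record has to cross the network.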

Okay, so once we're reading those into Flink, we want to keep some cached state for both of those topics, right? What we want is a map from the join key to the value for both topics. The reason we need that is, let's say one record comes in from this side. We're now going to read the hashmap for the other side and find all records whose key matches the key of this incoming value.
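
Here is a minimal sketch of that two-map join in plain Python (not Flink's keyed state or connected-streams API). The class `StreamJoin` and its method names are invented for the example, and the maps grow without bound because no expiry is modeled.

```python
from collections import defaultdict


class StreamJoin:
    """Toy two-sided join: cache both inputs by key, probe the opposite side."""

    def __init__(self):
        self.left = defaultdict(list)    # join key -> values seen on topic one
        self.right = defaultdict(list)   # join key -> values seen on topic two

    def on_left(self, key, value):
        """Cache a topic-one record and join it with cached topic-two records."""
        self.left[key].append(value)
        return [(key, value, other) for other in self.right.get(key, [])]

    def on_right(self, key, value):
        """Cache a topic-two record and join it with cached topic-one records."""
        self.right[key].append(value)
        return [(key, other, value) for other in self.left.get(key, [])]


join = StreamJoin()
first = join.on_left("a", 1)       # nothing on the right side yet
second = join.on_right("a", "x")   # matches the cached left record
third = join.on_left("a", 2)       # matches the cached right record
```

Because each record is cached before the opposite map is probed, every matching pair is emitted exactly once regardless of which side arrives first.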

If we find any of them, we'll then spit the joined results out over here to this output Kafka topic. All pretty simple. Now note that because we're caching all of these things from these incoming partitions, this can get kind of expensive, right? It's a lot of things to potentially store. In theory, you have a couple of options. What you can do is take these maps and gradually expire entries out of them over time if you need to reclaim space in memory. Or, if we want more space than that, we can instead store them in RocksDB and use the actual hard drive on each Flink node to make sure we have enough space. For example, this is a pretty common pattern that I use in the Facebook or Twitter following problem, right? You've got a post coming in that gets sent to Flink, and based on the author of that post we're trying to join it with all the people who are interested in reading it. Because that's going to be a lot of storage, it's probably better to use something like RocksDB there, especially because it's not particularly latency sensitive.

Okay. Now the other part of this pattern that's pretty important is the exactly-once semantics part. We know that Flink is doing checkpointing on some sort of interval, maybe every 30 seconds, maybe every 10 minutes. It doesn't really matter. There are trade-offs there. If you're checkpointing super often, your performance isn't going to be great. If you're checkpointing infrequently, your performance is going to be better, but if there's a failure, it's going to take longer for you to start back up. So this Flink task is doing checkpointing, maybe to S3 over here. And when it does that, it's checkpointing the following things: the offsets of all the input topics, the offsets of all the output topics that we're writing to, and also the internal state that we have, right? We need to be able to serialize that as a byte array on disk. The reason we do this is so that if we were to fail, we can say, "Okay, I know that whatever this state is right here, this topic map state, corresponds to these two offsets right here for the input topics." Then we can start reading again from those input topic offsets.

The reason that we want an output topic offset is because we're probably going to be using something like Kafka transactions. We want Kafka transactions because we want to constantly be writing records to Kafka, but we don't want anyone to actually see them until we commit those records. Having the offset of your output Kafka topic in the checkpoint is good because what might happen is that you manage to checkpoint but then fail before you actually commit the records to Kafka. So when you start up from this checkpoint again, you'll see this offset right here, and then you'll go back to Kafka and commit the records up to and including that offset. The nice thing about Kafka transactions is that you can write a bunch of records, but no one downstream is able to see them until you actually commit them. And you only commit them once you've checkpointed state. This means that any downstream consumers like this guy over here are not able to read the uncommitted records, as long as they're configured properly in read_committed isolation mode. If that's the case, then we do in fact get exactly-once semantics, which is really, really awesome.

Well guys, I know I've spent a lot of time talking about Flink, but when I typically do, I feel like it's often covering the algorithm that they use to do checkpointing, with a bunch of barrier entries and all these input queues and going through it. And while I think that's useful to understand and it helps everyone out, ultimately what we really care about is the programming API. A lot of times you're probably not going to remember exactly how they do checkpointing, but the truth is that doesn't really matter. What's more important for the systems design interview is understanding how Flink actually gets used within the context of an application, and probably a few different settings or configurations that matter when it comes to actually using it.

Hope you guys have a great day. Okay, I'm going to go ahead and PTFO and I will see you in the next
