Make your own event-sourced agent harness using stream processors — Jonas Templestein, Iterate
By AI Engineer
Summary
Topics Covered
- Everything That Happens Becomes an Event
- Every Agent Gets a URL
- Stream Processor: The One Abstraction
- Code Is an Event
- No Before Hooks: Proceed and Adapt
Full Transcript
Welcome to to the workshop. Uh I am Jonas. Um and this is Misha. We both
Jonas. Um and this is Misha. We both
work at it.
Hi.
And the um this is going to be uh chaos.
I think pure chaos first of all because we only we only decided uh last Monday to do this and then there was like um bunch of personal stuff and halftterm and the kids were sick and then this
morning we came here and I thought this was going to be like a like hour and 20 hackathon or something like that and then we came here and we're told you're going to do it without an audience because of some fire thing. Uh and now there's an audience again which is
amazing. So, we're going to do uh gonna
amazing. So, we're going to do uh gonna try and do this as like a improvised hackathon, but I've only just pushed the SDK literally one minute ago, and Misha is going to try and sort of like live
fix it. But I I hope it'll still be
fix it. But I I hope it'll still be worth your time because I think there's sort of some uh hopefully at least some interesting ideas or or maybe like one way to put it is I would like to find out whether this is like dumb or cool.
Uh and and the um and you can help me with that. So please also just like ask
with that. So please also just like ask clarifying questions because part of this the reason we said last week okay let's do this workshop is because these have been ideas that have been kind of noodling around over the last year but
we haven't really actually done anything with it's not really like a commercial value to it necessarily but I think um it's it's basically how I would like to build agent harnesses. So, you know,
I've been noodling with this uh uh for a while now and and you know, obviously you've got like Claude and you've got Pi now, but back in the day you didn't. And
so, you make your own coding harness and there's all these different ways to do it. And I think um the way I would like
it. And I think um the way I would like to do it is I would like to do it purely event sourced. Um and I say here aka
event sourced. Um and I say here aka debugable and that is because everything kind of skirts around of being event source like all these things without quite being event sourced. There's
always something that has a side effect that you later can't tell and then you can only see it in the hotel traces or something which doesn't really make sense to me because normally these these these agents the way they work and we'll see that in a second is they already
have a log of events, right? So you so if you just say everything that could possibly happen is in there. It'll be
really easy to debug. Um and as we'll see hopefully a little bit strange but potentially in other ways also really easy to to extend. Um, and then the second thing is I want I want this to be extensible and and we've seen this I
think with PI which is was just incredible um how valuable it is to make an agent extensible not just by humans but also extensible by itself. Um and um
one thing that I want in the extensibility is is I want I want it to be composable. like I don't think we'll
be composable. like I don't think we'll get there within one hour with all of you but but maybe afterwards if you say oh this is actually cool and not dumb we'll keep working on this and you can say oh I made this extension you made
that extension and you can combine them and um um because I think we don't really yet know what is the best recipe for for for agents basically for agent harnesses and it's been too hard to
experiment even in pi like everything has to run in a single thread in in in in like one process over here on this particular computer and so on and so forth so I think I think we we can maybe do a little bit better than that. Uh
then I think it should be on the edge publicly rootable. Um
publicly rootable. Um like the way I think of it is basically an intelligent entity like a digital one that's not a robot in the future. It's
just internet connected uh basically server program, right? Like it just speaks HTTP. So the slice broadly
speaks HTTP. So the slice broadly speaking that should be sufficient for almost anything. And then I would just
almost anything. And then I would just put these agents on the edge speaking to the internet straight away. H this is not very safe. None of what we're doing today is safe by the way. There's no
authentication you'll see. you'll all be able to see everybody else's event streams and agents and and whatever else. You should not put any secrets in
else. You should not put any secrets in the actual payloads of the events or rotate them afterwards, right? So, we
can um you know those things can be solved, but broadly speaking, the moment an agent exists, it should have a URL.
That's just like uh that's just my opinion because otherwise like you end up making all these things and then you tie yourself backwards inventing a connector concept just so you can get Slack messages in or or god knows what.
And that I think that should be really easy. um or like maybe like a human
easy. um or like maybe like a human should fill in a web form and the result of the web form should be input for the agent again or or something like that.
That's kind of like the same thing as as a Slack web hook. And then it should be distributed. And this is potentially uh
distributed. And this is potentially uh like a very double-edged sword as we'll see like right but but by distributed I mean you should be able to have uh an agent um running uh sort of on one
computer here on a server and my my plug-in running over here and your plug-in running over there and yours is written in Rust and mine is written in in in Typescript and it all just doesn't really matter. Um and um the reason that
really matter. Um and um the reason that is bad is because you will eventually get race conditions and loops, right?
Like sort of like two plugins kind of sending messages back and forth creating like an endless stream of events in your agent. Um but on the flip side, uh you
agent. Um but on the flip side, uh you have those issues normally anyway in in in most agent harnesses I've seen. So
it's quite good if you if you can kind of preempt that a little bit. Okay. So
let's um let's talk about this uh thing that we have invented for um oh my gosh um why is this this isn't the same as in
my file system. So I'll use the one in my file system. Um basically uh we have created this this uh service over last week called events.iterate.com.
It has a web UI um and uh we can um uh we you can use that. I think you'll certainly use it, but for now we'll just play with it with curl because I think it's it makes it in some sense more more
uh tractable. Um in the in the agents um
uh tractable. Um in the in the agents um in the events uh app we have this concept of agent paths. So just like a file in a file system every agent has a
path as a hierarchy and you just have here show this here you just have raw events. This is YAML formatted raw
events. This is YAML formatted raw events in each of these paths. And that
is basically the the the entire uh the entire primitive that we're going to be building our agent on. And so we can start by playing with this. We can say, okay, we we have a path prefix. If you
copy this from the repo that Misha's going to put it in the read me in a second.
I'll I'll add workshop. MD as well.
Yeah. Uh yeah. Okay. Uh oh yeah, maybe that's why. Um
that's why. Um I I added the readme but not the workshop.
Yeah. So uh basically um let's set ourselves a path prefix here. In my
case, that is just going to be uh Jonas Templestein or something. Um they start with forward slash by the way, which is a little bit um awkward potentially. Uh
but the but the service is very tolerant of you URL encoding your slashes or doing whatever you want. But I want I I wanted good curl economics. Uh but I also wanted them to start with a forward slash so you have like a well-
definfined route. Then for the base,
definfined route. Then for the base, it is now pushed to the repo. But have
you have you shared the repo?
Oh yeah, because now you could actually do this on your computer. Uh let's do that actually. If you go to um
that actually. If you go to um github.comiterate
github.comiterate there is an an ai-engineer-workshop um uh and in there you should be able to if
you click on this workshop.nd file
that's what we're now going through. So
you should be able to copy and paste the curls from there and again there's no authentication so you can just do this in uh in in fact why why don't a couple are hitting our live deployments.
Okay. So,
uh like this is entirely just sort of like a proof of concept of um uh how I think this should work. Um
we're basically what we're going to do is we're going to turn this simple event stream into a fullyfledged agent without like really deploying any server
software uh or or anything like that. Um
maybe uh yeah, I should be able to see if anybody if anybody has managed to do it. I'll wait until two or three people
it. I'll wait until two or three people have managed to successfully curl something. Um, and then I'll carry on
something. Um, and then I'll carry on because I think it'll be it'll be good if you if you if you curl uh it yourself. So, it's here in this AI
yourself. So, it's here in this AI engineer workshop workshop.mmd.
If two people have the same name, that's going to cause username, so it doesn't matter. Um, that's fine.
Uh, okay. There we go. Got a few people. Um,
okay. There we go. Got a few people. Um,
okay. So, that's pretty good. Um,
by the way, uh, you might see this here and there. There's also this thing down
and there. There's also this thing down here called project slug. If you really start stepping on each other's toes, you can just type what like use a different project slack and it's like a completely
new database, a new namespace. Um, you
can pass that in as an HTTP header. If
anybody needs that later, you can you can ask me. Uh, okay. So now we have a few people curling things. I will also for start curling things. So, uh, now I'm just going to say hello world, uh,
here to my stream and it'll come back and and and we straight away we notice like a few things. There's a basic event envelope
things. There's a basic event envelope shape. We have a type that I've just
shape. We have a type that I've just invented here. Events don't need to have
invented here. Events don't need to have anything but a type, but most of them also have a payload. Uh, then um the uh there is the stream path which came from the URL that we posted this to. And
there's an offset. This is just an auto incrementing integer that starts at one.
Uh and so there's basically like a point of serialization at the server that just increments them and you get a created created ad. Uh and then um here what we
created ad. Uh and then um here what we can do with curl which is quite nice. Uh
if you do dashn you can do uh streaming uh streaming SSE. So this here is literally nothing magical. This is
literally just you know Jonas templehello world. This is literally the curl that
world. This is literally the curl that we're doing right here. Um
and um there I can see the messages coming through. Very nice. And I can um I can
through. Very nice. And I can um I can add another message. And now um basically uh all all the messages will show up there. And there's also a um uh
oh, if you do question mark live equals true. This is inspired by the durable
true. This is inspired by the durable stream standard, but like ever so slightly different. But if you do live
slightly different. But if you do live equals true, the connection will stay open and it'll just keep sending you more events as you as you put them through. So this is sort of already you
through. So this is sort of already you know um like you could say this is an AI agent if Misha was simultaneously uh giving responses to my hello world uh this would already be an agent and then
there's my my AI has told me you can um if you're going to be doing that a lot uh you can pipe this uh through set get just the data elements and then pipe it through jcq dot and then you get a
little bit nicer formatting. So this is sort of like uh this is so trivial that you I actually do spend a lot of time in curl just curling it. Uh then what can you do? Um the thing is very tolerant
you do? Um the thing is very tolerant just in general of of stuff you post to it and this will become relevant if you wanted to say send web hooks from random third parties there. Like my theory is always that like the LLMs are good enough. They can basically just consume
enough. They can basically just consume raw web hooks from anywhere and and they'll roughly do the right thing. So
what the what specifically the server will do is if you send it something that doesn't um validate like hogwash here it says there's there's no type property.
So there's not actually a valid event.
Then you can see here what it has appended is basically an error. But it
because we're event sourcing the error is also just an event and it just says here okay um you have an invalid event appended um event. Oh, another thing uh
that we we will notice is that the type here uh that our server has created it starts with https uh events.iterate.com.
You don't have to do that. Uh but that is something that we like to do because that literally leads to the documentation for that event type. And
if it's going to be an opaque string, like why not make it make it something uh like that, but uh obviously you can you can just do you know single words or two words or whatever. Um but you will
see these URLs and they're just they're just types. Um then uh what else is
just types. Um then uh what else is interesting that we can do? I mean this is kind of boring but you know again if you're going to do web hooks you have can put an item potency key in which just means if you have the same item
potency key twice you don't get um you don't get the event appended twice. You
can pause a stream. I think in the interest of time we can skip that. You
can resume a stream. The reason the pausing is important and is we put this in even though we literally just made this a couple days ago is because of the infinite loops. like it's very very easy
infinite loops. like it's very very easy to accidentally put like you know thousands and thousands of events in a in an infinite loop in the thing and so if you put more than 100 events in a second or something it is just going to
circuit break and then the stream will be paused until somebody unpauses it. So
basically if you try to um here actually let's do that because again like for those of you who haven't seen event sourcing this might be a bit weird like how do you pause a stream? Well you
pause a stream by uh oops this is the wrong one. You pause a stream by
wrong one. You pause a stream by appending an event that uh says you're you're paused. The the only external
you're paused. The the only external interface to the agent is oh um now it's complaining. Why is it complaining?
complaining. Why is it complaining?
Ah because I think there needs to be a reason or something maybe.
Uh yeah, reason demo. Yeah, there you go. So now
reason demo. Yeah, there you go. So now
this thing is paused. So now if I um you know try to send it the hello world event from earlier, it's just going to barf and it say no no no more events allowed. And that is one like that is
allowed. And that is one like that is one thing that doesn't appear as an error in the stream because otherwise you would still get infinite numbers of error events. Um
error events. Um uh then I mean there's other weird and wonderful things you can do. I I don't even know if we really have to do this but uh there's like um I have made a
plug-in for this already uh that makes it so that if I put this JSON transformer configured event in what it'll do is it'll just observe certain events, rewrite them a little bit and make a new event out of it. But I think
that's a bit advanced so we'll we'll do it later. Um um it can also do weird
it later. Um um it can also do weird things like it can uh you can schedule you can schedule things like some agent SDKs you'll you'll know can do this but uh here you know if you want like an
like a heartbeat event every five seconds. Uh oh god now I need to resume
seconds. Uh oh god now I need to resume the stream because it's paused. Uh let's
see. stream resumed.
Okay, so now I am just creating uh this heartbeat event every 5 seconds which can be quite handy or you can schedule something for 10 minutes in the future or at a certain time and again it's just
an event that says do this thing in the future. Um
future. Um um I think I will skip over these but um you can cancel the schedule. You can
tell the stream by appending an event to send your server or some thirdparty server or your you know your Slack API endpoint or whatever an event every time an event gets added. Uh so you can
basically uh subscribe to the stream. Uh
here we're subscribing in our in our terminal. We're doing a pull
terminal. We're doing a pull subscription, right? We're doing we're
subscription, right? We're doing we're connecting to uh an HTTP server and we're getting we're pulling uh pulling uh we're the HTTP client and they're the HTTP server and we're getting an SSE response stream of of streaming events.
But uh it can also do the other way around where it will push you the stuff that gets appended or push you a filtered stream. Um uh which can be
filtered stream. Um uh which can be quite handy. Okay. So um let's maybe
quite handy. Okay. So um let's maybe stop there because I know also um it wasn't really clear from the session description how deeply technical this
was and so let's take any questions on what is even this thing that we're going to be using for for the next 45 but questions have to be submitted as an event to the questions stream
yeah exactly you have any trouble using that um does I assume lots of things don't make sense So please do ask them because I
think this will be extremely like random and strange so far.
What does it do?
Oh uh we we're like a we're like a hacker hobby club at this point to be quite honest like uh we we are uh we have not got a launch product. Um
but if we did it would be um I think based on this architecture. It's like uh anyway, I'll tell I can talk about it afterwards.
Okay. Okay. Okay.
But it's uh Yeah, it's more it's more homebrew hacker club than than like commercial enterprise at the minute, but it'll get there.
Yeah.
Yeah.
Basic questions that are on the market right now. Aren't
What's the architecture?
It's just um I think it's just the API like if you um have you tried to make uh you know like a like an open code extension or plug-in
or PI extension or claude claude extension and they all have like an API surface that is a little bit bigger than what you would have or what we're going to have in in a minute. It's not like by much, right? like okay like maybe the
much, right? like okay like maybe the streaming chunks for partial completions are a different kind of thing for some reason rather than just an event of type I've received you know like five letters of a streaming response to me that's a
perfectly valid event and and and the benefit then is that you can uh that can you I'm kind of like a one abstraction kind of guy right like this this should be like a like web standardsbased this should be there should only be one thing
which is an event and then as you'll see like the the thing um that was at the top of this file I think you can build a super expressive state of the art coding agent by just implementing uh like a
stream processor that reduces over the event stream and occasionally an X side effects. That's that's all it does,
effects. That's that's all it does, right? Like it just sort of sits there
right? Like it just sort of sits there and says, "Okay, I I'll show you that in a second." Like it's it's a bit weird.
a second." Like it's it's a bit weird.
Um I would say a weird way of programming.
Um so if I understand correctly, you're sort of saying take events that you get generated from an AI models and then you're basically going to pipe them through this stream processor.
Is that the idea?
The stream processor is or like the events iterator app is the storage like you can almost think of it as a database like one one um you know do you know convex is like a sort of you know they they kind of give you the illusion that
they're running Typescript in the database. You you could almost think of
database. You you could almost think of this this could be like a programmable stream or something, right? Because
actually one thing that you can also do, we'll we'll see later once you hack on your stream processors, you can also make an event that contains the source code of your stream processor and then it'll just run on every event. And and
so like it's basically it is like it is like the I think you can experiment with a huge array of different kinds of agent harness implementations
uh that can respond to real events on the internet and make real API requests without deploying your service or something like that. All you need is this sort of um streaming streaming
service that um uh has some of some of these um things like specifically it needs to be able to wake up you know like other other APIs and programs and
so on. I don't know if that Yeah. Um
so on. I don't know if that Yeah. Um
that does that answer your question at all or Yeah, I think so.
Okay. Shall we try and make uh now a um like a like a slightly more serious version of these curls? Um so does
anybody here not know Typescript?
I think so. I think the like one of the arguments in favor of this architecture that I also put in that um in that uh in the in the blurb I think is this is in principle polyglot right and there is a
uh like in any language there is actually at events.iterate.comidoccks
iterate.com APIdocs you have just I mean this is an exceedingly simple API right um you can you can make yourself there's a open API spec you can make yourself a client and
so on but for the purpose of this uh this um uh workshop here we'll just do a TypeScript version and we've shipped a client in a in a npm package already but I I I just wanted to point out like even though we're doing everything in
Typescript the whole point is that we wouldn't have to right but we just have to do one thing uh for this workshop Um, okay. So, let's uh let's do do something
okay. So, let's uh let's do do something pretty simple. Um,
pretty simple. Um, uh, is the AI engineer workshop um, package uh, now up to date and working?
I think it is, right?
I think so. Yeah. Uh, you might need to pull your repo because there was a client side type error in the iterate repo.
Uh, no, the in the AI engineer workshop.
Okay, that's fine.
It's in.ts.
Yeah, but uh, um, oh, maybe I will actually use that then. and then I'll be able to experience uh the same the same bugs if there are any. Yeah, that's
probably better. Okay. Um Okay. So, uh I think um if you wanted to just have a quick start, you can uh you can just pull uh clone the AI engineer workshop repo. Um there's already a teeny tiny
repo. Um there's already a teeny tiny example file in there and then we'll just add a couple more um as we go along.
And the workshop MD has runnable curl as well.
Nice. Yeah.
Um, if you wanted, you could put a couple like a couple of the follow on examples that are in the iterate repo that have actual processors. Um,
actual processors. Um, yeah, I can take I don't know what's going on here.
There we go.
My computer seems to have uh slowed to a crawl trying to open a cursor window.
This has been unfortunately like a somewhat regular occurrence recently.
Uh okay.
Okay.
Or Oh, no. I think this might be what you mentioned about the internet. I
think it's just struggling because it can't open a terminal because the my my terminal tries to connect with the internet.
Oh well then let's do not the internet, is it?
There we go. Okay.
Okay. So,
we have here um we have packaged up an SDK here to this AI engineer workshop um package that you should be able to just pmpp install or you just use uh use this
example here. And then basically uh what
example here. And then basically uh what what did we need in our curl, right?
Like we needed a base URL which is just events.iterate.com. That's where our
events.iterate.com. That's where our server is. uh we can use a path prefix.
server is. uh we can use a path prefix.
Um and um then we can uh run this function here called create events client that is from our SDK. And then
this is under the hood. This is
really don't know what's going on with this terminal. It's very strange.
this terminal. It's very strange.
Okay. Maybe um everything is just incredibly slow.
You want to try mine?
Um maybe, but there's nothing actually hogging the CPU. It's just I I do think it might be actually the network, but we'll see. Um
we'll see. Um so here so here you have um I don't think this has picked up the anyway. Okay. So this is basically what
anyway. Okay. So this is basically what we were doing just now right like we we say okay we're making ourselves a client uh and then um here there is a function called stream. This is basically the um
called stream. This is basically the um the the the right hand side of the of the terminal that we had earlier. And
then you say live true and give in the stream path. And we can say okay if if
stream path. And we can say okay if if uh if the event is not of type ping we will do nothing. But if the event is of type ping then we will I mean this is a completely unnecessary helper down
there. Uh then we can just do client uh
there. Uh then we can just do client uh append. I don't understand why the um
append. I don't understand why the um the the TypeScript language server isn't picking up any of the types for me. But
I can't tell if that's just cursor being weird. Again
weird. Again your own example.
Anyway um could quickly switch IDs.
Yeah, it's fine.
It is working for me. Yeah.
Okay. Yeah, I think this is unfortunately like a sort of cursor.
Okay. And then you should be able to uh just if you have a modern version of node, you should be able to just run um or maybe not. Did I make it?
It says invalid TypeScript syntax.
You know what? I'm just gonna I'm just going to restart cursor. I think that is worth it.
Is anyone uh able to run this example TS on their computer? There you go. I am.
Okay.
Um and are you able to send a ping to it and get a pong out of it? Okay. Very
cool.
Okay.
I Yeah, I wonder if we use use I don't know if you can see this, but basically all my commands they have like a several second delay, which is Yeah, it's looking okay to me. if you
want to just swap laptops. Um
maybe Yeah, no, I know. But it's just Yeah, it's just my editor doesn't work. And I
also have several non-responsive uh cursor sort of to I feel like this is a unfortunately a pattern with cursor recently.
I if you want to swap laptops it does seem to be working okay from online.
I would just give everybody 60 more seconds to just get get the basic demo working and then just let me know.
Yeah because I think using some you this is still almost an hour to go. It's not
good to use somebody else's computer.
Maybe I will continue on yours and I'll just restart my computer. I think that's a good idea because we can I also have examples 01 and 02.
Oh, really? Oh, brilliant. I
think working with something.
What's your keyboard layout?
Uh, probably the same as yours.
One can one can one can hope. Uh, okay.
Entire screen looks the same.
Uh, looks pretty good so far.
Um, what if you got a terminal?
Uh so node examples 01 hello world.
Okay, perfect. Yeah. Oh, this works flawlessly. Uh so um
flawlessly. Uh so um has everybody got that more or less working? Because then we can just make
working? Because then we can just make an open AI coding agent now. Um I think that's all we need.
Let's maybe I'll ask um do you have a speech control in here?
Uh yeah, it's control and option. It's
kits one.
Yeah. And then
that's it. It's recording.
Hello. Oh, cool. See that red bar?
Um, hey, I would like to make a third example uh called agent processor and for that we need to install uh just the
normal OpenAI SDK. Please uh do that and just make me a new file that looks like the hello world processor.
Okay. Um
Okay. So,
um I just got to gather everybody again after I left you for about uh 10 minutes here. This is a normal uh pretty normal
here. This is a normal uh pretty normal spaghetti code TypeScript program, right? Like basically what we've done
right? Like basically what we've done here is we we sort of like doing some boilerplate work. Um here we want to
boilerplate work. Um here we want to subscribe, we we have to make a subscription. Um, oh my gosh, this
subscription. Um, oh my gosh, this example that you put in there has like an a board signal and and handles all kinds of things. And and um anyway, the the um I think um uh the way I think
about what we're really doing is we're we're writing a stream processor. And if
you've never done stream processing, uh this this might be like a bit weird and people use slightly different names for for these things, but really um what we're saying is um there uh exists uh some kind of world state that we're
interesting in that can be derived from the events. Uh in this in this example
the events. Uh in this in this example here that Misha made uh we are uh trying to count uh the number of times in hello world event has been encountered in the stream. And then you can write the
stream. And then you can write the entire logic of your program as uh effectively a reduce function. This is
this is synchronous and it just um it just uh looks at all the new events as they come in and then it updates the state. That's all it does. So it takes a
state. That's all it does. So it takes a state and event and it uh returns the new state. Or you can just also return
new state. Or you can just also return nothing. It'll it'll be fine. It'll just
nothing. It'll it'll be fine. It'll just
ignore it. And um and then uh you can you should not do side effects in this reduce function. Um, and the reason I've
reduce function. Um, and the reason I've split the reduce function from from after append where you should do your side effects is because uh if you think about what happens if if your program uh goes to sleep, right? Like you're uh
you're closing your laptop at the moment. Everything's running on our
moment. Everything's running on our laptops. Uh you know 100 new events go
laptops. Uh you know 100 new events go into the stream. Some of these events might in the future need to trigger LLM requests or something like this. And
then you bring you open your computer again. you start your your program again
again. you start your your program again and you don't want your program to go and um make a ton of LLM requests for all of those 100 events in the past uh necessarily. Uh instead you wanted to
necessarily. Uh instead you wanted to sort of like catch up all the way and then decide what to do um with the state at the end. And so we've made this like super lightweight um abstraction here of
a processor that basically um gets hooked up into a processor runtime. Um
and um what it does uh is it splits your work into reducing events into state and then if you want to do some side effects like for example appending um appending to uh uh you know to the same stream
again you can do this. By the way I think uh I think you can also do this which which is like potentially uh that's pretty interesting. um like uh
you know you you can literally just append to uh you know like a child path or you can append to a parent path or or or whatever um else you want. It's
basically like a meant to be like a file system. And so if you wanted to build
system. And so if you wanted to build sub agents on this thing, all you would do is probably you would just append to dot slash, you know, Boris and pretend that Boris is a sub agent and then you would just subscribe to some special
events that Boris might output like I have the result and then you would automatically be woken up again on the parent when Boris has the result. Um, is
this confirmed, Misha, that this runs?
Like how do you run this? This just
exports the um like you need you need the runtime, right?
Yes. There is a CLI command that I now can't remember because my agent's been running it. Uh,
running it. Uh, you should be able to do.
Did you do all that just now?
I did that about a week ago, but you may have changed.
No, no, no. Yeah. Okay. So, okay. Okay.
Um, let's go to our repo. Like I
promised, this is uh a little bit The these are the um so you can actually also do this if you go to
iterate AI engineer workshop um here there are a bunch of examples uh of files that are um uh workshop two I was going for workshop yeah I also
that was meant to be uh meant to be renamed um ah here this is this is this is basically the snippet we want okay there
is this um there's this thing uh that you can have. Um so here now we've just made a processor, right? Like this is just like a sort of inert thing and then um there's a thing called a uh pull
subscriptions processor runtime. Um and
all that does is it takes your um it takes your uh your processor that you just made and it uh runs it.
Um, I think you probably need to assign it to a variable because you'll just export default above.
Um, yes.
Okay.
Uh, and here let's just say uh just see if we can run this.
Uh, yes, but unfortunately they need to be modified uh um in order to uh um do anything interesting. because I
don't see um yes those ones are those ones are were added in the last few minutes but uh using yeah I gave slightly um bad copying
instructions um but I think I think committed uh I think the we're just going to have to sort of like slowly slowly build it
up in in this repo and then uh push it once it works here unfortunately Ely.
Um, so here, uh, basically what I've now done is I've taken this processor and I've hooked it up, uh, to this runtime and all the runtime does is it does this like boiler plate for for consuming the stream and running the reducer and run
running um, uh, the uh, after append hook and what we should see is once uh, you submit hello world into this uh, into this path here that is just Jonasle
uh, you should get a response there. So
if I go here in the in the UI and I go to Jonas example and I think I said this should be event of time hello world. Um
oh no this is up here if all works. This would be sort of like the proof point that lets us it doesn't hello world scene.
Um, I don't remember if this example relies on the uh machine username, but if it does, then that would be mmcal, not Jonas. Um,
Jonas. Um, maybe let's just do this.
Sorry, this is like the worst workshop because now I'm also on the wrong computer.
And it does after Okay.
Is my computer back up? Yes, but I don't know your password.
Okay. Yeah, I'll I'll do that in a second. Okay. Um
second. Okay. Um
This example here is not working.
Okay. So, I think we have to do plan B, but we might not actually be able to run this.
Are you getting getting I'm just going to walk through the code as we would have written it from scratch.
Sorry about that. Um because I think it it it sort of like illustrates the the progression of things and maybe if you it's installing dependencies which is is slow. Uh
slow. Uh yeah, but but it's exactly eventually eventually this will it will be runnable by the walk through.
So um in this in this paradigm h as as we uh we've been using it in our agent when you want to implement some sort of feature what you do is you say okay what is the state I actually need in in order
to do my my my thing in this case we want to do an LLM request whenever somebody sends a message and says hi. So
um we have here in our um in our user interface and you can access this as well. There is basically like a cheat
well. There is basically like a cheat code that makes a bit easier to um to create uh new events of a type called agent input addit. And this is like deliberately the simplest possible thing
you can imagine. This is basically just a string. And so what we're trying to
a string. And so what we're trying to achieve is that whenever somebody puts an agent input added event into an event stream, we want to uh react to that by making an LLM request. And for
simplicity, we're just going to go in and do it using the OpenAI SDK. So the
open AI SDK um the way it works is um you have to uh at some point uh before you want to use it you have to make yourself um
here a new open AAI client and then you can use uh their responses API. So you
have openAI.responses
uh responses.create create and then uh in order to create it um you need some information uh specifically you need a model state uh a string and so immediately there it's saying okay so
let's just say our state for our LLM agent it has a model string because that is one of the things that we will need when we when we come to actually make an LLM request and there is nothing but event sourcing so let's just say okay we
have um we have a model string and that comes from from um from our state uh and then you know you could easily imagine in the future. This was one of the things that you could have done as an exercise in in the hackathon is you
could say let's add an event that is like LLM changed LLM model changed and you have the teeny tiny oneline reducer and it just sets the model to be a different model and then um off you go you make the next LLM request um with a
different model and the other thing we need is instructions. Uh this is I think a bit of a misnomer. So I would call it system prompt in our state and say okay the in like the system prompt for our LLM what is meant to be doing we'll take
that in our state and then you need a history um a state history and um and and so we've defined this uh and normally what you'll do is uh you'll
just put this into into uh like a little u into like a little TypeScript type if you're using TypeScript. So here we're saying this is the state of our agent right and and in the beginning it's really simple but in the future you can
very easily imagine how how you quickly add things to it like state am I currently compacting or is a tool call currently in progress or or whatever else um you might have um and uh the
idea is to create a system in which it is trivially easy with three four lines of code to add one of those capabilities uh to it and then you say okay what's my initial state um because we're processing the stream initially there's
no events so we need to know what we're what we're initially starting with. And
so we're just saying uh this is we're starting with an empty history and this system prompt and GPT 5.4.
Um and then the next thing you do once you've um uh once you've done that is you need to decide what events do I need to derive this um to derive this state.
Uh and in this case we will just um uh create a new event here. Uh a new we're using zod here for schemas. We're
saying it's an agent input added event and it just has a string.
Um, and then whenever anything comes back from OpenAI, because that is like a fact that has occurred, even if it's just a teeny tiny fragment of a string or or something else, we store that in
um in an event. Um, and that's it. Uh,
these are these are the two the two main event um event types. Um, I don't think we need any of this stuff here actually.
Basically what happened as is often the case, I uh I attempted to instruct an agent to rewrite all of these examples to uh be simpler to execute just before
and it just um rewrote them quite significantly. But this is really all
significantly. But this is really all you need. So um and then uh you have to
you need. So um and then uh you have to write your reducer first and the reducer is basically where you just say okay when an event is encountered uh how do I update my um how do I update my uh
state? you say okay when an agent input
state? you say okay when an agent input addit event um is encountered this here is using a little like zod helper library actually one that Misha made called sche match uh it's just uh it
just gives you basically safety on this payload here when I have an agent input addit event then I want my state to be updated as the current state plus the
history except the history also gets uh this little uh this little thing here uh here appended so we're um and and um the reason we're doing that is because uh
OpenAI's responses API um since we're making OpenAI processor, we're basically translating between our view of the world. We just have this like agent
world. We just have this like agent input and um it's just a string. Uh but
OpenAI wants you to put RO user and and then put the string in a content field.
So, we're putting that there. Um
and um then the other thing we need to do um when you use the OpenAI uh streaming API uh it'll it'll like basically chuck out tons and tons of
these uh output uh response um events and uh one of them finally at the very end will be um of type um
uh um um Yeah, I think this is a bit hogwash. Um,
huh.
Yeah. Yeah, exactly. It basically what we really want here is um uh
Oh, thank you.
Um, is that actually right?
Um no.
Um, let's just I don't suppose you have managed to get the I have pushed workshop 2 to the Yeah, but
the demo repo, but it's more or less as is.
streamed like so.
Yes.
Yeah. Let Okay. So, let's let's just like uh sort of like admit defeat on the thing that I was going to do and just like try try and show you how how how it work because we we can get something like very simple working, right? Because
we have the extremely simple ping pong processor that that does work. Uh and so here we can uh we can even uh just
um let's maybe actually start here.
Oh man, I hit I hit insufficient quote running my uh jup.
Yeah, we we do now have the same examples on the other repo, but it'll it'll Yeah, to be honest, I think uh I think we might have to just admit defeat and we can just talk about something else or
give you give you the time back just because the problem is we can't actually run these things. Uh and I think it would take me like probably like 10 minutes of actually concentrating to to
get them runnable. Um so maybe we should uh admit defeat on that. Um, I do wonder if there's like does,
you know, does it does it sort of make sense what the what the idea is, especially with with it being distributed. So the right that you can
distributed. So the right that you can say, okay, like I have my processor over here and your agent can use it and it can be on a different server or it can
actually be in a in a dynamic worker. Um
and um yeah I I think without showing it it doesn't really make sense.
So the idea here I was trying to understand with this stream processor framework events are
new.
Um consume the rural materialization side or part.
So every yeah so so everything that happens in the system every streaming chunk everything becomes a becomes an event in the stream and then you have reducers running in many different places like uh for for instance uh you know even this thing
with like is the stream paused or is the stream not paused the actual implementation of that is um maybe that maybe that's actually useful because this is running in production now uh like right like there is a circuit
breaker processor and and this is like how the whole thing is built even inside the event service and the circuit breaker processor has an initial state of paused false and it has pause reason
null and basically then it has a really really simple reducer um uh that just checks what are the last 100 time stamps of events I've seen if the last 100 time stamps I've seen seen don't go uh you
know don't go more than a second back then I'm just going to append append an event so the reducer basically just accumulates these timestamps and then the after append function uh here says okay if I if I have seen too many time
stamps in the last second uh I'm going to I'm going to throw the circuit breaker. So, this is like a real
breaker. So, this is like a real processor that runs inside my uh my um my uh service. But let me show you another one. Uh this one here, this UI,
another one. Uh this one here, this UI, this is also a stream processor. What
does this UI do? The UI takes all of these events and for some of the events that it deems like interesting, it chucks uh it basically it it basically projects them onto what we call feed
items. So, these things here that have like a slightly nicer rendering, they are feed items and they are not the same as events, but they're derived from events. So, uh, when you're making UI
events. So, uh, when you're making UI for this thing, you're also just reducing. Um, and because the reducer is
reducing. Um, and because the reducer is just a synchronous function, right? If
if if you knew, for example, I have I have this like a PI agent harness processor and it comes with a whole bunch of event event types. Um, and it has a reducer. Um, you can just in your
processor use their reducer. It's
practically free, right? Like you can just import it and and run it. Uh, and
then you can just sort of like start building abstractions that rely on their event types. Um, and so, um, I think
event types. Um, and so, um, I think what I'll what I'll what I what I will commit to doing because this was such a shitty experience is I will just do do the whole rundown in video form, uh, on
the on the internet. And that was one of the things I was going to show like you can just hook up Pi or Claude or or open code as well and um, just say, okay, like they come with a bunch of events
and you can store them in this event stream and you can react to them or or build on them. Um,
does that answer the question like where the reducer runs? Um, um, and and then like one of the things you can do with this is you can also actually just append an event that has some source
code in it and then it runs that particular processor literally automatically for you horizontally scalable. Um,
scalable. Um, it's just Yeah. Do you have the example of showing how you do that where you're appending like a Yeah, I do have that.
It's not runnable. I think the is somewhat.
Yeah. Well, actually we can do it here.
Uh yeah, I think I think maybe maybe that is good. Just like focus instead of trying to write code, we can focus on the things that might work. Um so let's we can create a new stream. By the way, all streams are created uh you know uh
completely implicitly. So you can just
completely implicitly. So you can just post to anything under the streams API and it'll it'll make you a new stream.
And then uh here there's in the UI and you can also play with this if you want to just see um there is a there's a bunch of sort of preset um preset events. Uh and one of them is here. You
events. Uh and one of them is here. You
can append an event of type dynamic worker configured to this thing. And uh
what does it have? It just has a script in it. And the script is a string. And
in it. And the script is a string. And
inside the string is a reducer uh and an after append hook. And so this is um literally a uh like um a stream a stream processor. It would be funny if this if
processor. It would be funny if this if this is the thing that works because this is I was basically I was basically up most of the night working on this because this is the most exciting bit.
But it wasn't really like workshop grade because it was, you know, you get to get like the first five concepts and then you're like, by the way, you can deploy this by just adding an event. Um, uh,
but if I, if I just say ping here, it says pong, right? That's pretty crazy.
Like I have here a stream uh that knows nothing about the world and then I have appended this event here that just has a little bit of JavaScript in it. And this
little bit of JavaScript uh just makes it respond with pong whenever it encounters something with ping. And then
there is actually a a version of this.
This is the thing that actually took took the night away so to speak is if you have these processor files that export a processor that you define with define processor. You can bundle that
define processor. You can bundle that into an event effectively. And this AI agent I also got running like that. But
of course you don't want to have your OpenAI API key in there. So that is why we have this like slightly awkward envir here where you can put your OpenAI API key outside of the stream and then um it
will basically when it makes a fetch request it'll substitute the secret in in the header. But um yeah, I'll definitely I'll definitely post a video of that because that's that's pretty wild. Like you you can write like the 40
wild. Like you you can write like the 40 lines of code required for a basic AI agent and then you can append that to any stream and then that stream becomes an AI agent.
I don't know. I'm sort of looking at like two or three nods but mostly blank blank stairs. It is very strange. Um but
blank stairs. It is very strange. Um but
uh but but I does this does this make sense what happened here at least because you can this one you can just play with like you can just go in the UI and so uh to some extent uh actually you
can uh you can make increasingly more complicated processors for your streams. uh you can just you can just code them in this input field if you want or uh you know um the only problem is it
cannot have any dependencies like if you want to npm packages you have to bundle it into this string which gets a bit more complicated but um yeah
yeah so this is the other thing like everything you might have noticed is everything has a slug it's just like a URL safe identifier and so the the event here is actually called dynamic worker
configured because you can override it and And what also already works with this event type is you can give instructions to an AI agent to make itself
different functionality just by calling append um with like a little bit of a different JavaScript. Um
and uh you know you you could you can quite easily imagine overcoming the bundling problem for example by just saying oh there's another event type which is called like a sort of like unbundled dynamic worker which just has
a package.json JSON file, a field, and a
a package.json JSON file, a field, and a script field. And then a different
script field. And then a different processor takes it, bundles it, makes the FAT event that has all of the bundle source code, and suddenly uh is is basically like a like IDE. And that's
then where you can you can hack on the agent harness without having to really like install anything or like bring servers into the
mix or or whatever. Um, but sadly was not able to show it. So, bummer.
Yeah.
Uh uh well, this is uh this is JavaScript.
There wouldn't be a typing typing error.
Um uh but it uh if there's any kind of error, the only thing that can possibly happen is that you get an event.
So there would have to be an error event. Um,
event. Um, and there's no there are no typing errors as implemented right now. But you
that's because this is a JavaScript evaluator, but you could also find a way to run a TypeScript compilation step on it first and then you could emit error saying there was a TypeScript compiler error. Then you could run it through an
error. Then you could run it through an LLM to try and fix it and emit a new event with the fixed code or do something else with it.
Yeah. Um the idea is that just everything that happens results in an event and then you can react to those events however you see fit.
Right.
Yes. So in this particular case it's it's dynamic workers in Cloudflare. So
it it just spins up a small small dynamic worker. Um, so basically like
dynamic worker. Um, so basically like the deployment story for a processor like this is or or like one way to think about it is uh at when when this was still working before I ask it to be
refactored and broken, I had um the way I would do it is I would locally on my computer I would do the SSE subscription and run my processor that way and then once I thought it was cool uh I could
either deploy it just as a web service on on Cloudflare workers or Versell or or wherever uh and and just tell uh tell the um agents the tell the streams to
notify me whenever a new event needs to be processed or alternatively just use this dynamic worker thing. Um, and like there's I don't know if this is going to work. This is why like at the moment
work. This is why like at the moment it's looking more like it's a really dumb idea I think because it's really not coming across. But the for example, you could have a plugin for your agent uh that runs on another computer, especially if it was like something like
really important like a prompt injection protection or something like that. Like
you could say, "Oh, okay. the way that my agent uses your prompt injection protection service. It can't really be
protection service. It can't really be through MCP because there's no way to to kind of like proactively hook into the agent loop or or at least not at the minute. But what it could be is it could
minute. But what it could be is it could say, "Okay, my my agent loop actually waits for up to like 200 milliseconds before making any new LLM request uh for any safety checkers to say and then can
just literally ask your safety checker over there. I don't care how you
over there. I don't care how you implement it. You could even charge me
implement it. You could even charge me for it, right? Um and I just don't yeah I just don't think that's currently possible in in any other way. That's why
I think it's quite important that the stream can reach out to the processors and say you got to do something you know you need to process some events now. Um
um there's a there's like a couple other uh potentially I don't I mean this might Yeah.
The idea one of the ideas you shared is my agent might use just in time services other agents are offering.
Yeah.
Then he might be charged with it.
Yeah. Well, I mean like probably in initially the human would be charged of it, but but this is just like there's no way like let's say okay so cloud code if I want to make a cloud code plugin um if
if I wanted to make that into a business think think about how you would have to do that at the moment it and you need to give some sort of instructions uh or or
something to to uh to claude code either to proactively call your MCP tools or or something of that nature or call maybe some CLI tool that was previously authenticated where you did like CLI log
to you know like prompt injectionp protector.com and then somehow it works where whereas um uh really I think you can just plug into the into the agent stream like into the event stream
directly like the the interface is just somebody somebody notices that claude is about to do something and chucks some more events in and and like they kind of all work that way but not quite like
it's uh it's sort of like you have to squint a lot to to see that that's what it all compiles down to. Um um another thing maybe worth pointing out for some
of you that have tried to like tried to make open code or or or PI plugins or whatever is there's no before hook. This
is very very important. There's no like um there is actually in the back end like our built-in processes. The only
difference to our processes and your processes is that we can actually stop events from ever being appended. For
example by a third party. So there's sort of like certain things that are just um that that need to happen before an event goes in the stream. But um broadly speaking, I'm very against before hooks.
Like I think there were some instances in in Open Claw, for example, some massive performance regressions and and cost increases where you know you can you can break uh um you can break context caching quite easily with before hooks. You can massively destroy
hooks. You can massively destroy performance. like it's just way better
performance. like it's just way better to think of the whole system as as being eventually consistent and uh you know um doing all these distributed systems things like saying okay we're going to wait for up to 200 milliseconds for
somebody to say I've got a little bit of information for you but then we're actually going to make the LLM request whether you come in with information or not because this is needs to be system so if you think for example about rag
pipelines and things like that you can easily have a little processor that says I have like a indexed version of the notion knowledge base or something and I'm just going to sit here and try to
squeeze in a little bit of extra context if I think it's relevant, but if I don't get there in time, it's like totally chill. The whole thing still works. It's
chill. The whole thing still works. It's
not like I' I've kind of delayed the um the agent or something. Um
yeah, you mean I mean the the the streaming database like I don't know I I I think um I think this could work on almost anything that is like durable stream shaped. So this
was just for the purposes of this um exercise. Uh I do think like there is
exercise. Uh I do think like there is potentially an interesting scenario where uh like where a like it's not even really like it's a combination of a Q and a pub subsystem
and and like a streaming database and uh that can also run code for you. But but
that that sort of thing is an interesting infrastructure primitive that uh maybe somebody could make. Um I
do think that uh yeah but in in principle like I I don't know if you don't know about it yet you can read up on this durable streams uh API specification that somebody came out
with. We don't follow it exactly but uh
with. We don't follow it exactly but uh close enough and it's it's basically like a like a very common old idea like
um the thing that is more new is that it would be like you would also have push subscriptions out of it but there are some things that do is there's a thing called n like jetreamenat or something
uh that that does it um yeah are you talking about the electric core yeah exactly they they have durable streams And like a durable stream is just like
an appendon event log with with offset tracking and and the the the you know they want the client to track the last consumed offset. That's also how say
consumed offset. That's also how say Kafka works or something like that. But
you can also do it the other way around and have the server track the offsets and those are more normally like there's Google Cloud and AWS they have these pub subsystems where they say oh we'll just
like give you the next event and we'll keep track of of the offset for you.
Um Yeah.
And then anyone could essentially pass a stream that would go to my generically.
Oh, I mean you would need to have authentication on it, right? Right. Like
this this entire thing would never uh like right. like this. Uh I was thinking
like right. like this. Uh I was thinking like it might be fun to just say there's a do that yet basically.
Yeah, exactly. It might be fun to just say there's a public namespace and it just gets cleared every hour or something, right? Because it is kind of
something, right? Because it is kind of nice that anybody anywhere on the internet can just curl a thing or like copy and paste a curl from somewhere and and suddenly have like a flavor of agent. I do think that's quite
agent. I do think that's quite interesting, but it just needs to be deleted every like quite aggressively.
Um yeah, but but I think I think that's more or less a solved problem. It's like
a bit gnarly, but basically the way I am envision it you would do it um if you did this properly is on each event you basically have like client provenence information. We say like who was this
information. We say like who was this like what were they able to do like how how sure are we that these people you know the HTTP client or the the stream client was actually entitled to do this
operation.
And and again, you can model all of that as events, right? Like you can you would just have an event that is like make this stream public and until you make
that event, it can only be uh written to by whoever created it or something.
Um, right. I might ask uh um Sean if we can
right. I might ask uh um Sean if we can instead of this video do a video we'll record a little bit later today um that shows it more nicely because I do think it's worth showing.
Just a little bit too short short notice.
Shall we just wrap it then? If uh I I don't mean to keep you all of your uh like 10 minute late longer break or if anybody wants to chat.
Loading video analysis...