Make your own event-sourced agent harness using stream processors — Jonas Templestein, Iterate

By AI Engineer

Summary

Topics Covered

  • Everything That Happens Becomes an Event
  • Every Agent Gets a URL
  • Stream Processor: The One Abstraction
  • Code Is an Event
  • No Before Hooks: Proceed and Adapt

Full Transcript

Welcome to the workshop. Uh, I am Jonas, and this is Misha. We both work at Iterate.

Hi.

And the um this is going to be uh chaos.

I think pure chaos, first of all, because we only decided last Monday to do this, and then there was a bunch of personal stuff and half-term and the kids were sick, and then this

morning we came here and I thought this was going to be like a like hour and 20 hackathon or something like that and then we came here and we're told you're going to do it without an audience because of some fire thing. Uh and now there's an audience again which is

amazing. So, we're going to do uh gonna try and do this as like a improvised hackathon, but I've only just pushed the SDK literally one minute ago, and Misha is going to try and sort of like live

fix it. But I I hope it'll still be worth your time because I think there's sort of some uh hopefully at least some interesting ideas or or maybe like one way to put it is I would like to find out whether this is like dumb or cool.

And you can help me with that. So please also just ask clarifying questions, because part of the reason we said last week, okay, let's do this workshop, is that these have been ideas that have been kind of noodling around over the last year, but

we haven't really actually done anything with them. There's not really a commercial value to it necessarily, but I think it's basically how I would like to build agent harnesses. So, you know,

I've been noodling with this uh uh for a while now and and you know, obviously you've got like Claude and you've got Pi now, but back in the day you didn't. And

so you'd make your own coding harness, and there are all these different ways to do it. And I think the way I would like

to do it is purely event sourced. And I say here "aka debuggable", and that is because everything kind of skirts around being event sourced, all these things, without quite being event sourced. There's

always something that has a side effect that you later can't tell, and then you can only see it in the OTel traces or something, which doesn't really make sense to me, because normally these agents, the way they work, and we'll see that in a second, is they already

have a log of events, right? So you so if you just say everything that could possibly happen is in there. It'll be

really easy to debug. Um and as we'll see hopefully a little bit strange but potentially in other ways also really easy to to extend. Um, and then the second thing is I want I want this to be extensible and and we've seen this I

think with Pi, which was just incredible, how valuable it is to make an agent extensible not just by humans but also extensible by itself. And

one thing that I want in the extensibility is I want it to be composable. Like, I don't think we'll

get there within one hour with all of you, but maybe afterwards, if you say, oh, this is actually cool and not dumb, we'll keep working on this and you can say, oh, I made this extension, you made

that extension, and you can combine them. Because I think we don't really yet know what the best recipe is for agents, basically for agent harnesses, and it's been too hard to

experiment. Even in Pi, everything has to run in a single thread, in one process, over here on this particular computer, and so on and so forth. So I think we can maybe do a little bit better than that.

Then I think it should be on the edge, publicly routable.

The way I think of it is, basically, an intelligent entity, like a digital one that's not a robot, in the future is

just an internet-connected, basically, server program, right? Like, it just speaks HTTP. So, broadly

speaking, that should be sufficient for almost anything. And then I would just

put these agents on the edge, speaking to the internet straight away. This is not very safe. None of what we're doing today is safe, by the way. There's no

authentication, you'll see. You'll all be able to see everybody else's event streams and agents and whatever else. You should not put any secrets in

the actual payloads of the events, or you should rotate them afterwards, right? So, we

can um you know those things can be solved, but broadly speaking, the moment an agent exists, it should have a URL.

That's just like uh that's just my opinion because otherwise like you end up making all these things and then you tie yourself backwards inventing a connector concept just so you can get Slack messages in or or god knows what.

And I think that should be really easy. Or maybe a human

should fill in a web form, and the result of the web form should be input for the agent again, or something like that.

That's kind of the same thing as a Slack webhook. And then it should be distributed. And this is potentially

a very double-edged sword, as we'll see. But by distributed I mean you should be able to have an agent running on one

computer here on a server, and my plugin running over here, and your plugin running over there, and yours is written in Rust and mine is written in TypeScript, and it all just doesn't really matter. And the reason that

is bad is because you will eventually get race conditions and loops, right?

Like two plugins sending messages back and forth, creating an endless stream of events in your agent. But on the flip side, you

have those issues normally anyway in most agent harnesses I've seen. So

it's quite good if you can kind of preempt that a little bit. Okay. So

let's um let's talk about this uh thing that we have invented for um oh my gosh um why is this this isn't the same as in

my file system. So I'll use the one in my file system. Um basically uh we have created this this uh service over last week called events.iterate.com.

It has a web UI um and uh we can um uh we you can use that. I think you'll certainly use it, but for now we'll just play with it with curl because I think it's it makes it in some sense more more

tractable. In the events app we have this concept of agent paths. So just like a file in a file system, every agent has a

path in a hierarchy, and you just have, let me show this here, raw events. These are YAML-formatted raw

events in each of these paths. And that

is basically the the the entire uh the entire primitive that we're going to be building our agent on. And so we can start by playing with this. We can say, okay, we we have a path prefix. If you

copy this from the repo, Misha's going to put it in the README in a second.

I'll add workshop.md as well.

Yeah. Okay. Oh yeah, maybe that's why.

I added the README but not the workshop.

Yeah. So basically, let's set ourselves a path prefix here. In my

case, that is just going to be Jonas Templestein or something. They start with a forward slash, by the way, which is a little bit awkward potentially.

But the service is very tolerant of you URL-encoding your slashes or doing whatever you want. I wanted good curl ergonomics, but I also wanted them to start with a forward slash so you have a well-defined

root. Then for the base... It is now pushed to the repo. But have

you shared the repo?

Oh yeah, because now you could actually do this on your computer. Let's do that, actually. If you go to

github.com/iterate, there is an ai-engineer-workshop repo, and in there, if

you click on this workshop.md file,

that's what we're now going through. So

you should be able to copy and paste the curls from there, and again, there's no authentication, so you can just do this. In fact, a couple of you are already hitting our live deployment.

Okay. So,

uh like this is entirely just sort of like a proof of concept of um uh how I think this should work. Um

Basically, what we're going to do is turn this simple event stream into a fully fledged agent without really deploying any server

software uh or or anything like that. Um

Maybe, yeah, I should be able to see if anybody has managed to do it. I'll wait until two or three people

have managed to successfully curl something, and then I'll carry on

because I think it'll be good if you curl it yourself. So, it's here in this AI

engineer workshop repo, workshop.md.

If two people have the same name, that's going to cause username, so it doesn't matter. Um, that's fine.

Uh, okay. There we go. Got a few people. Um,

okay. So, that's pretty good. Um,

By the way, you might see this here and there. There's also this thing down

here called project slug. If you really start stepping on each other's toes, you can just use a different project slug, and it's like a completely

new database, a new namespace. You

can pass that in as an HTTP header. If

anybody needs that later, you can ask me. Okay. So now we have a few people curling things. I will also start curling things. So now I'm just going to say hello world

here to my stream, and it'll come back, and straight away we notice a few things. There's a basic event envelope

shape. We have a type that I've just

invented here. Events don't need to have anything but a type, but most of them also have a payload. Then there is the stream path, which came from the URL that we posted this to. And

there's an offset. This is just an auto incrementing integer that starts at one.
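To make the envelope concrete, here is a minimal in-memory sketch in TypeScript. It is not the events.iterate.com implementation. The field names `streamPath`, `offset` and `createdAt`, the helper `createStreams`, and the example path are all assumptions chosen to mirror what is described in the talk: a type, an optional payload, a stream path taken from the URL, an offset that starts at one, and a timestamp assigned at the single point of serialization.

```typescript
// Hypothetical envelope shape; the real service's field names may differ.
type EventInput = { type: string; payload?: unknown };

type StoredEvent = EventInput & {
  streamPath: string; // taken from the URL the event was posted to
  offset: number; // auto-incrementing per stream, starting at 1
  createdAt: string; // ISO timestamp assigned at append time
};

// In-memory stand-in for the server: one ordered list of events per path.
function createStreams() {
  const streams = new Map<string, StoredEvent[]>();
  return {
    append(streamPath: string, input: EventInput): StoredEvent {
      const events = streams.get(streamPath) ?? [];
      const stored: StoredEvent = {
        ...input,
        streamPath,
        offset: events.length + 1, // the single point of serialization
        createdAt: new Date().toISOString(),
      };
      events.push(stored);
      streams.set(streamPath, events);
      return stored;
    },
    read(streamPath: string): StoredEvent[] {
      return [...(streams.get(streamPath) ?? [])];
    },
  };
}

const streams = createStreams();
const first = streams.append("/jonas/hello-world", {
  type: "hello-world",
  payload: { message: "hi" },
});
const second = streams.append("/jonas/hello-world", { type: "hello-world" });
console.log(first.offset, second.offset); // 1 2
```

Because the offset is assigned in exactly one place, every reader of a stream sees the same total order, which is what makes replaying the log deterministic.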

And so there's basically a point of serialization at the server that just increments them, and you get a created-at timestamp. And then here, what we

can do with curl, which is quite nice:

if you do -N you can do streaming SSE. So this here is literally nothing magical. This is

literally just, you know, Jonas Templestein, hello world. This is literally the curl that

we're doing right here.

And there I can see the messages coming through. Very nice. And I can

add another message, and now basically all the messages will show up there. And there's also,

oh, if you do ?live=true. This is inspired by the durable

streams standard, but ever so slightly different. But if you do

live=true, the connection will stay open and it'll just keep sending you more events as you put them through. So this is sort of already,

you know, you could say this is an AI agent. If Misha was simultaneously giving responses to my hello world, this would already be an agent. And then

my AI has told me, if you're going to be doing that a lot, you can pipe this through sed to get just the data elements and then pipe it through jq . and then you get a

little bit nicer formatting. So this is so trivial that I actually do spend a lot of time in curl, just curling it. Then what can you do? The thing is very tolerant

you do? Um the thing is very tolerant just in general of of stuff you post to it and this will become relevant if you wanted to say send web hooks from random third parties there. Like my theory is always that like the LLMs are good enough. They can basically just consume

enough. They can basically just consume raw web hooks from anywhere and and they'll roughly do the right thing. So

what the what specifically the server will do is if you send it something that doesn't um validate like hogwash here it says there's there's no type property.

So there's not actually a valid event.
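A sketch of that tolerance, assuming the only validation rule is "must be an object with a string `type`". The error type name below is a placeholder, not the real URL-shaped type the service emits, and `toAppendable` is a hypothetical helper. The point is that an invalid body is not dropped: it is wrapped in an error event and recorded in the same log.

```typescript
type StreamEvent = { type: string; payload?: unknown };

// Placeholder type name for illustration only.
const INVALID_EVENT_TYPE = "https://example.invalid/invalid-event-appended";

// The one rule assumed here: an event is an object with a string `type`.
function isValidEvent(body: unknown): body is StreamEvent {
  return (
    typeof body === "object" &&
    body !== null &&
    typeof (body as { type?: unknown }).type === "string"
  );
}

// Invalid input becomes an error event instead of being rejected outright.
function toAppendable(body: unknown): StreamEvent {
  if (isValidEvent(body)) return body;
  return {
    type: INVALID_EVENT_TYPE,
    payload: { reason: "missing string `type` property", received: body },
  };
}

const log: StreamEvent[] = [];
log.push(toAppendable({ type: "hello-world", payload: { message: "hi" } }));
log.push(toAppendable({ hogwash: true })); // e.g. a raw third-party webhook
console.log(log.map((event) => event.type));
```

Keeping the original body in the error payload means a later consumer, human or LLM, can still inspect what was actually sent.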

Then you can see here what it has appended is basically an error. But

because we're event sourcing, the error is also just an event, and it just says here, okay, you have an invalid-event-appended event. Oh, another thing

that we will notice is that the type here that our server has created starts with https://events.iterate.com.

You don't have to do that. But that is something that we like to do because that literally leads to the documentation for that event type. And

if it's going to be an opaque string, why not make it something like that? But obviously you can just do single words or two words or whatever. But you will

see these URLs and they're just types. Then what else is

interesting that we can do? I mean, this is kind of boring, but again, if you're going to do webhooks, you can put an idempotency key in, which just means if you have the same

idempotency key twice, you don't get the event appended twice. You

can pause a stream. I think in the interest of time we can skip that. You

can resume a stream. The reason the pausing is important, and we put this in even though we literally just made this a couple of days ago, is because of the infinite loops. It's very, very easy

to accidentally put, you know, thousands and thousands of events in an infinite loop in the thing, and so if you put more than 100 events in a second or something, it is just going to

circuit break and then the stream will be paused until somebody unpauses it. So

basically if you try to... here, actually, let's do that, because again, for those of you who haven't seen event sourcing, this might be a bit weird. Like, how do you pause a stream? Well, you

pause a stream by, oops, this is the wrong one. You pause a stream by

appending an event that says you're paused. The only external

interface to the agent is... oh, now it's complaining. Why is it complaining?

Ah because I think there needs to be a reason or something maybe.

Yeah, reason: demo. Yeah, there you go. So now

this thing is paused. So now if I try to send it the hello world event from earlier, it's just going to barf and say, no, no more events allowed. And that is

one thing that doesn't appear as an error in the stream, because otherwise you would still get infinite numbers of error events.
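The append rules from the last few minutes (idempotency keys, pause and resume as events, and the circuit breaker) can be sketched as one small in-memory model. This is an illustration under assumptions, not the service's code: the control type names `stream-paused` and `stream-resumed`, the `idempotencyKey` field, and the exact limit of 100 appends per second are all made up for the example. The clock is passed in explicitly so the behaviour is deterministic.

```typescript
type StreamEvent = { type: string; payload?: unknown; idempotencyKey?: string };
type AppendResult =
  | { status: "appended"; offset: number }
  | { status: "duplicate" }
  | { status: "rejected"; reason: string };

// Hypothetical control-event names and limit, chosen for illustration.
const PAUSED = "stream-paused";
const RESUMED = "stream-resumed";
const MAX_EVENTS_PER_SECOND = 100;

function createStream() {
  const events: StreamEvent[] = [];
  const seenKeys = new Set<string>();
  let paused = false;
  let windowStart = 0;
  let countInWindow = 0;

  // Pause state is derived from what is in the log, not stored separately.
  function push(event: StreamEvent): number {
    events.push(event);
    if (event.type === PAUSED) paused = true;
    if (event.type === RESUMED) paused = false;
    return events.length;
  }

  function append(event: StreamEvent, nowMs: number): AppendResult {
    if (event.type === RESUMED) {
      countInWindow = 0; // give the stream a fresh budget
      return { status: "appended", offset: push(event) };
    }
    // Rejections while paused are NOT logged, or a loop would flood the log.
    if (paused) return { status: "rejected", reason: "stream is paused" };
    const key = event.idempotencyKey;
    if (key !== undefined && seenKeys.has(key)) return { status: "duplicate" };
    // Circuit breaker: count appends within a one-second window.
    if (nowMs - windowStart >= 1000) {
      windowStart = nowMs;
      countInWindow = 0;
    }
    countInWindow += 1;
    if (countInWindow > MAX_EVENTS_PER_SECOND) {
      push({ type: PAUSED, payload: { reason: "circuit breaker" } });
      return { status: "rejected", reason: "rate limit exceeded, stream paused" };
    }
    if (key !== undefined) seenKeys.add(key);
    return { status: "appended", offset: push(event) };
  }

  return { append, events };
}

const stream = createStream();
const a = stream.append({ type: "webhook", idempotencyKey: "abc" }, 0);
const b = stream.append({ type: "webhook", idempotencyKey: "abc" }, 1);
let last: AppendResult = a;
for (let i = 0; i < 150; i++) last = stream.append({ type: "loop" }, 2); // runaway loop
const whilePaused = stream.append({ type: "hello-world" }, 5000);
const resumed = stream.append({ type: RESUMED }, 6000);
const afterResume = stream.append({ type: "hello-world" }, 6001);
console.log(a.status, b.status, last.status, whilePaused.status, resumed.status, afterResume.status);
// appended duplicate rejected rejected appended appended
```

Note the asymmetry: the breaker tripping is recorded as one pause event, while the rejected appends that follow leave no trace, which is what keeps a runaway loop from filling the log with errors.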

Then, I mean, there's other weird and wonderful things you can do. I don't even know if we really have to do this, but I have made a

plugin for this already that makes it so that if I put this JSON-transformer-configured event in, what it'll do is it'll just observe certain events, rewrite them a little bit, and make a new event out of it. But I think

that's a bit advanced, so we'll do it later. It can also do weird

things like, you can schedule things. Some agent SDKs, you'll know, can do this, but here, you know, if you want like a

heartbeat event every five seconds. Oh god, now I need to resume

the stream because it's paused. Let's

see. Stream resumed.

Okay, so now I am just creating this heartbeat event every 5 seconds, which can be quite handy, or you can schedule something for 10 minutes in the future or at a certain time, and again it's just

an event that says do this thing in the future.

I think I will skip over these, but you can cancel the schedule. You can

tell the stream, by appending an event, to send your server or some third-party server or, you know, your Slack API endpoint or whatever an event every time an event gets added. So you can

basically subscribe to the stream.

Here we're subscribing in our terminal. We're doing a pull

subscription, right? We're connecting to an HTTP server and we're pulling. We're the HTTP client and they're the HTTP server, and we're getting an SSE response stream of events.
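What the sed and jq pipeline does by hand can be sketched in TypeScript: take the raw text of an SSE response, keep the `data:` lines of each frame, and parse them as JSON. This assumes each frame carries one JSON-encoded event in its data field and that lines end in `\n` (real SSE also allows `\r\n`). The function name `parseSseFrames` is made up for this example, and the chunks are hard-coded so nothing here touches the network.

```typescript
type StreamEvent = { type: string; offset?: number; payload?: unknown };

// Split a buffer into complete SSE frames (separated by a blank line) and
// return whatever trailing partial frame is left for the next chunk.
function parseSseFrames(buffer: string): { events: StreamEvent[]; rest: string } {
  const frames = buffer.split("\n\n");
  const rest = frames.pop() ?? ""; // last piece may be incomplete
  const events: StreamEvent[] = [];
  for (const frame of frames) {
    const data = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, "")) // drop "data:" and one space
      .join("\n");
    if (data.length > 0) events.push(JSON.parse(data) as StreamEvent);
  }
  return { events, rest };
}

// Network chunks can split a frame anywhere, so carry the remainder forward.
const chunks = [
  'data: {"type":"hello-world","offset":1}\n\nda',
  'ta: {"type":"hello-world","offset":2}\n\n',
];
let buffer = "";
const received: StreamEvent[] = [];
for (const chunk of chunks) {
  const parsed = parseSseFrames(buffer + chunk);
  received.push(...parsed.events);
  buffer = parsed.rest;
}
console.log(received.map((event) => event.offset)); // [ 1, 2 ]
```

With a live subscription the same loop would run over chunks read from the HTTP response body instead of a fixed array.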

But it can also do the other way around, where it will push you the stuff that gets appended, or push you a filtered stream, which can be

quite handy. Okay. So let's maybe stop there, because I know it wasn't really clear from the session description how deeply technical this

was, and so let's take any questions on what even is this thing that we're going to be using for the next 45 minutes. But questions have to be submitted as an event to the questions stream.

Yeah, exactly. If you have any trouble using that... I assume lots of things don't make sense, so please do ask, because I

think this will be extremely random and strange so far.

What does it do?

Oh, we're like a hacker hobby club at this point, to be quite honest. We have not got a launched product.

but if we did it would be um I think based on this architecture. It's like uh anyway, I'll tell I can talk about it afterwards.

Okay. Okay. Okay.

But it's uh Yeah, it's more it's more homebrew hacker club than than like commercial enterprise at the minute, but it'll get there.

Yeah.

Yeah.

Basic questions that are on the market right now. Aren't

What's the architecture?

I think it's just the API. Have you tried to make, you know, an OpenCode extension or plugin,

or Pi extension, or Claude extension? They all have an API surface that is a little bit bigger than what we're going to have in a minute. It's not by

much, right? Like, okay, maybe the streaming chunks for partial completions are a different kind of thing for some reason, rather than just an event of type "I've received five letters of a streaming response". To me that's a

perfectly valid event. And the benefit then is that... I'm kind of a one-abstraction kind of guy, right? This should be web-standards-based, and there should only be one thing,

which is an event. And then, as you'll see, the thing that was at the top of this file: I think you can build a super expressive, state-of-the-art coding agent by just implementing a

stream processor that reduces over the event stream and occasionally enacts side effects. That's all it does,

right? It just sort of sits there and says, "Okay..." I'll show you that in a second. It's a bit weird.

Um I would say a weird way of programming.
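A minimal sketch of what "reduces over the event stream" can mean for an agent. The event types `user-message` and `assistant-message` and the state shape are invented for this example and are not the workshop SDK's types. The idea is only that the agent's whole state is a pure fold over its log, and that deciding whether a side effect is due (here, whether a reply is owed) is a question you ask of that state.

```typescript
// Hypothetical event types for a toy chat agent.
type AgentEvent =
  | { type: "user-message"; payload: { text: string } }
  | { type: "assistant-message"; payload: { text: string } };

type AgentState = {
  transcript: { role: "user" | "assistant"; text: string }[];
  awaitingReply: boolean; // true when the last word belongs to the user
};

const initialState: AgentState = { transcript: [], awaitingReply: false };

// Pure and synchronous: no I/O, so replaying the log always yields the same state.
function reduce(state: AgentState, event: AgentEvent): AgentState {
  switch (event.type) {
    case "user-message":
      return {
        transcript: [...state.transcript, { role: "user", text: event.payload.text }],
        awaitingReply: true,
      };
    case "assistant-message":
      return {
        transcript: [...state.transcript, { role: "assistant", text: event.payload.text }],
        awaitingReply: false,
      };
    default:
      return state; // ignore anything this processor does not understand
  }
}

const log: AgentEvent[] = [
  { type: "user-message", payload: { text: "hello" } },
  { type: "assistant-message", payload: { text: "hi there" } },
  { type: "user-message", payload: { text: "write me a haiku" } },
];

const state = log.reduce(reduce, initialState);
console.log(state.awaitingReply, state.transcript.length); // true 3
```

An LLM call would then be the side effect triggered when `awaitingReply` is true, with its result appended back to the stream as another event.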

So if I understand correctly, you're sort of saying take events that get generated from an AI model, and then you're basically going to pipe them through this stream processor.

Is that the idea?

The events.iterate.com app is the storage. You can almost think of it as a database. Do you know Convex? They kind of give you the illusion that

they're running TypeScript in the database. You could almost think of

this as a programmable stream or something, right? Because

actually one thing that you can also do, we'll we'll see later once you hack on your stream processors, you can also make an event that contains the source code of your stream processor and then it'll just run on every event. And and

so like it's basically it is like it is like the I think you can experiment with a huge array of different kinds of agent harness implementations

uh that can respond to real events on the internet and make real API requests without deploying your service or something like that. All you need is this sort of um streaming streaming

service that um uh has some of some of these um things like specifically it needs to be able to wake up you know like other other APIs and programs and

so on. I don't know if that Yeah. Um

that does that answer your question at all or Yeah, I think so.

Okay. Shall we try and make uh now a um like a like a slightly more serious version of these curls? Um so does

anybody here not know Typescript?

I think so. I think one of the arguments in favor of this architecture, that I also put in the blurb, I think, is that this is in principle polyglot, right? You could do this

in any language. There are actually API docs at events.iterate.com.

I mean, this is an exceedingly simple API, right? There's an OpenAPI spec, you can make yourself a client, and

so on. But for the purpose of this workshop we'll just do a TypeScript version, and we've shipped a client in an npm package already. But I just wanted to point out, even though we're doing everything in

TypeScript, the whole point is that we wouldn't have to, right? We just have to pick one thing for this workshop. Okay. So, let's do something

pretty simple.

Is the AI engineer workshop package now up to date and working?

I think it is, right?

I think so. Yeah. Uh, you might need to pull your repo because there was a client side type error in the iterate repo.

Uh, no, the in the AI engineer workshop.

Okay, that's fine.

It's in.ts.

Yeah, but uh, um, oh, maybe I will actually use that then. and then I'll be able to experience uh the same the same bugs if there are any. Yeah, that's

probably better. Okay. So, I think if you wanted to just have a quick start, you can just clone the AI engineer workshop repo. There's already a teeny tiny

example file in there, and then we'll just add a couple more as we go along.

And workshop.md has runnable curls as well.

Nice. Yeah.

If you wanted, you could put in a couple of the follow-on examples that are in the iterate repo that have actual processors.

Yeah, I can take... I don't know what's going on here.

There we go.

My computer seems to have uh slowed to a crawl trying to open a cursor window.

This has been unfortunately like a somewhat regular occurrence recently.

Uh okay.

Okay.

Or Oh, no. I think this might be what you mentioned about the internet. I

think it's just struggling because it can't open a terminal because the my my terminal tries to connect with the internet.

Oh well then let's do not the internet, is it?

There we go. Okay.

Okay. So,

We have packaged up an SDK here into this AI engineer workshop package that you should be able to just pnpm install, or you just use this

example here. And then basically, what did we need in our curl, right?

We needed a base URL, which is just events.iterate.com. That's where our

server is. We can use a path prefix.

And then we can run this function here called create events client that is from our SDK. And then

this is under the hood. This is

really don't know what's going on with this terminal. It's very strange.

Okay. Maybe um everything is just incredibly slow.

You want to try mine?

Maybe, but there's nothing actually hogging the CPU. I do think it might actually be the network, but we'll see.

So here you have... I don't think this has picked up the... anyway. Okay. So this is basically what

we were doing just now, right? We say, okay, we're making ourselves a client, and then here there is a function called stream. This is basically

the right-hand side of the terminal that we had earlier. And

then you say live: true and give it the stream path. And we can say, okay,

if the event is not of type ping we will do nothing. But if the event is of type ping then we will... I mean, this is a completely unnecessary helper down

there. Then we can just do client

append. I don't understand why

the TypeScript language server isn't picking up any of the types for me. But

I can't tell if that's just Cursor being weird again.

Your own example.

Anyway. You could quickly switch IDEs.

Yeah, it's fine.

It is working for me. Yeah.

Okay. Yeah, I think this is unfortunately like a sort of cursor.

Okay. And then you should be able to uh just if you have a modern version of node, you should be able to just run um or maybe not. Did I make it?

It says invalid TypeScript syntax.

You know what? I'm just gonna I'm just going to restart cursor. I think that is worth it.

Is anyone uh able to run this example TS on their computer? There you go. I am.

Okay.

Um and are you able to send a ping to it and get a pong out of it? Okay. Very

cool.
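The shape of that ping/pong example can be sketched without the network. The client below is an in-memory stand-in written for this illustration. It is not the workshop SDK, and `createFakeClient`, `subscribe` and `read` are hypothetical names (only a stream function with a live option and an append call are mentioned in the talk). What carries over is the handler logic: ignore everything that is not a ping, and append a pong to the same stream when one arrives.

```typescript
type StreamEvent = { type: string; payload?: unknown };

// In-memory stand-in for an events client; NOT the real SDK API.
function createFakeClient() {
  const streams = new Map<string, StreamEvent[]>();
  const listeners = new Map<string, ((event: StreamEvent) => void)[]>();
  return {
    append(path: string, event: StreamEvent): void {
      const events = streams.get(path) ?? [];
      events.push(event);
      streams.set(path, events);
      // Deliver to live subscribers, like an open live stream would.
      for (const listener of listeners.get(path) ?? []) listener(event);
    },
    subscribe(path: string, listener: (event: StreamEvent) => void): void {
      listeners.set(path, [...(listeners.get(path) ?? []), listener]);
    },
    read(path: string): StreamEvent[] {
      return [...(streams.get(path) ?? [])];
    },
  };
}

const client = createFakeClient();
const path = "/jonas/ping-pong";

client.subscribe(path, (event) => {
  // Only react to pings. The handler also sees its own pongs, so reacting
  // to every event here would create exactly the infinite loop to avoid.
  if (event.type !== "ping") return;
  client.append(path, { type: "pong" });
});

client.append(path, { type: "ping" });
client.append(path, { type: "something-else" });
console.log(client.read(path).map((event) => event.type));
// [ 'ping', 'pong', 'something-else' ]
```

Because the processor writes to the stream it reads from, the type check is the only thing standing between this program and a feedback loop.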

Okay.

I Yeah, I wonder if we use use I don't know if you can see this, but basically all my commands they have like a several second delay, which is Yeah, it's looking okay to me. if you

want to just swap laptops. Um

maybe Yeah, no, I know. But it's just Yeah, it's just my editor doesn't work. And I

also have several non-responsive uh cursor sort of to I feel like this is a unfortunately a pattern with cursor recently.

If you want to swap laptops, it does seem to be working okay on mine.

I would just give everybody 60 more seconds to just get get the basic demo working and then just let me know.

Yeah because I think using some you this is still almost an hour to go. It's not

good to use somebody else's computer.

Maybe I will continue on yours and I'll just restart my computer. I think that's a good idea because we can I also have examples 01 and 02.

Oh, really? Oh, brilliant. I

think working with something.

What's your keyboard layout?

Uh, probably the same as yours.

One can one can one can hope. Uh, okay.

Entire screen looks the same.

Uh, looks pretty good so far.

Um, what if you got a terminal?

Uh so node examples 01 hello world.

Okay, perfect. Yeah. Oh, this works flawlessly. So,

has everybody got that more or less working? Because then we can just make

an OpenAI coding agent now. I think that's all we need.

Let's maybe I'll ask um do you have a speech control in here?

Uh yeah, it's control and option. It's

kits one.

Yeah. And then

that's it. It's recording.

Hello. Oh, cool. See that red bar?

Um, hey, I would like to make a third example uh called agent processor and for that we need to install uh just the

normal OpenAI SDK. Please uh do that and just make me a new file that looks like the hello world processor.

Okay. Um

Okay. So,

I just got to gather everybody again after I left you for about 10 minutes here. This is a pretty normal

spaghetti-code TypeScript program, right? Basically, what we've done

here is some boilerplate work. Here we want to

subscribe, so we have to make a subscription. Oh my gosh, this

example that you put in there has an AbortSignal and handles all kinds of things. Anyway, the way I think

about what we're really doing is we're writing a stream processor. And if

you've never done stream processing, this might be a bit weird, and people use slightly different names for these things, but really what we're saying is there exists some kind of world state that we're

interested in that can be derived from the events. In this example

here that Misha made, we are trying to count the number of times a hello world event has been encountered in the stream. And then you can write the

entire logic of your program as effectively a reduce function. This is

synchronous, and it just looks at all the new events as they come in and then it updates the state. That's all it does. So it takes a

state and an event and it returns the new state. Or you can also just return

nothing. It'll be fine. It'll just

ignore it. And you should not do side effects in this reduce function. And the reason I've

split the reduce function from after-append, where you should do your side effects, is this: think about what happens if your program goes to sleep, right? Like

you're closing your laptop. At the moment, everything's running on our

laptops. You know, 100 new events go

into the stream. Some of these events might in the future need to trigger LLM requests or something like this. And

then you open your computer again, you start your program again,

and you don't want your program to go and make a ton of LLM requests for all of those 100 events in the past, necessarily. Instead you want it to

catch up all the way and then decide what to do with the state at the end. And so we've made this super lightweight abstraction here of

a processor that basically gets hooked up into a processor runtime.

and um what it does uh is it splits your work into reducing events into state and then if you want to do some side effects like for example appending um appending to uh uh you know to the same stream

again you can do this. By the way I think uh I think you can also do this which which is like potentially uh that's pretty interesting. um like uh

you know you you can literally just append to uh you know like a child path or you can append to a parent path or or or whatever um else you want. It's

basically like a meant to be like a file system. And so if you wanted to build
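One way to read the split between reducing and side effects is sketched here. The names `Processor`, `catchUp` and the event types are assumptions for illustration, not the SDK's actual API; the point is only that a backlog goes through the reducer first and the side-effect hook decides from the caught-up state.

```typescript
// Illustrative names only; the real processor runtime may be shaped differently.
type StreamEvent = { type: string; text?: string };
type Append = (event: StreamEvent) => void;

interface Processor<S> {
  initialState: S;
  reduce(state: S, event: StreamEvent): S | undefined; // pure, synchronous
  afterAppend(state: S, append: Append): void; // side effects live here
}

type PendingState = { unansweredInputs: number };

const replier: Processor<PendingState> = {
  initialState: { unansweredInputs: 0 },
  reduce(state, event) {
    if (event.type === "input-added") return { unansweredInputs: state.unansweredInputs + 1 };
    if (event.type === "reply-added") return { unansweredInputs: 0 };
    return undefined;
  },
  afterAppend(state, append) {
    // Decide from the caught-up state, not from each historical event.
    if (state.unansweredInputs > 0) {
      append({ type: "reply-added", text: `replying to ${state.unansweredInputs} input(s)` });
    }
  },
};

// Replays a backlog through the reducer only, then runs the hook once.
function catchUp<S>(processor: Processor<S>, backlog: StreamEvent[], append: Append): S {
  let state = processor.initialState;
  for (const event of backlog) {
    state = processor.reduce(state, event) ?? state;
  }
  processor.afterAppend(state, append);
  return state;
}

// 100 events arrived while the laptop was closed.
const backlog: StreamEvent[] = Array.from({ length: 100 }, () => ({ type: "input-added" }));
const appended: StreamEvent[] = [];
const caughtUp = catchUp(replier, backlog, (event) => {
  appended.push(event);
});
```

Here the 100 missed inputs lead to one appended reply instead of 100 separate side effects.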

And so if you wanted to build sub-agents on this thing, all you would probably do is append to dot slash Boris and pretend that Boris is a sub-agent. Then you would subscribe to some special events that Boris might output, like "I have the result", and you would automatically be woken up again on the parent when Boris has the result. Is this confirmed, Misha, that this runs?

Like, how do you run this? This just exports the... you need the runtime, right?

Yes. There is a CLI command that I now can't remember, because my agent's been running it. You should be able to do...

Did you do all that just now?

I did that about a week ago, but you may have changed.

No, no, no. Yeah. Okay. So, okay. Okay.

Let's go to our repo. Like I promised, this is a little bit... You can actually also do this if you go to iterate AI engineer workshop. Here there are a bunch of example files that are... workshop two, I was going for workshop... yeah, that was also meant to be renamed. Ah, here, this is basically the snippet we want. Okay, there is this thing that you can have. So here we've just made a processor, right? This is just a sort of inert thing. And then there's a thing called a pull subscriptions processor runtime. All that does is take the processor that you just made and run it.

I think you probably need to assign it to a variable, because you just export default above.

Um, yes.

Okay.

And here, let's just see if we can run this.

Yes, but unfortunately they need to be modified in order to do anything interesting, because I don't see... yes, those ones were added in the last few minutes, but... yeah, I gave slightly bad copying instructions. But I think committed... I think we're just going to have to slowly build it up in this repo and then push it once it works here, unfortunately.

So here, basically what I've now done is I've taken this processor and hooked it up to this runtime. All the runtime does is the boilerplate for consuming the stream, running the reducer, and running the after append hook. What we should see is that once you submit hello world into this path here, which is just Jonas example, you should get a response there. So if I go here in the UI, and I go to Jonas example, and I think I said this should be an event of type hello world... oh no, this is up here. If all works, this would be sort of the proof point that lets us... it doesn't... hello world seen.

I don't remember if this example relies on the machine username, but if it does, then that would be mmcal, not Jonas.

Maybe let's just do this.

Sorry, this is like the worst workshop because now I'm also on the wrong computer.

And it does after Okay.

Is my computer back up? Yes, but I don't know your password.

Okay. Yeah, I'll do that in a second. Okay.

This example here is not working.

Okay. So, I think we have to do plan B, but we might not actually be able to run this.

Are you getting... I'm just going to walk through the code as we would have written it from scratch.

Sorry about that. I think it sort of illustrates the progression of things, and maybe if you... it's installing dependencies, which is slow.

Yeah, but eventually this will be runnable by the walkthrough.

So in this paradigm, as we've been using it in our agent, when you want to implement some sort of feature, you ask: what is the state I actually need in order to do my thing? In this case we want to make an LLM request whenever somebody sends a message and says hi. In our user interface, and you can access this as well, there is basically a cheat code that makes it a bit easier to create new events of a type called agent input added. This is deliberately the simplest possible thing you can imagine. It is basically just a string. So what we're trying to achieve is that whenever somebody puts an agent input added event into an event stream, we react to that by making an LLM request.

For simplicity, we're just going to do it using the OpenAI SDK. The way the OpenAI SDK works is that at some point before you want to use it, you make yourself a new OpenAI client, and then you can use their Responses API. So you have openai.responses.create, and in order to call it you need some information. Specifically, you need a model, which is a string. So immediately that tells us: let's say the state for our LLM agent has a model string, because that is one of the things we will need when we come to actually make an LLM request. And there is nothing but event sourcing, so let's just say we have a model string and it comes from our state. You could easily imagine, and this was one of the things you could have done as an exercise in the hackathon, adding an event like LLM model changed. You have the teeny tiny one-line reducer that just sets the model to a different model, and off you go: you make the next LLM request with a different model.

The other thing we need is instructions. This is, I think, a bit of a misnomer, so I would call it system prompt in our state: the system prompt for our LLM, what it is meant to be doing, we'll take that in our state. And then you need a history. So we've defined this, and normally you'll just put it into a little TypeScript type, if you're using TypeScript. So here we're saying this is the state of our agent. In the beginning it's really simple, but you can very easily imagine how you quickly add things to it, like "am I currently compacting?" or "is a tool call currently in progress?" or whatever else you might have. The idea is to create a system in which it is trivially easy, with three or four lines of code, to add one of those capabilities. Then you ask: what's my initial state? Because when we start processing the stream there are no events, we need to know what we're starting with. So we're just saying we're starting with an empty history, this system prompt, and GPT 5.4.
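The state described above might be written down roughly like this. The field names, the placeholder prompt and the helper `toRequestParams` are assumptions for illustration; `model`, `instructions` and `input` are the Responses API parameters that the state would feed.

```typescript
// Field names are assumptions based on the talk; the real example may differ.
type HistoryItem = { role: "user" | "assistant"; content: string };

type AgentState = {
  model: string; // passed as `model` to the LLM request
  systemPrompt: string; // what the Responses API calls `instructions`
  history: HistoryItem[]; // passed as `input`
  // Later additions could be one-liners such as:
  // isCompacting?: boolean; toolCallInProgress?: boolean;
};

const initialState: AgentState = {
  model: "gpt-5.4", // model name as spoken in the talk
  systemPrompt: "You are a helpful assistant.", // placeholder prompt, hypothetical
  history: [],
};

// Maps the derived state onto the parameters an LLM request would need.
function toRequestParams(state: AgentState) {
  return {
    model: state.model,
    instructions: state.systemPrompt,
    input: state.history,
  };
}

const params = toRequestParams(initialState);
```

Everything the request needs comes out of state, so changing the model or prompt later is a matter of adding an event and a reducer case rather than touching the request code.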

The next thing you do, once you've done that, is decide what events you need in order to derive this state. In this case we will just create a new event here. We're using Zod for schemas. We're saying it's an agent input added event and it just has a string.

Then, whenever anything comes back from OpenAI, because that is a fact that has occurred, even if it's just a teeny tiny fragment of a string or something else, we store that in an event. And that's it. These are the two main event types. I don't think we need any of this stuff here, actually.

Basically what happened, as is often the case, is that I attempted to instruct an agent to rewrite all of these examples to be simpler to execute just before, and it rewrote them quite significantly. But this is really all you need.

So then you have to write your reducer first. The reducer is basically where you say: when an event is encountered, how do I update my state? You say, when an agent input added event is encountered (this is using a little Zod helper library, actually one that Misha made, called sche match; it basically just gives you type safety on this payload here), then I want my state to be updated to the current state plus the history, except the history also gets this little thing here appended. The reason we're doing that is that, since we're making an OpenAI processor, we're basically translating between our view of the world and OpenAI's Responses API. We just have this agent input, and it's just a string. But OpenAI wants you to put role user and then put the string in a content field. So we're putting that there.
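A sketch of the reducer just described. The talk uses Zod schemas and a helper library; to stay dependency free this version uses a plain TypeScript union instead, and the event names are hypothetical. It also includes the one-line model-change case mentioned earlier as a possible exercise.

```typescript
type HistoryItem = { role: "user" | "assistant"; content: string };
type AgentState = { model: string; systemPrompt: string; history: HistoryItem[] };

// Hypothetical event names; plain union types stand in for the Zod schemas.
type AgentEvent =
  | { type: "agent-input-added"; payload: { content: string } }
  | { type: "llm-model-changed"; payload: { model: string } }
  | { type: "openai-response-event-added"; payload: unknown };

function reduce(state: AgentState, event: AgentEvent): AgentState | undefined {
  switch (event.type) {
    case "agent-input-added":
      // Translate our plain string into the role/content shape OpenAI expects.
      return {
        ...state,
        history: [...state.history, { role: "user", content: event.payload.content }],
      };
    case "llm-model-changed":
      // The "teeny tiny one-line reducer": the next request uses the new model.
      return { ...state, model: event.payload.model };
    default:
      // Raw response chunks are stored as events but do not change this state.
      return undefined;
  }
}

const start: AgentState = { model: "gpt-5.4", systemPrompt: "placeholder", history: [] };
const events: AgentEvent[] = [
  { type: "agent-input-added", payload: { content: "hi" } },
  { type: "llm-model-changed", payload: { model: "another-model" } },
  { type: "openai-response-event-added", payload: { fragment: "Hel" } },
];
const end = events.reduce<AgentState>((s, e) => reduce(s, e) ?? s, start);
```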

Then the other thing we need to do: when you use the OpenAI streaming API, it'll basically chuck out tons and tons of these output response events, and one of them, finally, at the very end, will be of type... Yeah, I think this is a bit hogwash.

huh.

Yeah. Yeah, exactly. It basically what we really want here is um uh

Oh, thank you.

Um, is that actually right?

Um no.

Let's just... I don't suppose you have managed to get the...

I have pushed workshop 2 to the demo repo, but it's more or less as is.

streamed like so.

Yes.

Yeah. Okay. So let's just sort of admit defeat on the thing that I was going to do and just try and show you how it works, because we can get something very simple working, right? We have the extremely simple ping pong processor that does work. And so here we can even just... let's maybe actually start here.

Oh man, I hit insufficient quota running my, uh, jup.

Yeah, we do now have the same examples on the other repo, but... to be honest, I think we might have to just admit defeat, and we can talk about something else or give you the time back, just because the problem is we can't actually run these things. I think it would take me probably 10 minutes of actually concentrating to get them runnable. So maybe we should admit defeat on that. I do wonder, does it sort of make sense what the idea is, especially with it being distributed? So that you can say, okay, I have my processor over here and your agent can use it, and it can be on a different server, or it can actually be in a dynamic worker. And yeah, I think without showing it, it doesn't really make sense.

So the idea here I was trying to understand with this stream processor framework events are

new.

Um consume the rural materialization side or part.

So, yeah, everything that happens in the system, every streaming chunk, everything becomes an event in the stream, and then you have reducers running in many different places. For instance, even this thing with "is the stream paused or not paused": the actual implementation of that, and maybe that's actually useful because this is running in production now, is a circuit breaker processor. This is how the whole thing is built, even inside the event service. The circuit breaker processor has an initial state of paused false and pause reason null, and then it has a really, really simple reducer that just keeps track of the last 100 timestamps of events it has seen. If the last 100 timestamps I've seen don't go more than a second back, then I'm going to append an event. So the reducer basically just accumulates these timestamps, and then the after append function says: if I have seen too many timestamps in the last second, I'm going to throw the circuit breaker.
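The circuit breaker as described could look roughly like this. It is a reconstruction from the spoken description, not the production code; the event name `stream-paused`, the `createdAt` field and the constants are assumptions.

```typescript
// Reconstruction from the talk; names and shapes are hypothetical.
type StreamEvent = { type: string; createdAt: number; payload?: { reason: string } };
type BreakerState = { paused: boolean; pauseReason: string | null; recentTimestamps: number[] };

const WINDOW_SIZE = 100; // how many timestamps to remember
const WINDOW_MS = 1000; // trip if they all fall within one second

const initialState: BreakerState = { paused: false, pauseReason: null, recentTimestamps: [] };

// Pure reducer: only accumulates timestamps and records the pause fact.
function reduce(state: BreakerState, event: StreamEvent): BreakerState {
  if (event.type === "stream-paused") {
    return { ...state, paused: true, pauseReason: event.payload?.reason ?? null };
  }
  const recentTimestamps = [...state.recentTimestamps, event.createdAt].slice(-WINDOW_SIZE);
  return { ...state, recentTimestamps };
}

// Side-effect hook: appends a pause event when the window is too dense.
function afterAppend(state: BreakerState, append: (event: StreamEvent) => void): void {
  const ts = state.recentTimestamps;
  if (state.paused || ts.length < WINDOW_SIZE) return;
  const newest = ts[ts.length - 1];
  const oldest = ts[0];
  if (newest - oldest <= WINDOW_MS) {
    append({
      type: "stream-paused",
      createdAt: newest,
      payload: { reason: "too many events in the last second" },
    });
  }
}
```

Note that tripping the breaker is itself just another event, so the paused flag is derived by the same reducer on every replay.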

So this is a real processor that runs inside my service. But let me show you another one. This UI here is also a stream processor. What does this UI do? It takes all of these events, and for some of the events that it deems interesting, it basically projects them onto what we call feed items. These things here that have a slightly nicer rendering are feed items. They are not the same as events, but they're derived from events. So when you're making UI for this thing, you're also just reducing.
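The projection from events to feed items can be sketched as one more reducer. The feed item kinds and the `agent-output-added` event name are assumptions for illustration; the real UI's types are not shown in the talk.

```typescript
// Hypothetical shapes for illustration.
type StreamEvent = { type: string; payload?: { content?: string } };
type FeedItem = { kind: "user-message" | "assistant-message"; text: string };
type FeedState = { items: FeedItem[] };

const emptyFeed: FeedState = { items: [] };

// Projects only the "interesting" events onto feed items; everything else
// stays visible in the raw event list but produces no feed item.
function reduceFeed(state: FeedState, event: StreamEvent): FeedState | undefined {
  const text = event.payload?.content;
  if (text === undefined) return undefined;
  if (event.type === "agent-input-added") {
    return { items: [...state.items, { kind: "user-message", text }] };
  }
  if (event.type === "agent-output-added") {
    return { items: [...state.items, { kind: "assistant-message", text }] };
  }
  return undefined;
}

const feed = [
  { type: "agent-input-added", payload: { content: "hi" } },
  { type: "openai-response-event-added", payload: {} },
  { type: "agent-output-added", payload: { content: "hello!" } },
].reduce<FeedState>((state, event) => reduceFeed(state, event) ?? state, emptyFeed);
```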

And because the reducer is just a synchronous function, if you knew, for example, that there is a Pi agent harness processor, and it comes with a whole bunch of event types and a reducer, you can just use their reducer in your processor. It's practically free: you can just import it and run it. Then you can start building abstractions that rely on their event types. I think what I will commit to doing, because this was such a shitty experience, is do the whole rundown in video form on the internet. That was one of the things I was going to show: you can hook up Pi or Claude or OpenCode as well, and say, okay, they come with a bunch of events, and you can store them in this event stream and react to them or build on them.

Does that answer the question of where the reducer runs? And then one of the things you can do with this is actually just append an event that has some source code in it, and then it runs that particular processor for you, literally automatically, horizontally scalable.

It's just... yeah.

Do you have the example showing how you do that, where you're appending like a...

Yeah, I do have that.

It's not runnable. I think the... is somewhat.

Yeah. Well, actually we can do it here. I think maybe that is good: instead of trying to write code, we can focus on the things that might work. So we can create a new stream. By the way, all streams are created completely implicitly. You can just post to anything under the streams API and it'll make you a new stream. And here in the UI, and you can also play with this if you want, there is a bunch of preset events. One of them is here: you can append an event of type dynamic worker configured to this thing. What does it have? It just has a script in it. The script is a string, and inside the string is a reducer and an after append hook. So this is literally a stream processor. It would be funny if this is the thing that works, because I was basically up most of the night working on this, because this is the most exciting bit. But it wasn't really workshop grade, because you have to get the first five concepts and then you're like, by the way, you can deploy this by just adding an event. But if I just say ping here, it says pong, right? That's pretty crazy.
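The ping/pong behavior can be imitated locally with a toy stream. This is only a sketch: the real system runs the script in a Cloudflare dynamic worker, whereas this stand-in evaluates the string with `new Function`, which is not a sandbox. The class `ToyStream`, the event names and the script's shape are all assumptions.

```typescript
type StreamEvent = { type: string; payload?: { script?: string } };
type PingState = { pings: number; pongs: number };
type ScriptProcessor = {
  initialState: PingState;
  reduce(state: PingState, event: StreamEvent): PingState;
  afterAppend(state: PingState, append: (event: StreamEvent) => void): void;
};

// The processor travels inside an event as a plain string of JavaScript.
const pingPongScript = `
  return {
    initialState: { pings: 0, pongs: 0 },
    reduce(state, event) {
      if (event.type === "ping") return { ...state, pings: state.pings + 1 };
      if (event.type === "pong") return { ...state, pongs: state.pongs + 1 };
      return state;
    },
    afterAppend(state, append) {
      if (state.pings > state.pongs) append({ type: "pong" });
    },
  };
`;

class ToyStream {
  events: StreamEvent[] = [];
  private processor: ScriptProcessor | null = null;
  private state: PingState = { pings: 0, pongs: 0 };

  append(event: StreamEvent): void {
    this.events.push(event);
    const script = event.payload?.script;
    if (event.type === "dynamic-worker-configured" && script !== undefined) {
      // Stand-in for a sandboxed worker: evaluate the script string directly.
      this.processor = new Function(script)() as ScriptProcessor;
      this.state = this.processor.initialState;
      return;
    }
    if (this.processor === null) return; // the stream knows nothing yet
    this.state = this.processor.reduce(this.state, event);
    this.processor.afterAppend(this.state, (e) => this.append(e));
  }
}

const stream = new ToyStream();
stream.append({ type: "dynamic-worker-configured", payload: { script: pingPongScript } });
stream.append({ type: "ping" });
const types = stream.events.map((e) => e.type);
```

The appended pong flows back through the same reducer, which is what stops the hook from answering the same ping twice.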

I have here a stream that knows nothing about the world, and then I have appended this event that just has a little bit of JavaScript in it. This little bit of JavaScript makes it respond with pong whenever it encounters something with ping. And then there is actually a version of this, which is the thing that took the night away, so to speak: if you have these processor files that export a processor that you define with define processor, you can effectively bundle that into an event. I also got this AI agent running like that. But of course you don't want to have your OpenAI API key in there. That is why we have this slightly awkward envir here, where you can put your OpenAI API key outside of the stream, and then when it makes a fetch request, it'll substitute the secret into the header. I'll definitely post a video of that, because that's pretty wild. You can write the 40 lines of code required for a basic AI agent, append that to any stream, and then that stream becomes an AI agent.

I don't know. I'm looking at two or three nods but mostly blank stares. It is very strange. But does it at least make sense what happened here? Because this one you can just play with: you can just go in the UI. To some extent you can make increasingly more complicated processors for your streams. You can just code them in this input field if you want. The only problem is it cannot have any dependencies. If you want npm packages, you have to bundle them into this string, which gets a bit more complicated. But, yeah.

Yeah, so this is the other thing. You might have noticed that everything has a slug, which is just a URL-safe identifier. The event here is actually called dynamic worker configured, because you can override it. What also already works with this event type is that you can give instructions to an AI agent to give itself different functionality, just by calling append with a slightly different bit of JavaScript.

You can quite easily imagine overcoming the bundling problem, for example, by saying there's another event type, a sort of unbundled dynamic worker, which just has a package.json field and a script field. Then a different processor takes it, bundles it, and makes the fat event that has all of the bundled source code, and suddenly it is basically like an IDE. That's where you can hack on the agent harness without having to really install anything or bring servers into the mix or whatever. But sadly I was not able to show it. So, bummer.

Yeah.

Well, this is JavaScript. There wouldn't be a typing error. But if there's any kind of error, the only thing that can possibly happen is that you get an event. So there would have to be an error event. And there are no typing errors as implemented right now, because this is a JavaScript evaluator. But you could also find a way to run a TypeScript compilation step on it first, and then you could emit an error saying there was a TypeScript compiler error. Then you could run it through an LLM to try and fix it and emit a new event with the fixed code, or do something else with it.

Yeah. The idea is just that everything that happens results in an event, and then you can react to those events however you see fit.

Right.

Yes. In this particular case it's dynamic workers in Cloudflare, so it just spins up a small dynamic worker. One way to think about the deployment story for a processor like this: when this was still working, before I asked for it to be refactored and broken, the way I would do it is that locally on my computer I would do the SSE subscription and run my processor that way. Then, once I thought it was cool, I could either deploy it as a web service on Cloudflare Workers or Vercel or wherever, and tell the streams to notify me whenever a new event needs to be processed, or alternatively just use this dynamic worker thing.

I don't know if this is going to work. At the moment it's looking more like it's a really dumb idea, I think, because it's really not coming across. But for example, you could have a plugin for your agent that runs on another computer, especially if it was something really important like prompt injection protection. You could say: the way that my agent uses your prompt injection protection service can't really be through MCP, because there's no way to proactively hook into the agent loop, or at least not at the minute. But what it could be is: my agent loop actually waits for up to 200 milliseconds before making any new LLM request, for any safety checkers to have their say, and it can just literally ask your safety checker over there. I don't care how you implement it. You could even charge me for it, right? I just don't think that's currently possible in any other way. That's why I think it's quite important that the stream can reach out to the processors and say, you need to process some events now.

There's a couple of other... potentially... I mean, this might... yeah?

One of the ideas you shared is that my agent might use just-in-time services that other agents are offering.

Yeah.

Then he might be charged for it.

Yeah. Well, initially the human would probably be charged for it. But let's say Claude Code: if I want to make a Claude Code plugin, and I wanted to make that into a business, think about how you would have to do that at the moment. You need to give some sort of instructions to Claude Code, either to proactively call your MCP tools or something of that nature, or maybe to call some CLI tool that was previously authenticated, where you did a CLI login to, you know, prompt injection protector.com, and then somehow it works. Whereas really I think you can just plug into the agent's event stream directly. The interface is just: somebody notices that Claude is about to do something and chucks some more events in. They kind of all work that way, but not quite. You have to squint a lot to see that that's what it all compiles down to.

Another thing maybe worth pointing out, for those of you who have tried to make OpenCode or Pi plugins or whatever: there's no before hook. This is very, very important. Actually, in the back end, the only difference between our built-in processors and your processors is that ours can stop events from ever being appended, for example by a third party. So there are certain things that need to happen before an event goes into the stream. But broadly speaking, I'm very against before hooks. I think there were some instances in OpenClaw, for example, of massive performance regressions and cost increases. You can break context caching quite easily with before hooks. You can massively destroy performance. It's just way better to think of the whole system as being eventually consistent and to do all these distributed systems things, like saying: we're going to wait for up to 200 milliseconds for somebody to say "I've got a little bit of information for you", but then we're actually going to make the LLM request whether you come in with information or not, because this needs to be... system.

So if you think, for example, about RAG pipelines and things like that, you can easily have a little processor that says: I have an indexed version of the Notion knowledge base or something, and I'm just going to sit here and try to squeeze in a little bit of extra context if I think it's relevant, but if I don't get there in time, it's totally chill. The whole thing still works. It's not like I've delayed the agent or something.
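The "wait up to 200 milliseconds, then proceed regardless" idea can itself be modeled with events and a reducer. This is a sketch under assumptions: the event names, the `at` timestamps and the function `maybeStart` are hypothetical, and only the 200 millisecond window comes from the talk.

```typescript
// Hypothetical events; timestamps are in milliseconds.
type LoopEvent =
  | { type: "llm-request-scheduled"; at: number }
  | { type: "context-offered"; at: number; text: string }
  | { type: "llm-request-started"; at: number; context: string[] };

type LoopState = { scheduledAt: number | null; offers: string[] };

const WAIT_MS = 200;
const initialState: LoopState = { scheduledAt: null, offers: [] };

function reduce(state: LoopState, event: LoopEvent): LoopState {
  switch (event.type) {
    case "llm-request-scheduled":
      return { scheduledAt: event.at, offers: [] };
    case "context-offered": {
      // Offers only count while a request is pending and the window is open.
      const open = state.scheduledAt !== null && event.at - state.scheduledAt <= WAIT_MS;
      return open ? { ...state, offers: [...state.offers, event.text] } : state;
    }
    case "llm-request-started":
      return { scheduledAt: null, offers: [] };
  }
}

// Run on a timer tick or after an append. Once the window has elapsed, the
// request goes ahead with whatever context made it in; nobody can block it.
function maybeStart(state: LoopState, now: number, append: (e: LoopEvent) => void): void {
  if (state.scheduledAt === null || now - state.scheduledAt < WAIT_MS) return;
  append({ type: "llm-request-started", at: now, context: state.offers });
}

const timeline: LoopEvent[] = [
  { type: "llm-request-scheduled", at: 0 },
  { type: "context-offered", at: 50, text: "relevant doc snippet" },
  { type: "context-offered", at: 300, text: "arrived too late" },
];
const pending = timeline.reduce(reduce, initialState);
```

A late helper loses nothing but its chance to contribute to this request, which is the difference from a blocking before hook.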

Yeah, you mean the streaming database? I think this could work on almost anything that is durable stream shaped. So this was just for the purposes of this exercise. I do think there is potentially an interesting scenario where it's a combination of a queue and a pub/sub system and a streaming database that can also run code for you. That sort of thing is an interesting infrastructure primitive that maybe somebody could make. But in principle, if you don't know about it yet, you can read up on this durable streams API specification that somebody came out with. We don't follow it exactly, but close enough, and it's basically a very common old idea. The thing that is more new is that you would also have push subscriptions out of it, but there are some things that do that; there's a thing called NATS JetStream or something that does it.

Are you talking about the Electric...

Yeah, exactly. They have durable streams. A durable stream is just an append-only event log with offset tracking, and they want the client to track the last consumed offset. That's also how, say, Kafka works, or something like that. But you can also do it the other way around and have the server track the offsets. Google Cloud and AWS have these pub/sub systems where they say, oh, we'll just give you the next event and we'll keep track of the offset for you.
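The two offset-tracking styles can be contrasted with an in-memory toy. None of this follows the durable streams specification or any real broker's API; the class names are made up for illustration.

```typescript
// An append-only log where every event gets a monotonically increasing offset.
class DurableStream<E> {
  private log: E[] = [];

  append(event: E): number {
    this.log.push(event);
    return this.log.length - 1; // offset of the appended event
  }

  read(fromOffset: number): E[] {
    return this.log.slice(fromOffset);
  }
}

// Client-tracked: the consumer remembers the next offset it wants to read.
class ClientTrackedConsumer<E> {
  private stream: DurableStream<E>;
  private nextOffset = 0;

  constructor(stream: DurableStream<E>) {
    this.stream = stream;
  }

  poll(): E[] {
    const batch = this.stream.read(this.nextOffset);
    this.nextOffset += batch.length;
    return batch;
  }
}

// Server-tracked: the stream side keeps one offset per named subscription
// and hands out "the next event" on request.
class ServerTrackedSubscriptions<E> {
  private stream: DurableStream<E>;
  private offsets = new Map<string, number>();

  constructor(stream: DurableStream<E>) {
    this.stream = stream;
  }

  next(subscription: string): E | undefined {
    const offset = this.offsets.get(subscription) ?? 0;
    const batch = this.stream.read(offset);
    if (batch.length === 0) return undefined;
    this.offsets.set(subscription, offset + 1);
    return batch[0];
  }
}

const log = new DurableStream<string>();
log.append("a");
log.append("b");
const consumer = new ClientTrackedConsumer(log);
const subscriptions = new ServerTrackedSubscriptions(log);
```

In the client-tracked style a restarted consumer must persist its own offset; in the server-tracked style the subscription name is all it needs to resume.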

Yeah?

And then anyone could essentially pass a stream that would go to my... generically.

Oh, I mean, you would need to have authentication on it, right? This entire thing would never... like, right. I was thinking it might be fun to just say there's a...

...do that yet, basically.

Yeah, exactly. It might be fun to just say there's a public namespace and it just gets cleared every hour or something, right? Because it is kind of nice that anybody anywhere on the internet can just curl a thing, or copy and paste a curl from somewhere, and suddenly have a flavor of agent. I do think that's quite interesting, but it just needs to be deleted quite aggressively.

But I think that's more or less a solved problem. It's a bit gnarly, but the way I envision it, if you did this properly, is that on each event you basically have client provenance information. We say: who was this, what were they able to do, how sure are we that the HTTP client or the stream client was actually entitled to do this operation?

And again, you can model all of that as events, right? You would just have an event that is like "make this stream public", and until you make that event, it can only be written to by whoever created it or something.
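Modeling write access as events might look like the following sketch. The event name `stream-made-public`, the `clientId` provenance field and the rule that the first writer becomes the owner are all assumptions; the talk only describes the idea.

```typescript
// Every event carries provenance: which client appended it (hypothetical field).
type StreamEvent = { type: string; clientId: string };
type AccessState = { owner: string | null; isPublic: boolean };

const initialAccess: AccessState = { owner: null, isPublic: false };

function reduceAccess(state: AccessState, event: StreamEvent): AccessState {
  // Assumption for this sketch: the first client to write becomes the owner.
  const owner = state.owner ?? event.clientId;
  const madePublic = event.type === "stream-made-public" && event.clientId === owner;
  return { owner, isPublic: state.isPublic || madePublic };
}

function canAppend(state: AccessState, clientId: string): boolean {
  return state.owner === null || state.isPublic || state.owner === clientId;
}

// Gatekeeper: derives access state from the log, then accepts or rejects.
// This is the kind of check that has to run before an event enters the stream.
function tryAppend(log: StreamEvent[], event: StreamEvent): boolean {
  const state = log.reduce(
    (acc: AccessState, e: StreamEvent) => reduceAccess(acc, e),
    initialAccess,
  );
  if (!canAppend(state, event.clientId)) return false;
  log.push(event);
  return true;
}

const accessLog: StreamEvent[] = [];
const results = [
  tryAppend(accessLog, { type: "note-added", clientId: "alice" }), // creates the stream
  tryAppend(accessLog, { type: "note-added", clientId: "bob" }), // rejected
  tryAppend(accessLog, { type: "stream-made-public", clientId: "alice" }),
  tryAppend(accessLog, { type: "note-added", clientId: "bob" }), // now allowed
];
```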

Right. I might ask Sean if we can, instead of this video, do a video we'll record a little bit later today that shows it more nicely, because I do think it's worth showing.

Just a little bit too short notice.

Shall we just wrap it then? I don't mean to keep you all from your, like, 10 minute longer break. Or if anybody wants to chat.
