5-Operators and Composition in Our System – Building a Reactive System in TypeScript



This content originally appeared on DEV Community and was authored by Michael De Abreu

To recap, we have built and optimized a reactive system that can store variables and react to updates and event emissions, update other states, and perform side-effect calls. The journey has been incredible, and I hope this has helped you to understand more about how reactive systems work under the hood, but all things must come to an end. This will be the final post of the series, where we will explore how to create operators for our system. This will also be a long post since everything is related to operators, and I didn’t want to break the flow.

Defining an operator

If you are familiar with other reactive libraries, you have probably seen these functions that allow you to transform values in a data stream. These functions are called operators. An operator should be a function that takes a transformation function and returns another function that gets the internal stream, applies the transformation, and returns a new stream.

export type OperatorFn<T, R = T> = (source: Subscribable<T>) => Event<R>;

We could also apply the transformation directly without involving the stream system, but then the operators would be synchronous, and that wouldn’t be much fun, would it? By keeping the stream, we can use it to create async operators easily; more on that later.

Coding our first operator

If I mention transforming a data stream, I bet you are thinking about the map method of an array, and you’d be right. The first operator we will create is a map operator. Let’s start by defining the test:

describe("map", () => {
  it("should apply map correctly", () => {
    const source = event<number>();

    const mapped1 = map((x: number) => x * 2)(source);
    const mapped2 = map((x: number) => `${x}`)(mapped1);

    const result = mock();

    mapped2.subscribe(result);

    source(2);

    expect(result).toHaveBeenCalledWith("4");
  });
});

We want the operator to be composable. This first callback will create the signature of the OperatorFn that we defined, something we can reuse to call the operator and create a new stream.

Let’s start by creating the map function in its own file:

export function map<T, R>(fn: (value: T) => R): OperatorFn<T, R> {
}

Since we want to return an operator function, we need something like this:

return (source) => {
  const mappedEvent = event<R>();
  return mappedEvent;
};

We return a function that creates a new event and returns it, but it’s not transforming the output yet. To do that, we need to use the fn callback we’re passing to the operator.

source.subscribe((value) => mappedEvent(fn(value)));

With this, we subscribe to the input source of the operator and emit transformed values into the new event. Finally, this is what our first operator looks like:

export function map<T, R>(fn: (value: T) => R): OperatorFn<T, R> {
  return (source) => {
    const mappedEvent = event<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}

This operator emits a new value each time the input is updated. However, another useful method the Array prototype has is filter, which allows us to get a new array with only the values we want, based on a predicate function.

The filter operator

To create our second operator, we’ll start again by writing the tests first:

describe("filter", () => {
  it("should apply filter correctly", () => {
    const source = event<number>();
    const filtered = filter((x) => x > 5)(source);

    const result = mock();

    filtered.subscribe(result);

    source(3);
    expect(result).not.toHaveBeenCalled();
    source(7);
    expect(result).toHaveBeenCalledWith(7);
  });
});

This operator is very similar to map, so I’ll skip the step-by-step explanation and show the complete implementation instead.

export function filter<T>(predicate: (value: T) => boolean): OperatorFn<T> {
  return (source) => {
    const filteredEvent = event<T>();
    source.subscribe((value) => {
      if (predicate(value)) {
        filteredEvent(value);
      }
    });
    return filteredEvent;
  };
}

The only difference between the operators lies in their behavior. Instead of transforming a value, this operator evaluates whether the value should be emitted.

With this, we now have two operators, map and filter, and the result of each can be used as the source for another. We can leverage that to create a way to chain operators more naturally.

Composing with pipe

Composition is a fundamental part of any reactive system, and operator chaining is the most natural way to express transformations and logic over time. Instead of creating one-off streams and wiring them manually, we can define a clear and readable sequence of operations, just like we do with array methods. I think it would be better if we could chain them naturally, like this:

const result = source.pipe(
  map((x) => x * 2),
  filter((x) => x > 5)
);

This might seem like magic, but all we really need to do is apply the operators one after another, like we’ve been doing manually. We’ll just take advantage of the fact that the output of one stream is the input of the next. It’s not magic, but it’s going to look a lot cleaner.

function pipe<R>(...operators: OperatorFn<any, any>[]): Event<R> {
  const pipeEvent = event<unknown>();
  subscribe(pipeEvent);
  const resultEvent = operators.reduce(
    (acc, op) => op(acc),
    pipeEvent,
  ) as Event<R>;
  return resultEvent;
}

Thanks to the composable API we’ve built, we can chain all the operators into a resulting event simply by creating a new event that subscribes to the source and using it as the initial value in the reducer. That result is what we return from the pipe function.

To integrate this new method in each creator, we just need to add it. By default, it will always return an event, but you could tweak it per creator to make it return the same type. I’ll leave that part up to you, and instead, I’ll show how you can integrate it into event; similarly, you can add it to other creators.

export function event<T = void>(): Event<T> {
  // Same code we had; we only need to update the return.
  return Object.assign(emit, { subscribe, toState, pipe }) as Event<T>;
}

With that, any state, event, or computed we create will have a pipe method that we can use to chain our operators. We’ve got two basic operators, but let’s build a few more to show what this system is capable of.

Other operators

Debounce

This operator won’t emit a new value until a specified amount of time has passed.

export function debounce<T>(delay: number): OperatorFn<T> {
  return (source) => {
    const debouncedEvent = event<T>();
    let timeoutId: ReturnType<typeof setTimeout> | undefined;

    source.subscribe((value) => {
      if (timeoutId != null) {
        clearTimeout(timeoutId);
      }

      timeoutId = setTimeout(() => {
        debouncedEvent(value);
        timeoutId = void 0;
      }, delay);
    });

    return debouncedEvent;
  };
}

Merge

This operator merges two sources into a single stream:

export function merge<S, O>(other: Subscribable<O>): OperatorFn<S, S | O> {
  return (source) => {
    const mergedEvent = event<S | O>();
    source.subscribe(mergedEvent);
    other.subscribe(mergedEvent);
    return mergedEvent;
  };
}

MapAsync

This one is my personal favorite. It allows you to map your source into a promise and receive its result asynchronously.

export function mapAsync<T, R, E = unknown>(
  promiseFn: (value: T) => Promise<R>,
): OperatorFn<T, { data?: T; loading?: boolean; error?: E; }> {
  return (source) => {
    const out = event<{ data?: T; loading?: boolean; error?: E; }>();
    let currentFetchId = 0;
    source.subscribe(async (value) => {
      currentFetchId++;
      const fetchId = currentFetchId;
      out({ loading: true });
      try {
        const result = await promiseFn(value);
        if (fetchId === currentFetchId) {
          out({ data: result });
        }
      } catch (error: any) {
        if (fetchId === currentFetchId) {
          out({ error });
        }
      }
    });
    return out;
  };
}

So simple, yet so useful, and incredibly powerful.

One more thing…

If you’ve been following along, you’ve probably noticed TypeScript complaining about the pipe types. We did not define any types for it, so of course, it will complain. However, typing a pipe method is complex because it requires a level of recursion that can even push TS to its limits. This is the most complex part of this series, and it’s not even about code but about the shape of the types themselves. What I did is similar to what other libraries like rxjs have done: overloads. A lot of overloads.

PipeFn types

export interface PipeFn<T, S extends Pipeable<any>> {
  (): Event<R>;
  <A>(op1: OperatorFn<NonNullable<T>, A>): Event<R>;
  <A, B>(op1: OperatorFn<NonNullable<T>, A>, op2: OperatorFn<A, B>): Event<R>;
  <A, B, C>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
  ): Event<R>;
  <A, B, C, D>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
  ): Event<R>;
  <A, B, C, D, E>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
  ): Event<R>;
  <A, B, C, D, E, F>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
  ): Event<R>;
  <A, B, C, D, E, F, G>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
  ): Event<R>;
  <A, B, C, D, E, F, G, H>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
  ): Event<R>;
  <A, B, C, D, E, F, G, H, I>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
    op9: OperatorFn<H, I>,
  ): Event<R>;
  <A, B, C, D, E, F, G, H, I, J>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
    op9: OperatorFn<H, I>,
    op10: OperatorFn<I, J>,
  ): Event<R>;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  <R>(...ops: OperatorFn<any, any>[]): Event<R>;
}

export interface Pipeable<T> {
  pipe: PipeFn<T, this>;
}

That’s all, folks!

And with that, we’ve reached the end of the series. We started with something as simple as a counter, and we ended up building a complete reactive system, with state, events, operators, async transformations and functional composition.

There’s still plenty we could explore to continue the journey, including more advanced topics like batch updates, dev tools, dependency graph, performance optimizations, and integrations with other libraries. But that would go far beyond the scope and simplicity of what we’ve built so far.

Thank you so much for joining me. I hope you’ve enjoyed the process as much as I did. Now that you know how this works, you might even come up with better ideas than mine, and that would be awesome. I can only hope you’ll share them. Until next time.


This content originally appeared on DEV Community and was authored by Michael De Abreu