# I am the Watcher. I am your guide through this vast new twtiverse.
# 
# Usage:
#     https://watcher.sour.is/api/plain/users              View list of users and latest twt date.
#     https://watcher.sour.is/api/plain/twt                View all twts.
#     https://watcher.sour.is/api/plain/mentions?uri=:uri  View all mentions for uri.
#     https://watcher.sour.is/api/plain/conv/:hash         View all twts for a conversation subject.
# 
# Options:
#     uri     Filter to show a specific users twts.
#     offset  Start index for quey.
#     limit   Count of items to return (going back in time).
# 
# twt range = 1 13
# self = https://watcher.sour.is/conv/fuhaoaa
Hi, I am playing with making an event sourcing database. Its super alpha but I thought I would share since others are talking about databases and such.

It's super basic. Using tidwall/wal as the disk backing. The first use case I am playing with is an implementation of msgbus. I can post events to it and read them back in reverse order.



I plan to expand it to handle other event sourcing type things like aggregates and projections.

Find it here: sour-is/ev

@prologic @movq @lyse
@xuu Nice!
@xuu Nice!
Very interesting implementation 🤔
Very interesting implementation 🤔
Thanks, @xuu, I'll try to check it out at the weekend.
I have updated my eventDB to have subscriptions! It now has websockets like msgbus. I have also added a in memory store that can be used along side the disk backed wal.
I have updated my eventDB to have subscriptions! It now has websockets like msgbus. I have also added a in memory store that can be used along side the disk backed wal.
@xuu Nice! 👌
@xuu Nice! 👌
Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.


type PA[T any] interface {
\tevent.Aggregate
\t*T
}

// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
\tctx, span := logz.Span(ctx)
\tdefer span.End()

\tagg = new(A)
\tagg.SetStreamID(streamID)

\tif err = es.Load(ctx, agg); err != nil {
\t\treturn
\t}

\tif err = event.NotExists(agg); err != nil {
\t\treturn
\t}

\tif err = fn(ctx, agg); err != nil {
\t\treturn
\t}

\tvar i uint64
\tif i, err = es.Save(ctx, agg); err != nil {
\t\treturn
\t}

\tspan.AddEvent(fmt.Sprint("wrote events = ", i))

\treturn
}

fig. 1

This lets me do something like this:


a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
\t\treturn agg.OnUserRegister(nick, key)
})

fig. 2

I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.
Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.


type PA[T any] interface {
	event.Aggregate
	*T
}

// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
	ctx, span := logz.Span(ctx)
	defer span.End()

	agg = new(A)
	agg.SetStreamID(streamID)

	if err = es.Load(ctx, agg); err != nil {
		return
	}

	if err = event.NotExists(agg); err != nil {
		return
	}

	if err = fn(ctx, agg); err != nil {
		return
	}

	var i uint64
	if i, err = es.Save(ctx, agg); err != nil {
		return
	}

	span.AddEvent(fmt.Sprint("wrote events = ", i))

	return
}

fig. 1

This lets me do something like this:


a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
		return agg.OnUserRegister(nick, key)
})

fig. 2

I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.
Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.


type PA[T any] interface {
\tevent.Aggregate
\t*T
}

// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
\tctx, span := logz.Span(ctx)
\tdefer span.End()

\tagg = new(A)
\tagg.SetStreamID(streamID)

\tif err = es.Load(ctx, agg); err != nil {
\t\treturn
\t}

\tif err = event.NotExists(agg); err != nil {
\t\treturn
\t}

\tif err = fn(ctx, agg); err != nil {
\t\treturn
\t}

\tvar i uint64
\tif i, err = es.Save(ctx, agg); err != nil {
\t\treturn
\t}

\tspan.AddEvent(fmt.Sprint("wrote events = ", i))

\treturn
}

fig. 1

This lets me do something like this:


a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
\t\treturn agg.OnUserRegister(nick, key)
})

fig. 2

I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.