Basic Data Types
In sec, a few basic data types appear in many use cases when interacting with KurrentDB. They are described below to establish some basics. More details about the particulars of these types are in the API documentation.
StreamId
Streams in KurrentDB have stream identifiers that can be classified as user defined and system defined.
In KurrentDB, streams prefixed with $ are reserved for the system, for instance $settings.
Furthermore, KurrentDB has a concept of metadata streams for streams. A metadata stream is named by prefixing the
stream name with $$. For a system stream this results in a $$$ prefix, e.g. the corresponding metadata stream for
$settings is $$$settings. For a user defined stream, e.g. user_stream, the corresponding metadata stream is named $$user_stream.
In sec, a stream identifier is an ADT called StreamId with variants Id and MetaId, where Id
itself has two variants, Normal and System. The stream identifiers that a user can create are Normal and System.
This is done with StreamId.apply, which returns an Either[InvalidInput, Id]. Some examples of StreamId construction
are:
import sec.StreamId
val user = StreamId("user_stream") // Right(Normal("user_stream"))
val system = StreamId("$system_stream") // Right(System("system_stream"))
// MetaId for above you get from the metaId method.
// The render method displays the stream identifier as KurrentDB sees it.
user.map(_.metaId.render) // Right("$$user_stream")
system.map(_.metaId.render) // Right("$$$system_stream")
// Invalid stream identifiers
StreamId("") // Left(InvalidInput("id cannot be empty"))
StreamId("$$oops") // Left(InvalidInput("value must not start with $$, but is $$oops"))
Moreover, a few common system stream identifiers are located in the StreamId companion object, for instance:
import sec.StreamId
StreamId.All.render // $all
StreamId.Scavenges.render // $scavenges
StreamId.All.metaId.render // $$$all
StreamId.Scavenges.metaId.render // $$$scavenges
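The naming rules for system and metadata streams described above can be sketched with plain strings. The object and helper names below are hypothetical and are not part of sec. They only mirror the prefix conventions, under the assumption that a metadata stream name is always the stream name prefixed with $$.

```scala
object StreamNameSketch {

  // Hypothetical helper: a name starting with "$$" denotes a metadata stream.
  def isMetadata(name: String): Boolean = name.startsWith("$$")

  // Hypothetical helper: a single leading "$" marks a system stream.
  def isSystem(name: String): Boolean = name.startsWith("$") && !isMetadata(name)

  // The metadata stream name is the stream name prefixed with "$$".
  def metadataStreamOf(name: String): String = "$$" + name
}
```

In sec itself you would not work with raw strings like this, but use StreamId and its metaId and render methods as shown earlier.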
StreamPosition
When you store an event in KurrentDB it is assigned a stream position in the individual stream it belongs to.
In sec a stream position is an ADT called StreamPosition with two variants, StreamPosition.Exact that contains
a Long and StreamPosition.End that is an object representing the end of a stream.
A StreamPosition that you can create is Exact and this is done with StreamPosition.apply that returns an
Exact. Examples of StreamPosition construction:
import sec.StreamPosition
StreamPosition.Start // Exact(0)
StreamPosition(1L) // Exact(1)
StreamPosition.End // End
One use case where you need to construct a StreamPosition is when you store a pointer to the last processed
event of a particular stream as a Long in a read model and, e.g. after a restart of your application,
need to resume reading from KurrentDB.
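The checkpoint use case can be sketched as follows. This is a minimal model that uses only the standard library. Checkpoint, advance and resumeFrom are hypothetical names, and the sketch assumes that a read includes the event at its starting position, so reading resumes one past the last processed event.

```scala
object CheckpointSketch {

  // Hypothetical read-model checkpoint: the stream position of the
  // last processed event, or None when nothing has been processed yet.
  final case class Checkpoint(lastProcessed: Option[Long])

  // Record that the event at the given position has been processed.
  def advance(processed: Long): Checkpoint = Checkpoint(Some(processed))

  // Position to resume from, assuming reads include the starting position:
  // the start of the stream, or one past the last processed event.
  def resumeFrom(checkpoint: Checkpoint): Long =
    checkpoint.lastProcessed.fold(0L)(_ + 1L)
}
```

With sec, the resulting Long would then be turned into a position with StreamPosition.apply, as in the construction examples above.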
LogPosition
All events in KurrentDB have a logical position in the global transaction log. A logical position consists of a commit position value
and a prepare position value. In sec this is modelled as an ADT called LogPosition with two variants, LogPosition.Exact
that contains two Long values and LogPosition.End representing the end of the log.
A LogPosition that you can create from Long values is Exact. This is done with LogPosition.apply, which returns an
Either[InvalidInput, Exact]. Examples of LogPosition construction:
import sec.LogPosition
LogPosition.Start // Exact(0, 0)
LogPosition.End // End
LogPosition(1L, 1L) // Right(Exact(1, 1))
LogPosition(0L, 1L) // Left(InvalidInput("commit must be >= prepare, but 0 < 1"))
Cases where you construct a LogPosition are similar to those of StreamPosition: maintaining a pointer to the last
processed event. However, here you keep a pointer to the global log, StreamId.All, instead of an individual
stream.
A note on Long usage in positions
KurrentDB uses uint64, for which the JVM has no corresponding primitive type. To work around this, sec has a
type called ULong that represents uint64 by wrapping a regular Long, and uses it for StreamPosition.Exact
and LogPosition.Exact. As sec provides an instance of cats.Order for ULong, it is possible to compare positions larger than Long.MaxValue as well as positions in [0, Long.MaxValue].
Moreover, this means that you can store a LogPosition.Exact pointer for your read model by using toLong on its ULong values. The ULong.toLong method might yield negative values. This is fine, because when the LogPosition.Exact is constructed again from those values, its cats.Order instance works for all numbers in uint64.
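To illustrate why a negative Long is harmless here, the following sketch compares Long values by their unsigned interpretation using methods from java.lang.Long. It is not how sec implements ULong. The object and member names are hypothetical and only demonstrate the idea of treating the 64 bits of a Long as a uint64.

```scala
object UnsignedLongSketch {

  // Orders Long values as if their 64 bits were an unsigned number.
  val unsignedOrdering: Ordering[Long] = new Ordering[Long] {
    def compare(x: Long, y: Long): Int = java.lang.Long.compareUnsigned(x, y)
  }

  // Renders the bits of a Long as an unsigned decimal string.
  def render(bits: Long): String = java.lang.Long.toUnsignedString(bits)

  // Parses an unsigned decimal string into the Long carrying the same bits.
  def parse(text: String): Long = java.lang.Long.parseUnsignedLong(text)
}
```

For example, the uint64 value 9223372036854775808 is one above Long.MaxValue. Parsing it yields Long.MinValue, a negative Long, yet unsignedOrdering still ranks it above Long.MaxValue and render returns the original digits.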
StreamState
Some operations, such as appending events to a stream, require that you provide an expectation of what state
the stream currently is in. If the stream state does not fulfil that expectation then an exception will be raised by KurrentDB.
In sec this expected stream state is represented by the ADT StreamState that has four variants:
NoStream - The stream does not exist yet.
Any - No expectation about the current state of the stream.
StreamExists - The stream, or its metadata stream, is present.
StreamPosition.Exact - The stream exists and its last written stream position is expected to be Exact.
The StreamState expectation can be used to implement optimistic concurrency. When you retrieve a stream from KurrentDB,
you can take note of the current stream position, then when you append to the stream you can determine if the stream has
been modified in the meantime.
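The optimistic concurrency check can be sketched with a toy in-memory model. Everything below is hypothetical and uses only the standard library. The Expect* types stand in for the StreamState variants, ToyStream stands in for a stream, and the check is a simplification that does not reproduce how KurrentDB evaluates expectations.

```scala
object OptimisticConcurrencySketch {

  // Hypothetical stand-ins for the four expected-state variants.
  sealed trait Expected
  case object ExpectNoStream extends Expected
  case object ExpectAny extends Expected
  case object ExpectExists extends Expected
  final case class ExpectExact(position: Long) extends Expected

  // Reported when the expectation does not match the actual stream state.
  final case class WrongExpectedState(expected: Expected, actual: Option[Long])

  // Toy stream: events are strings and positions are zero-based indices.
  final case class ToyStream(events: Vector[String]) {
    def lastPosition: Option[Long] =
      if (events.isEmpty) None else Some(events.size - 1L)
  }

  // Appends only if the expectation matches the current state of the stream.
  def append(
    stream: Option[ToyStream],
    expected: Expected,
    newEvents: List[String]
  ): Either[WrongExpectedState, ToyStream] = {
    val actual = stream.flatMap(_.lastPosition)
    val matches = expected match {
      case ExpectAny        => true
      case ExpectNoStream   => stream.isEmpty
      case ExpectExists     => stream.nonEmpty
      case ExpectExact(pos) => actual.contains(pos)
    }
    if (matches)
      Right(ToyStream(stream.fold(Vector.empty[String])(_.events) ++ newEvents))
    else
      Left(WrongExpectedState(expected, actual))
  }
}
```

A writer that read the stream when its last position was 0 and then appends with ExpectExact(0L) gets a Left if another writer has appended in the meantime, which is the signal to reload and retry.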
EventData
The event data you store in KurrentDB is composed of an event type, an event id, payload data, metadata and a content type.
In sec this is modelled as EventData with types that are explained below.
EventType
An event type should be supplied for your event data. This is a unique string used to identify the type of event you are saving. One might be tempted to use language runtime types for event types as it might make marshalling more convenient. However, this is not recommended as it couples storage to your types. Instead, you can use a mapping between event types stored in KurrentDB and your concrete runtime types.
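A mapping between stored event types and runtime types could look like the sketch below. The domain events and the type names such as account.deposited are made up for illustration, and decoding is reduced to a single Long field to keep the example short.

```scala
object EventTypeMappingSketch {

  // Hypothetical domain events of an application.
  sealed trait AccountEvent
  final case class Deposited(amount: Long) extends AccountEvent
  final case class Withdrawn(amount: Long) extends AccountEvent

  // Storage-facing names, chosen independently of the class names above.
  def eventTypeOf(event: AccountEvent): String = event match {
    case _: Deposited => "account.deposited"
    case _: Withdrawn => "account.withdrawn"
  }

  // Maps a stored event type back to a runtime type, rejecting unknown names.
  def decode(eventType: String, amount: Long): Either[String, AccountEvent] =
    eventType match {
      case "account.deposited" => Right(Deposited(amount))
      case "account.withdrawn" => Right(Withdrawn(amount))
      case other               => Left(s"Unknown event type: $other")
    }
}
```

Because the stored names are decoupled from the classes, Deposited can be renamed or moved to another package without touching events that are already stored.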
The string that KurrentDB uses for the event type is modelled in sec as an ADT with two main variants,
Normal and System. The type that you can create is Normal and this is done with EventType.apply,
which returns Either[InvalidInput, Normal]. Input is validated to be non-empty and to not start with $, which KurrentDB
uses for reserved system defined types such as $> and $metadata.
Examples of EventType construction:
import sec.EventType
EventType("foo.bar.baz") // Right(Normal("foo.bar.baz"))
EventType("") // Left(InvalidInput("Event type name cannot be empty"))
EventType("$@") // Left(InvalidInput("value must not start with $, but is $@"))
Common system types are located in the companion of EventType, some examples are:
import sec.EventType
EventType.LinkTo.render // $>
EventType.StreamMetadata.render // $metadata
EventType.Settings.render // $settings
EventType.StreamReference.render // $@
EventId
An event identifier is a UUID that KurrentDB uses to uniquely identify the event you are trying to append. If two events with the same UUID are appended to the same stream in quick succession, KurrentDB only appends one copy of the event to the stream. More information about this is in the KurrentDB docs.
Data
The data field on EventData is a scodec ByteVector encoded representation
of your event data. If you store your data as JSON you can make use of KurrentDB functionality for projections.
However, it is also common to store data in a protocol buffers format.
Metadata
It is common to store additional information alongside your event data. This can be correlation id, timestamp, audit,
marshalling info and so on. KurrentDB allows you to store a separate byte array containing this information to keep data
and metadata separate. These extra bytes are stored in a ByteVector field of EventData called metadata.
ContentType
The data and metadata fields on EventData have a content type, ContentType, with
variants Binary and Json. This tells KurrentDB whether the data is stored as JSON or as
binary. As you might have noticed, both ByteVector fields of EventData share the same content type. This is because
KurrentDB does not support different content types for data and metadata.
When ContentType is translated to the protocol of KurrentDB it becomes application/octet-stream for Binary and
application/json for Json. In the future more content types might be added, e.g. something that corresponds to
application/proto or application/avro.
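The parts of event data described in this section can be put together in a small standard-library model. ToyEventData and the Content type are hypothetical stand-ins, not the sec types. In particular, the byte payloads are held in Vector[Byte] rather than a scodec ByteVector to keep the sketch free of dependencies.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

object EventDataSketch {

  // Hypothetical stand-in for the two content type variants.
  sealed trait Content
  case object Binary extends Content
  case object Json extends Content

  // The protocol-level names given in the text above.
  def mimeType(content: Content): String = content match {
    case Binary => "application/octet-stream"
    case Json   => "application/json"
  }

  // A single content type covers both the data and the metadata bytes.
  final case class ToyEventData(
    eventType: String,
    eventId: UUID,
    data: Vector[Byte],
    metadata: Vector[Byte],
    content: Content
  )

  // Builds a JSON event by encoding both payloads as UTF-8 bytes.
  def jsonEvent(eventType: String, dataJson: String, metadataJson: String): ToyEventData =
    ToyEventData(
      eventType,
      UUID.randomUUID(),
      dataJson.getBytes(StandardCharsets.UTF_8).toVector,
      metadataJson.getBytes(StandardCharsets.UTF_8).toVector,
      Json
    )
}
```

Keeping the content type next to both payloads makes it impossible in this model to mark data as JSON and metadata as binary, which matches the restriction described above.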
Event
Event data arriving from KurrentDB either comes from an individual stream or from the global log. An event is encoded as an
ADT called Event that has the variants EventRecord and ResolvedEvent.
An EventRecord consists of the following data types:
streamId: StreamId - The stream the event belongs to.
streamPosition: StreamPosition.Exact - The stream position of the event in its stream.
logPosition: LogPosition.Exact - The position of the event in the global stream.
eventData: EventData - The data of the event.
created: ZonedDateTime - The time the event was created.
A ResolvedEvent is used when consuming streams that link to other streams. It consists of:
event: EventRecord - The linked event.
link: EventRecord - The linking event record.
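The shape of this ADT, and how code can treat both variants uniformly, can be sketched with pattern matching. The types below are simplified, hypothetical stand-ins with only a few fields. The real Event type in sec has more fields and its own helper methods, which are listed in the API docs.

```scala
object EventSketch {

  // Hypothetical, simplified stand-ins for Event, EventRecord and ResolvedEvent.
  sealed trait ToyEvent
  final case class ToyRecord(streamId: String, streamPosition: Long, payload: String)
    extends ToyEvent
  final case class ToyResolved(event: ToyRecord, link: ToyRecord) extends ToyEvent

  // The record that carries the payload: the record itself,
  // or the linked event when a link was resolved.
  def payloadRecord(e: ToyEvent): ToyRecord = e match {
    case record: ToyRecord     => record
    case ToyResolved(event, _) => event
  }

  // The record as it appears in the stream being read: the record itself,
  // or the linking record when a link was resolved.
  def readRecord(e: ToyEvent): ToyRecord = e match {
    case record: ToyRecord    => record
    case ToyResolved(_, link) => link
  }
}
```

The distinction matters for checkpoints: in this model the position to remember for the stream being read comes from readRecord, while the data to process comes from payloadRecord.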
See the API docs for various methods defined for Event.
Later on, when using the EsClient API, you will learn about reading from streams and instructing KurrentDB to
resolve links such that you get events of type ResolvedEvent back.