Hoodie provides the following primitives over datasets on HDFS:
- Upsert (how do I change the dataset?)
- Incremental consumption (how do I fetch data that changed?)
To achieve this, Hoodie maintains a timeline of all activity performed on the dataset, which helps provide instantaneous views of the dataset while also efficiently supporting retrieval of data in the order of its arrival into the dataset.
Key activities on the timeline include:
- COMMITS: A single commit captures information about an atomic write of a batch of records into a dataset. Commits are identified by a monotonically increasing timestamp, denoting the start of the write operation.
- CLEANS: Background activity that gets rid of older versions of files in the dataset that are no longer needed.
- COMPACTIONS: Background activity to reconcile differential data structures within Hoodie, e.g., moving updates from row-based log files to columnar formats.
The example above shows upserts happening between 10:00 and 10:20 on a Hoodie dataset, roughly every 5 minutes, leaving commit metadata on the Hoodie timeline along with other background cleaning/compaction activity. One key observation is that the commit time indicates the arrival time of the data (10:20 AM), while the actual data organization reflects the actual time, or event time, that the data was intended for (hourly buckets from 07:00). These are two key concepts when reasoning about tradeoffs between latency and completeness of data.
When there is late-arriving data (data intended for 9:00 arriving more than 1 hour late, at 10:20), we can see the upsert producing new data in even older time buckets/folders. With the help of the timeline, an incremental query attempting to get all new data committed successfully since 10:00 can very efficiently consume only the changed files, without, say, scanning all the time buckets after 07:00.
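The incremental query described above can be sketched as a filter over the timeline. This is a minimal illustration rather than Hoodie's actual API: the `Commit` class, the `files_changed_since` function, the `"HHMM"` commit time strings, and the `hour=NN` partition names are all assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Commit:
    time: str      # sortable commit time, e.g. "1020" for 10:20 (assumed format)
    files: tuple   # (partition path, file name) pairs written by this commit
    completed: bool = True


def files_changed_since(timeline, since):
    """Return files written by completed commits strictly after `since`."""
    changed = []
    for commit in sorted(timeline, key=lambda c: c.time):
        if commit.completed and commit.time > since:
            changed.extend(commit.files)
    return changed


# Hypothetical timeline: the 10:20 commit also writes late data into hour=09.
timeline = [
    Commit("1000", (("hour=07", "f1_1000"),)),
    Commit("1005", (("hour=10", "f2_1005"),)),
    Commit("1020", (("hour=09", "f3_1020"), ("hour=10", "f2_1020"))),
    Commit("1025", (("hour=10", "f2_1025"),), completed=False),  # still inflight
]

changed = files_changed_since(timeline, "1000")
```

Only the files touched by the 10:05 and 10:20 commits are returned, including the late data that landed in the older `hour=09` bucket. No partition is scanned, and the inflight 10:25 commit is ignored.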
Hoodie storage types capture how data is indexed and laid out on the filesystem, and how the above primitives and timeline activities are implemented on top of such organization (i.e., how data is written). This is not to be confused with the notion of Read Optimized and Near-Real time tables, which are merely how the underlying data is exposed to queries (i.e., how data is read).
Hoodie supports (or will support) the following storage types.
| Storage Type | Supported Tables |
|--------------|------------------|
| Copy On Write | Read Optimized |
| Merge On Read | Read Optimized + Near Real-time |
- Copy On Write: A heavily read-optimized storage type that simply creates new versions of files corresponding to the records that changed.
- Merge On Read: Additionally provides near real-time datasets, with latency on the order of 5 minutes, by shifting some of the write cost to the reads and merging incoming and on-disk data on the fly.
Regardless of the storage type, Hoodie organizes a dataset into a directory structure under a basepath, very similar to Hive tables. The dataset is broken up into partitions, which are folders containing the files for that partition. Each partition is uniquely identified by its partitionpath, which is relative to the basepath.
Within each partition, records are distributed into multiple files. Each file is identified by a unique file id and the commit that produced the file. In case of updates, multiple files can share the same file id but be written at different commits.
Each record is uniquely identified by a record key and mapped to a file id forever. This mapping between record key and file id never changes once the first version of a record has been written to a file. In short, the file id identifies a group of files that contain all versions of a group of records.
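As a rough sketch of this layout, the snippet below builds file paths from a basepath, partitionpath, file id, and commit, groups file versions by file id, and keeps a sticky record key to file id mapping. The `<file id>_<commit>.parquet` naming scheme, the helper names, and the `RecordKeyIndex` class are assumptions for illustration only, not Hoodie's actual on-disk format or classes.

```python
import posixpath
from collections import defaultdict


def data_file_path(basepath, partitionpath, file_id, commit_time):
    # Assumed naming scheme: "<file id>_<commit>.parquet" inside the partition folder.
    return posixpath.join(basepath, partitionpath, f"{file_id}_{commit_time}.parquet")


def group_by_file_id(file_names):
    """Map each file id to the sorted commit times of all its versions."""
    groups = defaultdict(list)
    for name in file_names:
        stem = name.rsplit(".", 1)[0]
        file_id, commit_time = stem.rsplit("_", 1)
        groups[file_id].append(commit_time)
    return {file_id: sorted(times) for file_id, times in groups.items()}


class RecordKeyIndex:
    """Record key -> file id mapping that never changes once assigned."""

    def __init__(self):
        self._file_id_by_key = {}

    def file_id_for(self, record_key, candidate_file_id):
        # The first assignment wins; later calls return the original file id.
        return self._file_id_by_key.setdefault(record_key, candidate_file_id)


path = data_file_path("/data/trips", "2017/01/01", "f1", "1000")
groups = group_by_file_id(["f1_1000.parquet", "f1_1005.parquet", "f2_1005.parquet"])
index = RecordKeyIndex()
first = index.file_id_for("trip-1", "f1")
second = index.file_id_for("trip-1", "f2")  # an update: stays in f1
```

Here `f1` has two versions (written at commits `1000` and `1005`), and the record `trip-1` remains mapped to `f1` even when a different file id is offered later.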
Copy On Write
As mentioned above, each commit on Copy On Write storage produces new versions of files. In other words, we implicitly compact on every commit, such that only columnar data exists. As a result, the write amplification (number of bytes written for 1 byte of incoming data) is much higher, while the read amplification is close to zero. This is a much-desired property for a system like Hadoop, which is predominantly read-heavy.
The following illustrates how this works conceptually, when data is written into copy-on-write storage with two queries running on top of it.
As data gets written, updates to existing file ids produce a new version for that file id stamped with the commit, while inserts allocate a new file id and write its first version. These file versions and their commits are color coded above. Normal SQL queries running against such a dataset (e.g., select count(*) counting the total records in that partition) first check the timeline for the latest commit and filter out all but the latest version of each file id. As you can see, an old query does not see the current inflight commit’s files colored in pink, but a new query starting after the commit picks up the new data. Thus queries are immune to any write failures/partial writes and only run on committed data.
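The file filtering a query performs can be sketched as follows. This is an illustrative model under stated assumptions, not Hoodie's query path: file versions are represented as hypothetical `(file id, commit time)` pairs, and the set of completed commits stands in for what the query reads from the timeline when it starts.

```python
def snapshot(file_versions, completed_commits):
    """Pick, per file id, the newest version written by a completed commit."""
    latest = {}
    for file_id, commit_time in file_versions:
        if commit_time not in completed_commits:
            continue  # skip files from inflight or failed commits
        if file_id not in latest or commit_time > latest[file_id]:
            latest[file_id] = commit_time
    return latest


# f1 was updated at 1005; f3 is a new file written by the inflight commit 1010.
file_versions = [("f1", "1000"), ("f2", "1000"), ("f1", "1005"), ("f3", "1010")]

old_query = snapshot(file_versions, completed_commits={"1000", "1005"})
new_query = snapshot(file_versions, completed_commits={"1000", "1005", "1010"})
```

The old query, which started while commit `1010` was inflight, reads `f1` at `1005` and `f2` at `1000` and never sees `f3`. The new query, started after `1010` completed, also picks up `f3`.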
The intention of copy on write storage is to fundamentally improve how datasets are managed today on Hadoop through:
- First-class support for atomically updating data at the file level, instead of rewriting whole tables/partitions
- The ability to incrementally consume changes, as opposed to wasteful scans or fumbling with heuristic approaches
- Tight control of file sizes to keep query performance excellent (small files hurt query performance considerably)
Merge On Read
Merge on read storage is a superset of copy on write, in the sense that it still provides a read optimized view of the dataset via the Read Optimized table. Additionally, it stores incoming upserts for each file id in a row-based append log, which enables providing near real-time data to queries by applying the append log onto the latest version of each file id on the fly at query time. Thus, this storage type attempts to balance read and write amplification intelligently to provide near real-time queries.
The most significant change here is to the compactor, which now carefully chooses which append logs need to be compacted onto their columnar base data to keep query performance in check (larger append logs incur longer merge times on the query side).
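The query-time merge can be sketched conceptually as below, assuming the base file is modeled as a dict keyed by record key and the append log as a list of `(record key, value)` upserts in arrival order. Both representations and the `merge_on_read` name are simplifications chosen for this example; the real base files are columnar and the log is row based.

```python
def merge_on_read(base_records, append_log):
    """Apply logged upserts, in arrival order, on top of the latest base file."""
    merged = dict(base_records)  # leave the base file contents untouched
    for record_key, value in append_log:
        merged[record_key] = value  # a later log entry overrides an earlier one
    return merged


base_file = {"trip-1": {"fare": 10}, "trip-2": {"fare": 20}}
append_log = [
    ("trip-1", {"fare": 12}),  # update to an existing record
    ("trip-1", {"fare": 15}),  # a later update to the same record
]

read_optimized_view = base_file                          # base columnar data only
realtime_view = merge_on_read(base_file, append_log)     # base + append log
```

The read optimized view still reports the fare for `trip-1` as 10, while the near real-time view reports 15. The cost of the merge grows with the length of the append log, which is why the compactor's choices matter for query performance.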
The following illustrates how the storage works, and shows queries on both the near real-time table and the read optimized table.
There are a lot of interesting things happening in this example, which bring out the subtleties in the approach.
- We now have commits every 1 minute or so, something we could not do in the other storage type.
- Within each file id group, there is now an append log, which holds incoming updates to records in the base columnar files. In the example, the append logs hold all the data from 10:05 to 10:10. The base columnar files are still versioned with the commit, as before. Thus, if one were to look at the base files alone, the storage layout looks exactly like a copy on write table.
- A periodic compaction process reconciles these changes from the append log and produces a new version of the base file, just like what happened at 10:05 in the example.
- There are two ways of querying the same underlying storage: the ReadOptimized (RO) table and the Near-Realtime (RT) table, depending on whether we choose query performance or freshness of data.
- The semantics around when data from a commit is available to a query change in a subtle way for the RO table. Note that such a query running at 10:10 will not see data after 10:05 above, while a query on the RT table always sees the freshest data.
- When we trigger compaction and what it decides to compact hold the key to solving these hard problems. By implementing a compaction strategy that compacts the latest partitions more aggressively than older partitions, we could ensure the RO table sees data published within X minutes in a consistent fashion.
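One possible shape for such a compaction strategy is sketched below. It is a hypothetical illustration, not the strategy Hoodie implements: the `pick_logs_to_compact` function, the per-run I/O budget, and the assumption that partition paths sort chronologically are all introduced here for the example.

```python
def pick_logs_to_compact(log_sizes, io_budget_bytes):
    """Choose append logs to compact, newest partitions first, within a budget.

    log_sizes maps (partitionpath, file id) -> append log size in bytes.
    Partition paths are assumed to sort chronologically.
    """
    # Newest partition first; within a partition, the largest log first,
    # since larger logs incur longer merge times on the query side.
    ordered = sorted(log_sizes.items(), key=lambda kv: (kv[0][0], kv[1]), reverse=True)
    chosen = []
    remaining = io_budget_bytes
    for (partitionpath, file_id), size in ordered:
        if size <= remaining:
            chosen.append((partitionpath, file_id))
            remaining -= size
    return chosen


log_sizes = {
    ("hour=07", "f1"): 50,
    ("hour=09", "f3"): 30,
    ("hour=10", "f4"): 40,
    ("hour=10", "f5"): 80,
}

to_compact = pick_logs_to_compact(log_sizes, io_budget_bytes=130)
```

With a budget of 130 bytes, both logs in the latest partition (`hour=10`) are compacted and the older partitions wait for a later run, so the RO table catches up on recent data first.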
The intention of merge on read storage is to enable near real-time processing directly on top of Hadoop, as opposed to copying data out to specialized systems, which may not be able to handle the data volume.