Is zstd splittable in hadoop/spark/etc? #395
I know the 4mc container adds splittability to zstd, but is the standard by itself able to do so without the help of 3rd-party implementations?

Note: I am aware zstd is still not officially implemented in Hadoop, so this is more of a theoretical question.
We have a plan to re-use and extend the skip format based on skippable frames implemented in […]
Hi @trixpan, I talked to 4mc's author. It would be possible to write metadata in zstd skippable frames and make the whole file splittable by Hadoop (it would need a special InputFormat), but the idea would be essentially the same as 4mc.
@advancedxy I saw 4mc previously. I think people's concern about 4mc is the limited community (although I must confess compression algorithms don't strike me as pieces of code with hundreds of contributors).
Is the format in which pzstd uses skippable frames official? If it is, then I can update the hadoop project to utilize anything compressed with zstd-cli and then copied into hadoop. Have you decided on or finalized the format of pzstd files? Thank you.
We don't yet have a splittable format. This is an active topic; we are working on it, and expect to make progress throughout March toward this objective. The final format will likely be different from the current one.
@Cyan4973 thanks for the update. I think having zstd splittable in Hadoop would be very attractive to a lot of users. I would like to be involved, if possible, in discussing the proposed splittable format. Thank you.
cc @iburinoc
I definitely agree with this approach: 4mc is nice, but we need this as mainstream in zstd.
Agreed. Please involve me in the discussion and implementation if possible.
Here's an overview of our current thoughts on how to implement this; any feedback is appreciated!

Splittable/Seekable Format

A splittable and/or seekable Zstandard format could be useful in a variety of scenarios. The proposed general format would consist of a series of independently decompressible Zstandard frames, together with a jump table locating each frame within the file. Given this outline, there are a few choices left to be made:

Jump Table Position

Where in the file should the jump table be placed?
The end-of-file approach seems best, as I'm not aware of any scenarios where seeking to the end of the file first would be a problem.

Jump Table Format

The format of the jump table would be a Zstandard skippable frame, so that existing decompressors can skip over it transparently. Potentially useful data to have in the table is, for each frame: the compressed size, the uncompressed size, and a checksum.
Compressed Size

There are two main options for storing the compressed size: store each frame's own compressed size, or store each frame's cumulative offset into the file. […]
Thoughts: we could leave this as an option to be chosen by a flag.

Uncompressed Size

Here we have the same offset/size problem as with compressed size, so that section more or less applies to both. Another point with uncompressed size is that it's likely that in many cases […]. I think the optimal solution for uncompressed size would be to resolve the […].

Checksum

It might be desirable to have a checksum accompany each frame so we can verify the integrity of each frame independently. So the questions to be answered here are: Do we include a checksum? Which algorithm should be used? Should it be over the compressed or uncompressed data? I think uncompressed data makes more sense, especially if we're mixing various […].

Dictionary Compression

One way to recoup the compression-ratio losses of splitting the input into many small frames would be to use dictionary compression. Some obvious impacts this usage could have are: […]
We should make sure to put some thought into these and other possibly related situations.
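To make the jump-table discussion concrete, here is one way the per-frame entries could be laid out. This is only an illustration of the options above, not a final spec; the field widths, the optional-checksum choice, and the footer sketch are all assumptions.

```c
#include <stdint.h>

/* One entry per frame in the jump (seek) table. */
typedef struct {
    uint32_t compressedSize;   /* bytes this frame occupies on disk   */
    uint32_t decompressedSize; /* bytes this frame expands to         */
    uint32_t checksum;         /* optional per-frame integrity check  */
} SeekTableEntry;

/* The table itself would live inside a Zstandard skippable frame so that
 * ordinary decompressors ignore it, e.g.:
 *   [skippable magic][frame size][SeekTableEntry...][entry count][flags]
 */
```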
I definitely like this. Here are my 2 comments/preferences: […]
My 2 cents: […]
I'm not sure no flags is the best option; it leaves very little room to expand. Potential flags I can think of currently: […]
Since the implementation will be in the library itself, we can handle some complexity there, and we want to make sure we have the flexibility to handle all use-cases that could benefit from this.
On second thought, it's indeed better to use flags to handle various use cases.
One scenario where seeking is not possible is when reading from a pipe. Granted, in that case no frame can be skipped either, but a performance improvement can still be achieved by not processing frames one is not interested in. As there are distinct advantages and limitations to having index frames prepended or appended, it might be useful to provide both. It is already possible to make a distributed index by prepending, using […]. For appending, I think that a similar simple format would be best.
I've created a first-round implementation. The format description is here; comments/feedback are appreciated. As for the API, the planned compression API looks like this:
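The snippet that accompanied this comment did not survive on this page. As a stand-in, here is the shape of the compression half of the seekable API that later landed in contrib/seekable_format (zstd_seekable.h); treat it as a reconstruction rather than the exact text of the original comment:

```c
/* Declarations as found in contrib/seekable_format/zstd_seekable.h. */
ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void);

size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs,
                                 int compressionLevel,
                                 int checksumFlag,     /* per-frame checksums */
                                 unsigned maxFrameSize);

/* Consumes input and emits frames, like the regular ZSTD_compressStream. */
size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs,
                                    ZSTD_outBuffer* output,
                                    ZSTD_inBuffer* input);

/* Explicitly ends the current frame, creating a seek point. */
size_t ZSTD_seekable_endFrame(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);

/* Ends the final frame and writes out the seek table. */
size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);
```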
which looks similar to the regular streaming compression API.

On the decompression side it's a bit more complex, since a decompressor needs to be able to seek. There are a few cases I can think of here that we can provide an API for: […]

Other relevant notes here: […]
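For reference, the decompression-side entry points that shipped in contrib (the same ones the Rust bindings further down this thread call into) look like this; the signatures below follow zstd_seekable.h but should be double-checked against the current header:

```c
#include <stdio.h> /* FILE */

ZSTD_seekable* ZSTD_seekable_create(void);

/* Initialize from an in-memory buffer or from an open FILE*. */
size_t ZSTD_seekable_initBuff(ZSTD_seekable* zs, const void* src, size_t srcSize);
size_t ZSTD_seekable_initFile(ZSTD_seekable* zs, FILE* src);

/* Decompress dstSize bytes starting at an *uncompressed* offset; the
 * library uses the seek table to decode only the frames that overlap. */
size_t ZSTD_seekable_decompress(ZSTD_seekable* zs, void* dst, size_t dstSize,
                                unsigned long long offset);
```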
Folks, would you mind clarifying the current status of this? The reason I ask is that I got quite confused trying to use git commits to understand what is going on... 😃 E.g., despite multi-threading now being part of the main tool, pzstd is still included in the contrib source directory, and while pzstd refers to the skippable format, the RFC related to this ticket suggests the seekable format. I am confused. 😃
The proposed format is implemented in this demo: […]
@iburinoc Direct lookup in the seek table is going to be critical for the Hadoop InputFormat case, correct? Some byte range spanning one or more chunks is going to be distributed as an InputSplit to different hosts.
@adamkennedy The seek table itself won't be split up, though, and it's not hard to traverse the seek table to convert it into a direct lookup that determines where each piece of data lives in the source file.
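A minimal sketch of that traversal, assuming the seek table has already been parsed into an array of per-frame compressed sizes (the function and variable names here are illustrative, not library API):

```c
#include <stdint.h>
#include <stdlib.h>

/* Turn per-frame compressed sizes into absolute file offsets, so that a
 * frame's byte range can be looked up directly by index. */
uint64_t* build_offset_table(const uint32_t* frameCSizes, size_t numFrames)
{
    uint64_t* off = malloc((numFrames + 1) * sizeof *off);
    if (off == NULL) return NULL;
    off[0] = 0;                                /* frame 0 starts the data  */
    for (size_t i = 0; i < numFrames; i++)
        off[i + 1] = off[i] + frameCSizes[i];  /* running compressed total */
    return off;  /* off[i] = start of frame i; off[numFrames] = total size */
}
```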
I solved a similar problem (breaking large zstd/lz4/lzma files up for parallel processing) by extending the zstd/lz4 framing format. I break the input data into large chunks (1 MB recommended), with each record residing fully inside a chunk (records are not split between chunks). Each chunk is compressed independently (as an lz4 frame, a zstd frame, or raw lzma), prepended with a transport skippable frame, and written sequentially. The transport frame contains the compressed size, the uncompressed size, the compression type (I support lz4, zstd, and lzma, and a file can mix compression types), and custom data tags. The transport frame is protected by a CRC.
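A rough C picture of the transport frame described above; the field widths, ordering, and codec codes are my guesses at an equivalent layout, not the author's published format:

```c
#include <stdint.h>

/* Skippable-frame magics span 0x184D2A50..0x184D2A5F in the zstd spec;
 * one of them would tag the transport header. */
#define TRANSPORT_MAGIC 0x184D2A50u

typedef struct {
    uint32_t magic;            /* skippable-frame magic number               */
    uint32_t frameSize;        /* size of this header's payload              */
    uint32_t compressedSize;   /* bytes in the compressed chunk that follows */
    uint32_t uncompressedSize; /* bytes after decompression                  */
    uint8_t  codec;            /* e.g. 0 = none, 1 = lz4, 2 = zstd, 3 = lzma */
    /* ... custom data tags would go here ... */
    uint32_t crc32;            /* CRC protecting this transport header       */
} TransportFrame;
```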
It would be great if the seekable format plays well with the tabix index format (https://samtools.github.io/hts-specs/tabix.pdf), widely used in bioinformatics; this would speed up a bunch of workflows by up to ~2-3x over BGZF. (I'd be happy to write some tabix indexing and seeking code once the basic library functions are all ready.)
@chrchang: Would you mind testing the experimental splittable format API for the tabix use case? The goal would be to know whether this API is suitable enough for such usage, and if not (or only partially), how to make it evolve to better fit the need.
The experimental splittable API was not enough for me, so I implemented my own extended version of the splittable API. Here are the reasons and some details:
The experimental splittable API is of no use to me. I search for transport frames, read a block, and then scan the data inside the block. That guarantees I always have valid record boundaries. The splittable API returns data from a rather arbitrary offset, so you have to guess record boundaries. As an additional feature, my API transparently supports zstd, lz4, and lzma compression, as well as uncompressed data. A file can contain several compression formats; each compressed block has its own format, and the API hides the compression type (or absence of compression) from the client. I implemented the API with a lot of multithreading, and I am not hiding the parallel nature of the compressor/decompressor, which allows more direct integration with multithreaded clients. I communicate with clients in blocks of data corresponding to the compression blocks in the file. It was interesting to hear about the tabix format; I think I implemented a small subset of it.
Okay, after taking a look at the current seekable API, it appears to be adequate for a basic single-threaded tabix implementation, but it's missing parallel-compression and decompression-stream support, which would really make seekable .zst files a joy to work with. It looks like parallel compression wouldn't take much work to add: just create a version of ZSTD_seekable_compressStream that calls ZSTD_compress_generic? Possible decompression-stream interface (which I've previously written for BGZF files): during decompressor initialization, you pass an additional uint64_t* containing a sequence of uint64 [decompressed start pos, decompressed end pos) intervals (not necessarily in increasing-position order, and not necessarily disjoint), as well as the number of such intervals. (So if there are N intervals, the uint64_t* points to a length-2N array.) A compile-time constant can be defined to denote the decompressed file size, so N=1 with a [0, DECOMPRESSED_FILE_SIZE) interval would request a normal decompression stream. Ideally, initialization succeeds on a non-seekable zstd file when the intervals are disjoint and in increasing order. A nice property of this interface is that it supports parallel decompression, though that can be left out of the initial implementation.
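As a sanity check on that convention, here is a minimal sketch (the helper name is hypothetical, not zstd API) of the validation a non-seekable stream would have to perform on the interval list before accepting it:

```c
#include <stdint.h>
#include <stddef.h>

/* Half-open [start, end) intervals stored as 2*n consecutive uint64s.
 * A non-seekable source can only honor the request when the intervals
 * are non-empty, disjoint, and in increasing order. */
static int intervals_streamable(const uint64_t* iv, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        if (iv[2*i] >= iv[2*i + 1]) return 0;             /* empty or reversed    */
        if (i > 0 && iv[2*i] < iv[2*(i-1) + 1]) return 0; /* overlap/out of order */
    }
    return 1;
}
```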
Trying to lay an egg here. I'm writing Rust bindings to this seekable API, in order to use it as the new patch format for Pijul (https://pijul.org). I can read from files fine (using […]).
@P-E-Meunier, could you describe a test case? It could be a bug, or an incorrect API usage suggesting incomplete documentation.
My binding is at https://nest.pijul.com/pmeunier/zstd-seekable. The test is:

```rust
#[test]
fn test() {
let mut cstream = SeekableCStream::new(10, 256).unwrap();
let input = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Ut diam ante, sollicitudin a dolor et, volutpat elementum nulla. Etiam nec efficitur nibh, quis rutrum risus. Maecenas quis lorem malesuada, aliquet mi vel, viverra nunc. Donec et nulla sed velit sagittis varius. Suspendisse vestibulum, neque lobortis ornare vestibulum, orci turpis vulputate nisi, ut sodales neque purus eget magna. Nunc condimentum, diam eu consequat venenatis, est nisl semper lorem, et lobortis velit justo sed nulla. Nunc sit amet tempor nunc, vel posuere ipsum. Cras erat tortor, pulvinar ac pretium eu, auctor ac nibh. Duis iaculis porta magna, eu lobortis elit. Duis vitae massa eros. Nulla non magna accumsan, egestas quam sit amet, laoreet lectus.";
let mut input_pos = 0;
let mut output = vec![0; input.len()];
let mut output_pos = 0;
while input_pos < input.len() {
let (a, b) = cstream.compress(&mut output[output_pos..], &input[input_pos..]).unwrap();
output_pos += a;
input_pos += b;
}
while let Ok(n) = cstream.end_stream(&mut output[output_pos..]) {
if n == 0 {
break
}
output_pos += n;
}
output.truncate(output_pos);
{
use std::io::Write;
let mut file = std::fs::File::create("test").unwrap();
file.write_all(&output).unwrap();
}
println!("input len = {:?}, pos = {:?}", input.len(), output_pos);
let mut decomp = Vec::new();
let mut s = {
Seekable::init_buf(&output).unwrap()
};
for frame in 0..s.get_num_frames() {
let size = s.get_frame_decompressed_size(frame);
println!("{:?} {:?}", size, s.get_frame_decompressed_offset(frame));
let n = decomp.len();
decomp.extend(std::iter::repeat(0).take(size));
s.decompress_frame(&mut decomp[n..], frame);
}
println!("{:?}", std::str::from_utf8(&decomp).unwrap())
}
```

If I replace […]
Also, here is the code for the two functions, calling the actual library:

```rust
pub fn init_buf(input: &'a [u8]) -> Result<Self, Error> {
unsafe {
let p = ZSTD_seekable_create();
if p.is_null() {
return Err(Error::Null)
}
let result = ZSTD_seekable_initBuff(p, input.as_ptr() as *const c_void, input.len());
if ZSTD_isError(result) != 0 {
return Err(Error::ZSTD(result))
}
Ok(Seekable {
p,
f: std::ptr::null_mut(),
marker: std::marker::PhantomData,
})
}
}
/// Initialise a decompressor with a file. This method opens the file, and dropping the resulting `Seekable` closes the file.
pub fn init_file(name: &str) -> Result<Self, Error> {
unsafe {
let name = CString::new(name).unwrap();
let f: *mut libc::FILE = fopen(name.as_ptr(), "rb\0".as_ptr() as *const i8);
assert!(!f.is_null());
let p = ZSTD_seekable_create();
if p.is_null() {
return Err(Error::Null)
}
let result = ZSTD_seekable_initFile(p, f as *mut _IO_FILE);
if ZSTD_isError(result) != 0 {
return Err(Error::ZSTD(result))
}
Ok(Seekable {
p,
f,
marker: std::marker::PhantomData,
})
}
}
```
Looks like it was my fault; that's a little embarrassing. Fixed in #1695.
We have had a request to implement a zstd decompression filter in nbdkit. We already have a filter which supports xz. If implemented, it would work like this: (1) The compressed disk image is prepared by one user and uploaded to a website: […]
(2) Another user can serve this file uncompressed to their local network using: […]
(The equivalent already works for xz; note the COW filter makes the image writable, but we don't require writing to the underlying zstd file.) For xz we can seek to a block boundary and only uncompress a single block, which makes it relatively efficient as long as a smallish block size was chosen by the person who originally compressed the file. As I understand it from the preceding comments, there is no support yet for the seekable format in the zstd command-line tool.
Hello all, I've just used the contrib seekable code to put together a SQLite extension enabling it to read from a database file compressed in the proposed format. It was very easy to hook up and seems to work well. Since this application entails frame sizes of just a few KB, I'm next interested in following through on the allusion to adding a dictionary inline. I can come up with a proposal, but I'm wondering if @iburinoc, @Cyan4973, or anyone else has any specific ideas in mind about how that should work.
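For anyone curious, the core random-access read in such an extension reduces to a single call into the contrib seekable API. This is a simplified sketch (error handling elided, fixed page size assumed), not the actual extension code:

```c
#include "zstd.h"          /* ZSTD_isError */
#include "zstd_seekable.h" /* contrib/seekable_format */

/* Read one fixed-size database page out of the compressed image. */
static int read_page(ZSTD_seekable* zs, void* buf, size_t pageSize, unsigned pageNo)
{
    /* The offset is in uncompressed bytes; the library walks the seek
       table and decompresses only the frame(s) overlapping this range. */
    unsigned long long const off = (unsigned long long)pageNo * pageSize;
    size_t const r = ZSTD_seekable_decompress(zs, buf, pageSize, off);
    return ZSTD_isError(r) ? -1 : 0;
}
```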
Seeing as xz is a compression container format that is specifically designed to provide seekability for underlying compression algorithms (which it abstracts over, similar to a zip file), instead of adding this support to zstd--which would then have its own framing mechanism, some odd detection on its input, and tradeoffs in its output--why not just add support to xz for zstd by defining a zstd filter? (That's the xz terminology; note that earlier in this thread "filter" is used near "xz" in an unrelated sense.)
"Filter" in that comment referred to nbdkit filters. |
@rwmjones Right. I 100% understood that, which is why I felt a need to make explicit that I was using a different, "unrelated" definition of "filter", lest someone get confused. (Ironically, I see someone still got confused, but I honestly am not sure how I could have been even more explicit in order to ensure no one got confused.)
As related work, there is the RAC (Random Access Compression) format: https://github.com/google/wuffs/blob/master/doc/spec/rac-spec.md
I am the author of the RAC format; happy to discuss it here if you want. It is not "battle-proven" yet. I started the experiment in 2019, but like many other people, I found 2020 disruptive in terms of getting work done. A comparison with XZ is briefly discussed in the RAC related work doc. For actual numbers (not just a spec), here's one highlight from the "Extended Example" of the ractool docs: […]
After looking at an analysis of xz's corruption-prone spec, I'm not sure it's a good idea to even start extending xz. @saurik, in the ycombinator discussion of 2016 you were seemingly in the liberate-me-from-xz camp (:D); has something changed since then?
@amerlyq So, the idea that "camps" exist in software development is part of why it is so hard to have reasonable discussions :(. The fact that xz is often used for things it is ineffective at (I personally believe it should probably never have been supported in dpkg, and it was a devastating mistake for them to have removed lzma support) or that people like to praise it in incorrect contexts due to misunderstandings about why it even exists (such as claiming it is a compression algorithm instead of a file format, as many did on that HN thread) does not imply that it is without any value or that it is never a choice to consider... it certainly doesn't mean that someone who has argued against the format (such as myself) must switch "camps" in order to suggest its use :(. I brought up xz because it seems like a fast path to get seekability of zstd without zstd having to define yet another new file format, as the only reason xz even exists is to be a seekable compression container that has extensible algorithm choice... aka, exactly what is being looked for here today. You do not have to like xz to appreciate that it has become pretty widely used, and you don't have to believe it is an optimal file format under every possible criterion to use it... you don't even need to make it the only format people use: it just already happens to exist. I can simultaneously argue against xz in one context and for it in another. Someone here mentioned xz as a thing they were supporting for something else, and so there is value, I think, in noting that it is already an extensible format and so arguably already is the solution... if only there were a zstd filter. But like, seriously: this is an issue that has been open since 2016 and frankly seems unlikely to me to ever come to consensus on a new format soon (and like, that's fine: no rush from me! I am not complaining, I am just being practical). It seems like it would take a lot less time to come to consensus on what xz filter number to assign to zstd--assuming the xz people would be amenable to that, which I admit they probably wouldn't be :/--and then the people who are waiting on this feature could have pretty widespread and inter-compatible support for seekable zstd within a few days. The best part: doing that wouldn't in any way prevent zstd from coming up with their own file format for this that was infinitely better when confronted with random bit flips or massively parallel CPUs or high-cost random access storage. So like, do what you want: I don't even remember why I cared at this point ;P I imagine I first subscribed to this issue in mid-2019, as that was when I was working on a project that used zstd... but I ended up working on something else instead ;P... but please just don't assume the world is always divided into "camps" and that something must have "changed" in order to cause someone who argues against something in one context to argue for it in another. Some of us are honestly just trying to accomplish tasks ;P.
Yep, both hands up! Having .xz is better than having nothing. Meanwhile, in a parallel universe... to preview (partially extract) .zstd archives in FUSE, the 15th standard was hacked :D
A common use case not mentioned here is indexing of compressed tar archives. The lzip project tangentially mentioned earlier has a tarlz program for that; a "pixz" variant of xz also indexes tars. tar integration would be a good way both to test the feature and to give end-users a reason to use it.
I'm the author of pixz. Just chiming in to say that the way to break the chicken-and-egg problem for xz was for someone to ship seekable xz to a lot of users, which is what pixz and its distro packagers did. Once it became accessible, the approach became tested and well-liked, and then upstream was more easily convinced. So someone ought to arrange to actually ship seekable zstd to end-users, so they can try it out. Either: […]
@Cyan4973 When you say it needs to be battle-tested, production-ready, etc., what does that look like? Is there some kind of threshold you have in mind, or is it an "I'll know it when I see it" kind of thing? I guess this question also applies to […]. I bring this up because it was mentioned in some of the discussions around XZ that this is one of the few features which zstd cannot currently provide, which handicaps it as a replacement for XZ.
If you rather mean "the format", then I guess the […]
Yeah, that's a difficult one. The real issue now is that, in order to make this format "official", we would have to guarantee its maintenance moving forward. And that's a tall order. Unfortunately, I feel we are a bit stretched at the moment to carry that weight.