A Zig client library for NATS, the cloud-native messaging system.
This library is mostly feature-complete but lacks production testing. The API is functional and covers the core NATS and JetStream features, but may still undergo changes based on feedback.
Testers are welcome! If you're interested in testing the library in your projects, please report any issues or feedback. The goal is to be as feature-complete as the official NATS client libraries. It is based on the NATS C and Go libraries.
- Add nats.zig as a dependency in your `build.zig.zon`:
zig fetch --save "git+https://github.com/lalinsky/nats.zig"
- In your `build.zig`, add the `nats` module as a dependency to your program:
// Look up the "nats" package, passing along the project's target and optimize options.
const nats_dep = b.dependency("nats", .{ .target = target, .optimize = optimize });
const nats_module = nats_dep.module("nats");

// `exe` is the executable from your call to b.addExecutable(...)
exe.root_module.addImport("nats", nats_module);
// Address of the NATS server used throughout this example
const server_url = "nats://localhost:4222";

// Initialize a connection with an empty options struct; deinit runs at scope exit
var nc = nats.Connection.init(allocator, .{});
defer nc.deinit();

// Connect, then publish a single message on the "hello" subject
try nc.connect(server_url);
try nc.publish("hello", "Hello, NATS!");
// Create synchronous subscription on the "hello" subject
var received: u32 = 0;
const subscription = try nc.subscribeSync("hello");

// Wait for each message with a 5 second timeout, retrying whenever the wait times out
while (true) {
    var msg = subscription.nextMsg(5000) catch |err| {
        // Only a timeout is retried; every other error is returned to the caller
        if (err != error.Timeout) return err;
        continue;
    };
    defer msg.deinit();

    received += 1;
    std.debug.print("Message #{d}: {s}\n", .{ received, msg.data });
}
// Message handler: increments the caller-provided counter and prints the message payload
fn messageHandler(msg: *nats.Message, counter: *u32) void {
    // Release the message when the handler returns
    defer msg.deinit();

    const message_number = counter.* + 1;
    counter.* = message_number;
    std.debug.print("Message #{d}: {s}\n", .{ message_number, msg.data });
}
// Subscribe to "hello" with messageHandler as the callback; the tuple carries
// a pointer to the counter the handler updates
var message_count: u32 = 0;
const subscription = try nc.subscribe("hello", messageHandler, .{&message_count});
// Send a request on "help" and wait for the reply (5 second timeout)
const timeout_ms = 5000;
const response = try nc.request("help", "need assistance", timeout_ms);
defer response.deinit();

std.debug.print("Received reply: {s}\n", .{response.data});
// Collect replies from several responders to a single request
var responses = try nc.requestMany("services.status", "ping all", 5000, .{
    .max_messages = 10, // Stop after 10 responses
    .stall_ms = 100, // Stop if no new responses for 100ms
});

// Drain the collected replies, releasing each one after it is printed
while (responses.pop()) |response| {
    defer response.deinit();
    std.debug.print("Response: {s}\n", .{response.data});
}
// Request handler: replies to the requester with the same payload it received.
// An error from msg.reply is propagated to the caller; the message is released
// either way when the handler returns.
fn echoHandler(msg: *nats.Message, context: *MyContext) !void {
    // This example does not use its context. Zig rejects unused function
    // parameters at compile time, so discard it explicitly.
    _ = context;
    defer msg.deinit();
    // Send reply
    try msg.reply(msg.data);
}
// Handle requests on the "echo" subject with echoHandler, handing it a pointer to the context
var context: MyContext = .{};
const subscription = try nc.subscribe("echo", echoHandler, .{&context});
// Create a JetStream context from the existing connection, using an empty options struct
var js = nc.jetstream(.{});
// Release the JetStream context when the enclosing scope exits
defer js.deinit();
// Describe the "ORDERS" stream: it captures subjects matching "orders.*"
const stream_config: nats.StreamConfig = .{
    .name = "ORDERS",
    .subjects = &.{"orders.*"},
    .retention = .limits,
    .storage = .file,
    .max_msgs = 1000,
};

// Create the stream; the returned info is released at scope exit
var orders_stream = try js.addStream(stream_config);
defer orders_stream.deinit();
// Describe a durable consumer named "order_processor" with explicit acknowledgements
const consumer_config: nats.ConsumerConfig = .{
    .durable_name = "order_processor",
    .ack_policy = .explicit,
    .deliver_policy = .all,
};

// Attach the consumer to the "ORDERS" stream; the returned info is released at scope exit
var orders_consumer = try js.addConsumer("ORDERS", consumer_config);
defer orders_consumer.deinit();
// Callback for the push subscription: counts the message, acknowledges it, then prints it
fn orderHandler(js_msg: *nats.JetStreamMessage, count: *u32) !void {
    // Release the message when the handler returns, including on an ack error
    defer js_msg.deinit();

    // The count is bumped before the ack, so it also includes messages whose ack fails
    count.* = count.* + 1;

    // Acknowledge message; an ack error is returned before anything is printed
    try js_msg.ack();
    std.debug.print("Order: {s}\n", .{js_msg.data});
}
// Counter updated by orderHandler through the pointer passed in the argument tuple
var processed_count: u32 = 0;

// Subscribe to "orders.*" on the "ORDERS" stream using the "order_processor" durable
var order_sub = try js.subscribe("orders.*", orderHandler, .{&processed_count}, .{
    .stream = "ORDERS",
    .durable = "order_processor",
});
defer order_sub.deinit();
// Pull subscription (messages are fetched manually rather than delivered to a callback)
var batch_sub = try js.pullSubscribe("orders.*", "batch_processor", .{
    .stream = "ORDERS",
});
defer batch_sub.deinit();

// Fetch up to 10 msgs, 5s timeout
const max_batch = 10;
const timeout_ms = 5000;
var fetched = try batch_sub.fetch(max_batch, timeout_ms);
defer fetched.deinit();

// Acknowledge every fetched message; the first ack error aborts the loop
for (fetched.messages) |js_msg| try js_msg.ack();
# Build the library
zig build
# Build examples
zig build examples
The project includes both unit tests and end-to-end tests:
# Run all tests (unit + e2e)
zig build test
# Run only unit tests
zig build test-unit
# Run only end-to-end tests
zig build test-e2e
The end-to-end tests automatically start and stop the required NATS servers using Docker Compose.