NATS.zig

A Zig client library for NATS, the cloud-native messaging system.

Development Status

This library is mostly feature-complete but lacks production testing. The API is functional and covers the core NATS and JetStream features, but may still undergo changes based on feedback.

Testers are welcome! If you try the library in your projects, please report any issues or feedback. The goal is feature parity with the official NATS client libraries. The implementation is based on the NATS C and Go libraries.

⚠️ Note: While the library is mostly finished, the API may still change before the 1.0 release. Use at your own discretion in production environments.

Installation

  1. Add nats.zig as a dependency in your build.zig.zon:
zig fetch --save "git+https://github.com/lalinsky/nats.zig"
  2. In your build.zig, add the nats module as a dependency to your program:
const nats = b.dependency("nats", .{
    .target = target,
    .optimize = optimize,
});

// the executable from your call to b.addExecutable(...)
exe.root_module.addImport("nats", nats.module("nats"));

Examples

Connect

var nc = nats.Connection.init(allocator, .{});
defer nc.deinit();

try nc.connect("nats://localhost:4222");

Publish message

try nc.publish("hello", "Hello, NATS!");

Subscribe synchronously

// Create synchronous subscription
var counter: u32 = 0;
const sub = try nc.subscribeSync("hello");

// Wait for message with 5 second timeout
while (true) {
    var msg = sub.nextMsg(5000) catch |err| {
        if (err == error.Timeout) continue;
        return err;
    };
    defer msg.deinit();

    counter += 1;
    std.debug.print("Message #{d}: {s}\n", .{ counter, msg.data });
}

Subscribe asynchronously (with callback)

// Define message handler
fn messageHandler(msg: *nats.Message, counter: *u32) void {
    defer msg.deinit();

    counter.* += 1;
    std.debug.print("Message #{d}: {s}\n", .{ counter.*, msg.data });
}

// Subscribe with callback handler
var counter: u32 = 0;
const sub = try nc.subscribe("hello", messageHandler, .{&counter});

Send request and wait for reply

// Send request and wait for reply with 5 second timeout
const reply = try nc.request("help", "need assistance", 5000);
defer reply.deinit();

std.debug.print("Received reply: {s}\n", .{reply.data});

Send request and wait for multiple replies

// Request multiple responses from different responders
var messages = try nc.requestMany("services.status", "ping all", 5000, .{
    .max_messages = 10,    // Stop after 10 responses
    .stall_ms = 100,       // Stop if no new responses for 100ms
});

while (messages.pop()) |msg| {
    defer msg.deinit();
    std.debug.print("Response: {s}\n", .{msg.data});
}

Handle requests

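// Define request handler
fn echoHandler(msg: *nats.Message, context: *MyContext) !void {
    defer msg.deinit();
    _ = context; // not used in this example; Zig rejects unused parameters

    // Reply with the same payload that was received
    try msg.reply(msg.data);
}

// Subscribe to handle requests
// (MyContext is an application-defined struct, not part of the library)
var context = MyContext{};
const sub = try nc.subscribe("echo", echoHandler, .{&context});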

JetStream Stream Management

// Create JetStream context
var js = nc.jetstream(.{});
defer js.deinit();

// Create a stream
const stream_config = nats.StreamConfig{
    .name = "ORDERS",
    .subjects = &.{"orders.*"},
    .retention = .limits,
    .storage = .file,
    .max_msgs = 1000,
};

var stream_info = try js.addStream(stream_config);
defer stream_info.deinit();
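
The `orders.*` subject in the stream configuration is a NATS wildcard: `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens. The sketch below illustrates those rules using only the Zig standard library. `subjectMatches` is a hypothetical helper written for this explanation and is not part of the nats.zig API; it assumes Zig 0.11 or later (for `std.mem.splitScalar`) and does not validate malformed subjects such as empty tokens or a `>` that is not in the last position.

```zig
const std = @import("std");

/// Hypothetical helper (not part of nats.zig): returns true if `subject`
/// matches `pattern` under NATS wildcard rules.
///   `*` matches exactly one token
///   `>` matches one or more remaining tokens
fn subjectMatches(pattern: []const u8, subject: []const u8) bool {
    var pat_it = std.mem.splitScalar(u8, pattern, '.');
    var sub_it = std.mem.splitScalar(u8, subject, '.');

    while (pat_it.next()) |pat_tok| {
        // Every pattern token needs a corresponding subject token.
        const sub_tok = sub_it.next() orelse return false;

        // `>` accepts this token and everything after it.
        if (std.mem.eql(u8, pat_tok, ">")) return true;

        // `*` accepts any single token.
        if (std.mem.eql(u8, pat_tok, "*")) continue;

        // Literal tokens must match exactly.
        if (!std.mem.eql(u8, pat_tok, sub_tok)) return false;
    }

    // The pattern is exhausted, so the subject must be exhausted too.
    return sub_it.next() == null;
}

test "wildcard matching for the ORDERS stream subject" {
    try std.testing.expect(subjectMatches("orders.*", "orders.new"));
    try std.testing.expect(!subjectMatches("orders.*", "orders.eu.new"));
    try std.testing.expect(!subjectMatches("orders.*", "orders"));
    try std.testing.expect(subjectMatches("orders.>", "orders.eu.new"));
}
```

Under these rules, a stream configured with `orders.*` would capture `orders.new` but not `orders.eu.new`. A subject of `orders.>` would capture both.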

JetStream Consumer Management

// Create a durable consumer
const consumer_config = nats.ConsumerConfig{
    .durable_name = "order_processor",
    .ack_policy = .explicit,
    .deliver_policy = .all,
};

var consumer_info = try js.addConsumer("ORDERS", consumer_config);
defer consumer_info.deinit();

JetStream Push Subscriptions

// Push subscription with callback handler
fn orderHandler(js_msg: *nats.JetStreamMessage, count: *u32) !void {
    defer js_msg.deinit();
    count.* += 1;
    try js_msg.ack(); // Acknowledge message
    std.debug.print("Order: {s}\n", .{js_msg.data});
}

var processed: u32 = 0;
var push_sub = try js.subscribe("orders.*", orderHandler, .{&processed}, .{
    .stream = "ORDERS",
    .durable = "order_processor",
});
defer push_sub.deinit();

JetStream Pull Subscriptions

// Pull subscription (fetch messages manually)
var pull_sub = try js.pullSubscribe("orders.*", "batch_processor", .{
    .stream = "ORDERS",
});
defer pull_sub.deinit();

var batch = try pull_sub.fetch(10, 5000); // Fetch up to 10 msgs, 5s timeout
defer batch.deinit();
for (batch.messages) |js_msg| {
    try js_msg.ack();
}

Building

# Build the library
zig build

# Build examples
zig build examples

Testing

The project includes both unit tests and end-to-end tests:

# Run all tests (unit + e2e)
zig build test

# Run only unit tests
zig build test-unit

# Run only end-to-end tests
zig build test-e2e

The end-to-end tests automatically start and stop the required NATS servers using Docker Compose.
