🔥 Replaying Slots with Solana Yellowstone gRPC
Start streaming data from a specific slot when establishing a new gRPC connection or reconnecting
Reconnect mechanisms are important for production applications: we don't want to miss any data due to temporary network issues. But reconnecting alone isn't always enough, because a fresh subscription starts at the latest slot and anything produced while the stream was down is skipped. To recover that data, we need to replay from a specific slot, especially if the stream drops during critical updates. Yellowstone gRPC supports this via the fromSlot field in the subscription request. By storing the last received slot and including it in the next subscription, you can resume the stream from where it left off, provided that slot is still available for replay.
At any given time, the earliest available slot for replay is approximately 150 slots older than the current slot.
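The replay limit above suggests a small guard before reusing a stored slot. The following is a minimal sketch, not part of the Yellowstone client: `pickFromSlot` and `REPLAY_WINDOW_SLOTS` are hypothetical names, the 150-slot figure is taken from the sentence above and may differ between providers, and how you obtain the current slot is left to your application.

```typescript
// Hypothetical helper (not a Yellowstone API): decides whether a stored slot
// is still worth sending as fromSlot.
// Assumption: the replay window is about 150 slots, as stated above.
const REPLAY_WINDOW_SLOTS = 150;

function pickFromSlot(
  lastSlot: string | undefined,
  currentSlot: string,
  windowSlots: number = REPLAY_WINDOW_SLOTS,
): string | undefined {
  // Nothing stored yet: subscribe from the latest slot.
  if (lastSlot === undefined) return undefined;

  // Slots arrive as strings; current slot numbers fit safely in a JS number.
  const age = Number(currentSlot) - Number(lastSlot);

  // Reject unparsable values, slots from the future, and slots that have
  // already fallen out of the assumed replay window.
  const replayable = !isNaN(age) && age >= 0 && age <= windowSlots;
  return replayable ? lastSlot : undefined;
}
```

In a reconnect loop, the return value can be assigned to the request's fromSlot field when it is defined, and the field removed otherwise, so that a stale slot does not cause the new subscription to fail.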
The example below illustrates how you can replay from a specific slot upon disconnection. You can paste this code in Replit to see it in action, or simply run the Repl code here.
import Client, { CommitmentLevel } from "@triton-one/yellowstone-grpc";
import { SubscribeRequest } from "@triton-one/yellowstone-grpc/dist/types/grpc/geyser";
import base58 from "bs58";
const GRPC_URL = "https://grpc.ams.shyft.to"; //enter your grpc url here
const X_TOKEN = ""; //enter your grpc access token here
const MAX_RETRY_WITH_LAST_SLOT = 30;
const RETRY_DELAY_MS = 1000;
const ADDRESS_TO_STREAM_FROM = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";
type StreamResult = {
  lastSlot?: string;
  hasRcvdMSg: boolean;
};
async function handleStream(
  client: Client,
  args: SubscribeRequest,
  lastSlot?: string,
): Promise<StreamResult> {
  const stream = await client.subscribe();
  let hasRcvdMSg = false;
  return new Promise((resolve, reject) => {
    stream.on("data", (data) => {
      const tx = data.transaction?.transaction?.transaction;
      if (tx?.signatures?.[0]) {
        const sig = base58.encode(tx.signatures[0]);
        console.log("Got tx:", sig);
        // remember the slot of the most recent transaction so a reconnect can resume from it
        lastSlot = data.transaction.slot;
        hasRcvdMSg = true;
      }
    });
    stream.on("error", (err) => {
      stream.end();
      // pass the last slot along with the error so the caller can replay from it
      reject({ error: err, lastSlot, hasRcvdMSg });
    });
    const finalize = () => resolve({ lastSlot, hasRcvdMSg });
    stream.on("end", finalize);
    stream.on("close", finalize);
    // send the subscription request to start receiving updates
    stream.write(args, (err: any) => {
      if (err) reject({ error: err, lastSlot, hasRcvdMSg });
    });
  });
}
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  let lastSlot: string | undefined;
  let retryCount = 0;
  while (true) {
    try {
      // logs the slot being replayed from, if fromSlot is set
      if (args.fromSlot) {
        console.log("Starting stream from slot", args.fromSlot);
      }
      // resolves when the stream ends cleanly, rejects on a stream error
      const result = await handleStream(client, args, lastSlot);
      lastSlot = result.lastSlot;
      if (result.hasRcvdMSg) retryCount = 0;
      // after a clean end, resume from the last slot seen instead of a stale fromSlot
      if (lastSlot) args.fromSlot = lastSlot;
    } catch (err: any) {
      console.error(
        `Stream error, retrying in ${RETRY_DELAY_MS / 1000} second(s)...`,
      );
      // in case the stream is interrupted, it waits for a while before retrying
      await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));
      lastSlot = err.lastSlot;
      if (err.hasRcvdMSg) retryCount = 0;
      if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {
        console.log(
          `#${retryCount} retrying with last slot ${lastSlot}, remaining retries ${
            MAX_RETRY_WITH_LAST_SLOT - retryCount
          }`,
        );
        // sets fromSlot to the last slot received before the stream was interrupted
        args.fromSlot = lastSlot;
        retryCount++;
      } else {
        // with no last slot, or once the retries are used up, it starts from the latest slot
        console.log(
          "Retrying from latest slot (no last slot available or retries exhausted)",
        );
        delete args.fromSlot;
        retryCount = 0;
        lastSlot = undefined;
      }
    }
  }
}
const client = new Client(GRPC_URL, X_TOKEN, {
  "grpc.keepalive_permit_without_calls": 1,
  "grpc.keepalive_time_ms": 10000,
  "grpc.keepalive_timeout_ms": 1000,
  "grpc.default_compression_algorithm": 2,
});
const req: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    pumpFun: {
      vote: false,
      failed: false,
      accountInclude: [ADDRESS_TO_STREAM_FROM],
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {},
  entry: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.CONFIRMED,
};
subscribeCommand(client, req);
The complete code for this reconnection mechanism is available on GitHub and Replit here.
The above example implements a robust mechanism to reconnect to the gRPC stream and replay from the last known slot to avoid missing any data.
Reconnection: If the stream encounters an error (such as a network drop), the client automatically retries after a short wait, set by the RETRY_DELAY_MS constant. The loop keeps trying to reconnect, so no manual restart is needed.
Replay from Last Slot: While streaming, the app records the slot of the most recent transaction it received (lastSlot). When reconnecting, it puts this lastSlot into the fromSlot field of the SubscribeRequest, telling Yellowstone gRPC to resume streaming from that slot rather than from the current blockchain tip. If MAX_RETRY_WITH_LAST_SLOT consecutive attempts fail without receiving any data, the app drops fromSlot and resubscribes from the latest slot.
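Because the replay restarts at the last slot received, transactions from that slot that were already handled before the disconnect may be delivered a second time. If downstream processing is not idempotent, a small signature filter can help. The sketch below is one possible approach rather than part of the example above: `createDeduper` and its `capacity` parameter are hypothetical names, and the default capacity is an arbitrary assumption to tune for your traffic.

```typescript
// Hypothetical helper (not a Yellowstone API): remembers recently seen
// transaction signatures so replayed duplicates can be skipped.
// Assumption: keeping the most recent `capacity` signatures is enough to
// cover the slots that get replayed after a reconnect.
function createDeduper(capacity: number = 10000): (signature: string) => boolean {
  const seen = new Set<string>();
  const order: string[] = []; // insertion order, oldest first

  // Returns true the first time a signature is seen, false for a repeat.
  return function isNew(signature: string): boolean {
    if (seen.has(signature)) return false;
    seen.add(signature);
    order.push(signature);
    // Forget the oldest signature once the buffer is over capacity.
    if (order.length > capacity) {
      const evicted = order.shift();
      if (evicted !== undefined) seen.delete(evicted);
    }
    return true;
  };
}
```

In the data handler, the base58-encoded signature could be passed through the returned function, and the transaction processed only when it returns true. A bounded buffer keeps memory use flat on a long-running stream, at the cost of forgetting signatures older than the buffer.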