Modifying Subscriptions: The subscription stream allows bi-directional communication. You can update your subscription filters dynamically by sending a new Subscribe request. For more details please check in"Getting Started" section.
Unsubscribing: Send an empty subscription request to stop receiving updates on all streams. This will keep the connection open for future subscriptions.
Modifying a Subscribe Request
Solana Yellowstone gRPC streams, while ideal for real-time data, often need dynamic adjustments. You can modify existing subscriptions without disconnecting the stream, allowing for real-time changes to filters or tracked addresses based on application needs or market shifts. This ensures uninterrupted data flow and immediate responsiveness to live blockchain events. For more details do checkout the section in the Section.
The complete code for updating subscribe requests is available on and on .
import Client, {
CommitmentLevel,
SubscribeRequestAccountsDataSlice,
SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry,
SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";
interface SubscribeRequest {
accounts: { [key: string]: SubscribeRequestFilterAccounts };
slots: { [key: string]: SubscribeRequestFilterSlots };
transactions: { [key: string]: SubscribeRequestFilterTransactions };
transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
blocks: { [key: string]: SubscribeRequestFilterBlocks };
blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
entry: { [key: string]: SubscribeRequestFilterEntry };
commitment?: CommitmentLevel;
accountsDataSlice: SubscribeRequestAccountsDataSlice[];
ping?: SubscribeRequestPing;
}
const subscribedWalletsA: string[] = [
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
"5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",
"4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",
"GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",
"oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",
];
const subscribedWalletsB: string[] = [
"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
];
const subscribeRequest1: SubscribeRequest = {
accounts: {},
slots: {},
transactions: {
modifying_A: {
vote: false,
failed: false,
signature: undefined,
accountInclude: subscribedWalletsA,
accountExclude: [],
accountRequired: [],
},
},
transactionsStatus: {},
entry: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.PROCESSED,
};
// Subscribes to account changes for program-owned accounts of subscribedWalletsB
const subscribeRequest2: SubscribeRequest = {
accounts: {
modifying_B: {
account: [],
filters: [],
owner: subscribedWalletsB,
},
},
slots: {},
transactions: {},
transactionsStatus: {},
blocks: {},
blocksMeta: {
block: [],
},
entry: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.PROCESSED,
};
/**
* Dynamically updates the current stream subscription with new request parameters.
*/
async function updateSubscription(stream: any, args: SubscribeRequest) {
try {
stream.write(args);
} catch (error) {
console.error("Failed to send updated subscription request:", error);
}
}
/**
* Handles a single streaming session.
* Automatically switches to a second subscription request after a timeout.
*/
async function handleStream(client: Client, args: SubscribeRequest) {
const stream = await client.subscribe();
// Waits for the stream to close or error out
const streamClosed = new Promise<void>((resolve, reject) => {
stream.on("error", (error) => {
console.error("Stream Error:", error);
reject(error);
stream.end();
});
stream.on("end", resolve);
stream.on("close", resolve);
});
// Automatically switch subscription after 10 seconds
setTimeout(async () => {
console.log("🔁 Switching to second subscription request...");
await updateSubscription(stream, subscribeRequest2);
}, 10000);
// Handle incoming data
stream.on("data", async (data) => {
try {
console.log("📦 Streamed Data:", data);
// You can add more processing logic here
} catch (error) {
console.error("Error processing stream data:", error);
}
});
// Send initial subscription request
await new Promise<void>((resolve, reject) => {
stream.write(args, (err: any) => (err ? reject(err) : resolve()));
}).catch((reason) => {
console.error("Initial stream write failed:", reason);
throw reason;
});
await streamClosed;
}
/**
* Starts the stream and continuously attempts to reconnect on errors.
*/
async function subscribeCommand(client: Client, args: SubscribeRequest) {
while (true) {
try {
await handleStream(client, args);
} catch (error) {
console.error("Stream error. Retrying in 1 second...", error);
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
}
const client = new Client(process.env.GRPC_URL, process.env.X_TOKEN, undefined);
// Start streaming with the first subscription
subscribeCommand(client, subscribeRequest1);
This action will unsubscribe from all currently subscribed channels. Please note that, it won't disrupt the connection itself, which means you can still establish new subscriptions in the future without needing to re-connect to the service.
Closing a gRPC Connection
import Client, {
CommitmentLevel,
SubscribeRequestAccountsDataSlice,
SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry,
SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import bs58 from "bs58";
const PUBLIC_KEY_TO_LISTEN = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; //can be any address
const RUN_TIME = 5000; //decides how long to run
const GRPC_ENDPOINT = "https://grpc.ams.shyft.to"; //your gRPC endpoint
const X_TOKEN = ""; //your x-token
interface SubscribeRequest {
accounts: { [key: string]: SubscribeRequestFilterAccounts };
slots: { [key: string]: SubscribeRequestFilterSlots };
transactions: { [key: string]: SubscribeRequestFilterTransactions };
transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
blocks: { [key: string]: SubscribeRequestFilterBlocks };
blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
entry: { [key: string]: SubscribeRequestFilterEntry };
commitment?: CommitmentLevel | undefined;
accountsDataSlice: SubscribeRequestAccountsDataSlice[];
}
const client = new Client(GRPC_ENDPOINT, X_TOKEN, undefined);
const req: SubscribeRequest = {
accounts: {},
slots: {},
transactions: {
raydiumLiquidityPoolV4: {
vote: false,
failed: false,
signature: undefined,
accountInclude: [PUBLIC_KEY_TO_LISTEN],
accountExclude: [],
accountRequired: [],
},
},
transactionsStatus: {},
entry: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
commitment: CommitmentLevel.PROCESSED,
};
async function handleStream(client: Client, args: SubscribeRequest) {
console.log(`Subscribing and starting stream...`);
const stream = await client.subscribe();
console.log(`Streaming data and printing transactions...`);
const streamClosed = new Promise<void>((resolve, reject) => {
stream.on("error", (err: any) => {
if (err.code === 1 || err.message.includes("Cancelled")) {
//this indicates stream was cancelled by user.
console.log("✅ Stream cancelled by user");
resolve();
} else {
console.error("❌ Stream error:", err);
reject(err);
}
});
stream.on("close", () => {
console.log("Stream closed.");
resolve();
});
});
stream.on("data", (data) => {
if (data.transaction) {
console.log(
"Received: ",
bs58.encode(data.transaction.transaction.signature),
);
}
});
// Subscribe
stream.write(args);
// Cancel after timeout
setTimeout(() => {
console.log("Cancelling stream...");
try {
stream.cancel();
/*
* stream.end() or stream.destroy();
* Cancels the stream from the user end, and closes the stream.
*
* A "Cancelled on client" error (code 1) will be thrown once this is called,
* which be caught in stream.on("error") function.
* This is one of the ways to cancel the stream, and clear your connections, when using Shyft gRPCs.
*
* For more information on clearing connections, please check the FAQ section of gRPC docs
* https://docs.shyft.to/solana-yellowstone-grpc/grpc-docs#solana-grpc-faq
*/
} catch (error) {
if (error.code !== "ERR_STREAM_PREMATURE_CLOSE") {
console.error("Stream cancel error:", error);
}
}
}, RUN_TIME);
await streamClosed;
}
async function subscribeCommand(client: Client, args: SubscribeRequest) {
await handleStream(client, args);
}
subscribeCommand(client, req);
This unsubscribes to your currently Subscribed Channels. However please note that this does not close the connection, and you may hit due to this too.
Once the subscription stream is active, we can close it using stream.cancel() method. Please visit in "" Section to know more.
The complete code of closing a connection is available on .