This example sends a subscribe request that streams real-time account updates for a particular address.
The `account` array in the request takes the address (here, a pool address) whose updates should be streamed. This is especially useful for monitoring the state of specific accounts, such as those belonging to liquidity pools, and tracking their changes in real time.
import Client, {
CommitmentLevel,
SubscribeRequestAccountsDataSlice,
SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry,
SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";
import bs58 from "bs58";
// Shape of the subscription request object that is written to the gRPC stream
interface SubscribeRequest {
  /** Account filters, keyed by an arbitrary string label. */
  accounts: Record<string, SubscribeRequestFilterAccounts>;
  /** Slot filters, keyed by an arbitrary string label. */
  slots: Record<string, SubscribeRequestFilterSlots>;
  /** Transaction filters, keyed by an arbitrary string label. */
  transactions: Record<string, SubscribeRequestFilterTransactions>;
  /** Transaction-status filters; uses the same filter type as `transactions`. */
  transactionsStatus: Record<string, SubscribeRequestFilterTransactions>;
  /** Block filters, keyed by an arbitrary string label. */
  blocks: Record<string, SubscribeRequestFilterBlocks>;
  /** Block-meta filters, keyed by an arbitrary string label. */
  blocksMeta: Record<string, SubscribeRequestFilterBlocksMeta>;
  /** Entry filters, keyed by an arbitrary string label. */
  entry: Record<string, SubscribeRequestFilterEntry>;
  /** Optional commitment level for the subscription. */
  commitment?: CommitmentLevel;
  /** Data-slice descriptors for account data. */
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  /** Optional ping message. */
  ping?: SubscribeRequestPing;
}
/**
 * Opens a gRPC subscription stream, sends the subscription request, and logs
 * every account update received until the stream terminates.
 *
 * @param client - Yellowstone gRPC client
 * @param args - The subscription request which specifies what data to stream
 * @returns A promise that resolves once the stream emits `end` or `close`
 * @throws Rejects with the stream's error if the stream emits `error`, or with
 *   the write error if the subscription request could not be sent
 */
async function handleStream(client: Client, args: SubscribeRequest): Promise<void> {
  const stream = await client.subscribe();

  // Settles when the stream ends, closes, or errors out
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream error:", error);
      reject(error);
      stream.end();
    });
    stream.on("end", resolve);
    stream.on("close", resolve);
  });
  // The stream may emit `error` before `streamClosed` is awaited below (while
  // the write is still pending, or after a write failure has already thrown).
  // Registering a no-op handler keeps that rejection from being reported as
  // unhandled; the `await streamClosed` at the end still observes it.
  streamClosed.catch(() => {});

  // Handle incoming account updates
  stream.on("data", (data) => {
    if (!data?.account) {
      return;
    }
    // Guard the pubkey: bs58.encode throws on a missing value, and an
    // exception inside an event handler would escape as an uncaught error.
    const pubkey = data.account.account?.pubkey;
    console.log("\nReceived Account Update for:");
    console.log(pubkey ? bs58.encode(pubkey) : "<unknown pubkey>");
    console.log("\n");
    console.log(data.account);
  });

  // Send the subscription request
  try {
    await new Promise<void>((resolve, reject) => {
      stream.write(args, (err: unknown) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  } catch (err) {
    console.error("Failed to send subscription request:", err);
    // End our side of the stream so a caller that retries with a fresh
    // subscribe() does not leave this one open.
    stream.end();
    throw err;
  }

  // Wait for the stream to close
  await streamClosed;
}
/**
 * Entry point that keeps the subscription running indefinitely.
 *
 * Each pass opens a stream through `handleStream`. When that call throws, the
 * error is logged and the next attempt starts after a one-second pause. When
 * it returns normally, the next attempt starts immediately. The loop has no
 * exit, so the returned promise never settles.
 *
 * @param client - Yellowstone gRPC client
 * @param args - The subscription request forwarded to `handleStream`
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  const pause = (ms: number) => new Promise((done) => setTimeout(done, ms));

  for (;;) {
    try {
      await handleStream(client, args);
    } catch (failure) {
      console.error("Stream error, retrying in 1 second...", failure);
      await pause(1000);
    }
  }
}
// Yellowstone gRPC client. Replace both placeholder strings with your own values.
const grpcEndpoint = "YOUR-GRPC-ENDPOINT"; // your region-specific gRPC URL
const grpcAccessToken = "YOUR-GRPC-ACCESS-TOKEN"; // your access token
const client = new Client(grpcEndpoint, grpcAccessToken, undefined);
// Address of the account to monitor (a pool address when streaming pool updates)
const monitoredPoolAddress = "9AnFgHoXFysVcuFFX7QztDmzuH8r5ZFvyP4sYwn1XTj9";

/**
 * Subscribe request: the `account` list names the accounts whose updates are
 * streamed, and the `owner` list filters updates by program ID. Every other
 * filter group is left empty.
 */
const req: SubscribeRequest = {
  slots: {},
  accounts: {
    sol_usdc: {
      account: [monitoredPoolAddress],
      filters: [],
      owner: [],
    },
  },
  transactions: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  // PROCESSED commitment is chosen for the fastest updates
  commitment: CommitmentLevel.PROCESSED,
  entry: {},
  transactionsStatus: {},
};
// Kick off the subscription loop; the returned promise is intentionally not awaited
void subscribeCommand(client, req);
Because an account's data can be updated many times during its lifecycle (a liquidity pool's account, for example), this subscription request streams the updated data every time such an update occurs.