Account Updates for an Address

Subscribe requests related to streaming real-time account updates for a particular address

The account array accepts the pool address for which updates are being streamed. This is especially useful when monitoring the state of specific accounts, such as those in liquidity pools, to track changes in real time.

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";
import bs58 from "bs58";


// Interface for the subscription request structure
interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  ping?: SubscribeRequestPing;
}

/**
 * Subscribes to the gRPC stream and handles incoming data.
 *
 * @param client - Yellowstone gRPC client
 * @param args - The Subscription request which specifies what data to stream
 */
async function handleStream(client: Client, args: SubscribeRequest) {
  const stream = await client.subscribe();

  // Promise that resolves when the stream ends or errors out
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream error:", error);
      reject(error);
      stream.end();
    });

    stream.on("end", resolve);
    stream.on("close", resolve);
  });

  // Handle incoming transaction data
  stream.on("data", (data) => {
    if (data?.account) {
      console.log("\nReceived Account Update for:");
      console.log(bs58.encode(data?.account?.account?.pubkey));
      console.log("\n");
      console.log(data?.account);
    }
  });

  // Send the subscription request
  await new Promise<void>((resolve, reject) => {
    stream.write(args, (err: any) => {
      err ? reject(err) : resolve();
    });
  }).catch((err) => {
    console.error("Failed to send subscription request:", err);
    throw err;
  });

  // Wait for the stream to close
  await streamClosed;
}

/**
 * Entry point to start the subscription stream.
 *
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args);
    } catch (error) {
      console.error("Stream error, retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

// Instantiate Yellowstone gRPC client with .env credentials
const client = new Client(
  "YOUR-GRPC-ENDPOINT", //Your Region specific gRPC URL
  "YOUR-GRPC-ACCESS-TOKEN", // your Access Token
  undefined,
);

/**
 * Subscribe Request: The `account` field is  for streaming account updates.
 * The `owner` field is for filtering updates based on the program ID.
 */
const req: SubscribeRequest = {
  slots: {},
  accounts: {
    sol_usdc: {
      account: ["9AnFgHoXFysVcuFFX7QztDmzuH8r5ZFvyP4sYwn1XTj9"], // pool address when streaming pool updates
      filters: [],
      owner: [],
    },
  },
  transactions: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.PROCESSED, // Subscribe to processed blocks for the fastest updates
  entry: {},
  transactionsStatus: {},
};

// Start the subscription
subscribeCommand(client, req);

Since an account's data can be updated many times during its lifecycle (for example liquidity pools), this subscription request will stream the updated data each time such an update occurs.

Last updated

Was this helpful?