Modifying & Unsubscribing

Solana gRPC Geyser Tutorial – Updating Filters and Unsubscribing

  • Modifying Subscriptions: The subscription stream allows bi-directional communication. You can update your subscription filters dynamically by sending a new Subscribe request. For more details please check Modifying Subscribe Request in "Getting Started" section.

  • Unsubscribing: Send an empty subscription request to stop receiving updates on all streams. This will keep the connection open for future subscriptions.

Modifying a Subscribe Request

Solana Yellowstone gRPC streams, while ideal for real-time data, often need dynamic adjustments. You can modify existing subscriptions without disconnecting the stream, allowing for real-time changes to filters or tracked addresses based on application needs or market shifts. This ensures uninterrupted data flow and immediate responsiveness to live blockchain events. For more details do checkout the Modifying your Subscribe Request section in the Getting Started Section.

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  ping?: SubscribeRequestPing;
}

const subscribedWalletsA: string[] = [
  "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
  "5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",
  "4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",
  "GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",
  "oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",
];

const subscribedWalletsB: string[] = [
  "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
];

const subscribeRequest1: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    modifying_A: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: subscribedWalletsA,
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

// Subscribes to account changes for program-owned accounts of subscribedWalletsB
const subscribeRequest2: SubscribeRequest = {
  accounts: {
    modifying_B: {
      account: [],
      filters: [],
      owner: subscribedWalletsB,
    },
  },
  slots: {},
  transactions: {},
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {
    block: [],
  },
  entry: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

/**
 * Dynamically updates the current stream subscription with new request parameters.
 */
async function updateSubscription(stream: any, args: SubscribeRequest) {
  try {
    stream.write(args);
  } catch (error) {
    console.error("Failed to send updated subscription request:", error);
  }
}

/**
 * Handles a single streaming session.
 * Automatically switches to a second subscription request after a timeout.
 */
async function handleStream(client: Client, args: SubscribeRequest) {
  const stream = await client.subscribe();

  // Waits for the stream to close or error out
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream Error:", error);
      reject(error);
      stream.end();
    });
    stream.on("end", resolve);
    stream.on("close", resolve);
  });

  // Automatically switch subscription after 10 seconds
  setTimeout(async () => {
    console.log("πŸ” Switching to second subscription request...");
    await updateSubscription(stream, subscribeRequest2);
  }, 10000);

  // Handle incoming data
  stream.on("data", async (data) => {
    try {
      console.log("πŸ“¦ Streamed Data:", data);
      // You can add more processing logic here
    } catch (error) {
      console.error("Error processing stream data:", error);
    }
  });

  // Send initial subscription request
  await new Promise<void>((resolve, reject) => {
    stream.write(args, (err: any) => (err ? reject(err) : resolve()));
  }).catch((reason) => {
    console.error("Initial stream write failed:", reason);
    throw reason;
  });

  await streamClosed;
}

/**
 * Starts the stream and continuously attempts to reconnect on errors.
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args);
    } catch (error) {
      console.error("Stream error. Retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

const client = new Client(process.env.GRPC_URL, process.env.X_TOKEN, undefined);

// Start streaming with the first subscription
subscribeCommand(client, subscribeRequest1);

Unsubscribing to Subscribed Channels

This unsubscribes to your currently Subscribed Channels. However please note that this does not close the connection, and you may hit IP related limits due to this too.

This action will unsubscribe from all currently subscribed channels. Please note that, it won't disrupt the connection itself, which means you can still establish new subscriptions in the future without needing to re-connect to the service.

Closing a gRPC Connection

Once the subscription stream is active, we can close it using stream.cancel() method. Please visit closing a connection in "Getting Started" Section to know more.

Last updated

Was this helpful?