Modifying & Unsubscribing

Modifying Yellowstone Subscribe Requests & Unsubscribing the Subscriptions

  • Modifying Subscriptions: The subscription stream allows bi-directional communication. You can update your subscription filters dynamically by sending a new Subscribe request. For more details please check Modifying Subscribe Request in "Getting Started" section.

  • Unsubscribing: Send an empty subscription request to stop receiving updates on all streams. This will keep the connection open for future subscriptions.

Modifying a Subscribe Request

Solana Yellowstone gRPC streams, while ideal for real-time data, often need dynamic adjustments. You can modify existing subscriptions without disconnecting the stream, allowing for real-time changes to filters or tracked addresses based on application needs or market shifts. This ensures uninterrupted data flow and immediate responsiveness to live blockchain events. For more details do checkout the Modifying your Subscribe Request section in the Getting Started Section.

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  ping?: SubscribeRequestPing;
}

const subscribedWalletsA: string[] = [
  "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
  "5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",
  "4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",
  "GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",
  "oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",
];

const subscribedWalletsB: string[] = [
  "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
];

const subscribeRequest1: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    modifying_A: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: subscribedWalletsA,
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

// Subscribes to account changes for program-owned accounts of subscribedWalletsB
const subscribeRequest2: SubscribeRequest = {
  accounts: {
    modifying_B: {
      account: [],
      filters: [],
      owner: subscribedWalletsB,
    },
  },
  slots: {},
  transactions: {},
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {
    block: [],
  },
  entry: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

/**
 * Dynamically updates the current stream subscription with new request parameters.
 */
async function updateSubscription(stream: any, args: SubscribeRequest) {
  try {
    stream.write(args);
  } catch (error) {
    console.error("Failed to send updated subscription request:", error);
  }
}

/**
 * Handles a single streaming session.
 * Automatically switches to a second subscription request after a timeout.
 */
async function handleStream(client: Client, args: SubscribeRequest) {
  const stream = await client.subscribe();

  // Waits for the stream to close or error out
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream Error:", error);
      reject(error);
      stream.end();
    });
    stream.on("end", resolve);
    stream.on("close", resolve);
  });

  // Automatically switch subscription after 10 seconds
  setTimeout(async () => {
    console.log("🔁 Switching to second subscription request...");
    await updateSubscription(stream, subscribeRequest2);
  }, 10000);

  // Handle incoming data
  stream.on("data", async (data) => {
    try {
      console.log("📦 Streamed Data:", data);
      // You can add more processing logic here
    } catch (error) {
      console.error("Error processing stream data:", error);
    }
  });

  // Send initial subscription request
  await new Promise<void>((resolve, reject) => {
    stream.write(args, (err: any) => (err ? reject(err) : resolve()));
  }).catch((reason) => {
    console.error("Initial stream write failed:", reason);
    throw reason;
  });

  await streamClosed;
}

/**
 * Starts the stream and continuously attempts to reconnect on errors.
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args);
    } catch (error) {
      console.error("Stream error. Retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

const client = new Client(process.env.GRPC_URL, process.env.X_TOKEN, undefined);

// Start streaming with the first subscription
subscribeCommand(client, subscribeRequest1);

Unsubscribing to Subscribed Channels

This unsubscribes to your currently Subscribed Channels. However please note that this does not close the connection, and you may hit IP related limits due to this too.

{
  "slots": {},
  "accounts": {},
  "transactions": {},
  "blocks": {},
  "blocksMeta": {},
  "accountsDataSlice": []
}

This action will unsubscribe from all currently subscribed channels. Please note that, it won't disrupt the connection itself, which means you can still establish new subscriptions in the future without needing to re-connect to the service.

Closing a gRPC Connection

Once the subscription stream is active, we can close it using stream.cancel() method. Please visit closing a connection in "Getting Started" Section to know more.

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import bs58 from "bs58";

const PUBLIC_KEY_TO_LISTEN = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; //can be any address
const RUN_TIME = 5000; //decides how long to run
const GRPC_ENDPOINT = "https://grpc.ams.shyft.to"; //your gRPC endpoint
const X_TOKEN = ""; //your x-token

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel | undefined;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
}

const client = new Client(GRPC_ENDPOINT, X_TOKEN, undefined);

const req: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    raydiumLiquidityPoolV4: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: [PUBLIC_KEY_TO_LISTEN],
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.PROCESSED,
};

async function handleStream(client: Client, args: SubscribeRequest) {
  console.log(`Subscribing and starting stream...`);
  const stream = await client.subscribe();
  console.log(`Streaming data and printing transactions...`);

  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (err: any) => {
      if (err.code === 1 || err.message.includes("Cancelled")) {
        //this indicates stream was cancelled by user.
        console.log("✅ Stream cancelled by user");
        resolve();
      } else {
        console.error("❌ Stream error:", err);
        reject(err);
      }
    });

    stream.on("close", () => {
      console.log("Stream closed.");
      resolve();
    });
  });

  stream.on("data", (data) => {
    if (data.transaction) {
      console.log(
        "Received: ",
        bs58.encode(data.transaction.transaction.signature),
      );
    }
  });

  // Subscribe
  stream.write(args);

  // Cancel after timeout
  setTimeout(() => {
    console.log("Cancelling stream...");
    try {
      stream.cancel();
      /*
       * stream.end() or stream.destroy();
       * Cancels the stream from the user end, and closes the stream.
       *
       * A "Cancelled on client" error (code 1) will be thrown once this is called,
       * which be caught in stream.on("error") function.
       * This is one of the ways to cancel the stream, and clear your connections, when using Shyft gRPCs.
       *
       * For more information on clearing connections, please check the FAQ section of gRPC docs
       * https://docs.shyft.to/solana-yellowstone-grpc/grpc-docs#solana-grpc-faq
       */
    } catch (error) {
      if (error.code !== "ERR_STREAM_PREMATURE_CLOSE") {
        console.error("Stream cancel error:", error);
      }
    }
  }, RUN_TIME);

  await streamClosed;
}

async function subscribeCommand(client: Client, args: SubscribeRequest) {
  await handleStream(client, args);
}

subscribeCommand(client, req);

Last updated

Was this helpful?