Modifying & Unsubscribing

Modifying Yellowstone Subscribe Requests & Unsubscribing from Subscriptions

Last updated 11 days ago

  • Modifying Subscriptions: The subscription stream allows bi-directional communication. You can update your subscription filters dynamically by sending a new Subscribe request. For more details, see Modifying Subscribe Request in the "Getting Started" section.

  • Unsubscribing: Send an empty subscription request to stop receiving updates on all streams. This will keep the connection open for future subscriptions.

Modifying a Subscribe Request

Solana Yellowstone gRPC streams are ideal for real-time data, but the filters or tracked addresses often need to change while the application is running. You can modify an existing subscription without disconnecting the stream by sending a new subscribe request, so data keeps flowing while the filters change. For more details, check out the Modifying your Subscribe Request section in Getting Started.

The complete code for updating subscribe requests is available on GitHub and on Replit.

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
  ping?: SubscribeRequestPing;
}

const subscribedWalletsA: string[] = [
  "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
  "5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",
  "4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",
  "GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",
  "oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",
];

const subscribedWalletsB: string[] = [
  "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
];

const subscribeRequest1: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    modifying_A: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: subscribedWalletsA,
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

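// Subscribes to updates for accounts owned by the program(s) listed in subscribedWalletsB
// (the blocksMeta entry below also requests block metadata under the filter name "block")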
const subscribeRequest2: SubscribeRequest = {
  accounts: {
    modifying_B: {
      account: [],
      filters: [],
      owner: subscribedWalletsB,
    },
  },
  slots: {},
  transactions: {},
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {
    block: [],
  },
  entry: {},
  accountsDataSlice: [],
  ping: undefined,
  commitment: CommitmentLevel.PROCESSED,
};

/**
 * Dynamically updates the current stream subscription with new request parameters.
 */
async function updateSubscription(stream: any, args: SubscribeRequest) {
  try {
    stream.write(args);
  } catch (error) {
    console.error("Failed to send updated subscription request:", error);
  }
}

/**
 * Handles a single streaming session.
 * Automatically switches to a second subscription request after a timeout.
 */
async function handleStream(client: Client, args: SubscribeRequest) {
  const stream = await client.subscribe();

  // Waits for the stream to close or error out
  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (error) => {
      console.error("Stream Error:", error);
      reject(error);
      stream.end();
    });
    stream.on("end", resolve);
    stream.on("close", resolve);
  });

  // Automatically switch subscription after 10 seconds
  setTimeout(async () => {
    console.log("🔁 Switching to second subscription request...");
    await updateSubscription(stream, subscribeRequest2);
  }, 10000);

  // Handle incoming data
  stream.on("data", async (data) => {
    try {
      console.log("📦 Streamed Data:", data);
      // You can add more processing logic here
    } catch (error) {
      console.error("Error processing stream data:", error);
    }
  });

  // Send initial subscription request
  await new Promise<void>((resolve, reject) => {
    stream.write(args, (err: any) => (err ? reject(err) : resolve()));
  }).catch((reason) => {
    console.error("Initial stream write failed:", reason);
    throw reason;
  });

  await streamClosed;
}

/**
 * Starts the stream and continuously attempts to reconnect on errors.
 */
async function subscribeCommand(client: Client, args: SubscribeRequest) {
  while (true) {
    try {
      await handleStream(client, args);
    } catch (error) {
      console.error("Stream error. Retrying in 1 second...", error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

// GRPC_URL and X_TOKEN are read from environment variables; GRPC_URL must be set
const client = new Client(process.env.GRPC_URL!, process.env.X_TOKEN, undefined);

// Start streaming with the first subscription
subscribeCommand(client, subscribeRequest1);
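
A new subscribe request appears to replace the previous filters rather than add to them (an empty request unsubscribes from everything). Adding an address therefore means resending the full filter with the extra address included. The following is a minimal sketch of that idea. `withAddedAddresses`, `TxFilter`, and `RequestWithTransactions` are hypothetical names introduced here for illustration and are not part of `@triton-one/yellowstone-grpc`.

```typescript
// Minimal structural types for this sketch (assumptions). In real code, use
// SubscribeRequestFilterTransactions from @triton-one/yellowstone-grpc.
interface TxFilter {
  vote?: boolean;
  failed?: boolean;
  signature?: string;
  accountInclude: string[];
  accountExclude: string[];
  accountRequired: string[];
}

interface RequestWithTransactions {
  transactions: { [key: string]: TxFilter };
}

/**
 * Returns a copy of `request` whose transaction filter `filterName`
 * also tracks `addresses`. The input request is not mutated.
 * (Hypothetical helper, not a library function.)
 */
function withAddedAddresses<T extends RequestWithTransactions>(
  request: T,
  filterName: string,
  addresses: string[],
): T {
  const existing = request.transactions[filterName];
  if (!existing) {
    throw new Error(`No transaction filter named "${filterName}"`);
  }
  // De-duplicate while keeping the original order
  const accountInclude = Array.from(
    new Set([...existing.accountInclude, ...addresses]),
  );
  return {
    ...request,
    transactions: {
      ...request.transactions,
      [filterName]: { ...existing, accountInclude },
    },
  };
}
```

With the code above, the result could be passed to `updateSubscription(stream, withAddedAddresses(subscribeRequest1, "modifying_A", [newAddress]))`, where `newAddress` is whatever address you want to start tracking.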

Unsubscribing from Subscribed Channels

{
  "slots": {},
  "accounts": {},
  "transactions": {},
  "blocks": {},
  "blocksMeta": {},
  "accountsDataSlice": []
}

Sending this empty subscribe request unsubscribes from all currently subscribed channels. Note that it does not disrupt the connection itself, so you can still establish new subscriptions later without reconnecting to the service.
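
As a rough TypeScript sketch, the empty request can be written to an already open stream as shown below. `WritableLike`, `emptySubscribeRequest`, and `unsubscribeAll` are hypothetical names used only for this example. The `transactionsStatus` and `entry` keys are included on the assumption that the request should match the `SubscribeRequest` interface used elsewhere on this page.

```typescript
// Minimal shape of the duplex stream returned by client.subscribe();
// only write() is needed for this sketch (assumption).
interface WritableLike {
  write(chunk: unknown, callback?: (err?: Error | null) => void): boolean;
}

// Builds a subscribe request in which every filter is empty.
function emptySubscribeRequest() {
  return {
    accounts: {},
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
    accountsDataSlice: [],
  };
}

// Writes the empty request to the stream. The returned promise resolves once
// the write callback fires without an error; the stream itself stays open.
function unsubscribeAll(stream: WritableLike): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    stream.write(emptySubscribeRequest(), (err) =>
      err ? reject(err) : resolve(),
    );
  });
}
```

Calling `await unsubscribeAll(stream)` should stop updates while keeping the stream available for a later subscribe request.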

Closing a gRPC Connection

import Client, {
  CommitmentLevel,
  SubscribeRequestAccountsDataSlice,
  SubscribeRequestFilterAccounts,
  SubscribeRequestFilterBlocks,
  SubscribeRequestFilterBlocksMeta,
  SubscribeRequestFilterEntry,
  SubscribeRequestFilterSlots,
  SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import bs58 from "bs58";

const PUBLIC_KEY_TO_LISTEN = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; //can be any address
const RUN_TIME = 5000; //decides how long to run
const GRPC_ENDPOINT = "https://grpc.ams.shyft.to"; //your gRPC endpoint
const X_TOKEN = ""; //your x-token

interface SubscribeRequest {
  accounts: { [key: string]: SubscribeRequestFilterAccounts };
  slots: { [key: string]: SubscribeRequestFilterSlots };
  transactions: { [key: string]: SubscribeRequestFilterTransactions };
  transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
  blocks: { [key: string]: SubscribeRequestFilterBlocks };
  blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
  entry: { [key: string]: SubscribeRequestFilterEntry };
  commitment?: CommitmentLevel | undefined;
  accountsDataSlice: SubscribeRequestAccountsDataSlice[];
}

const client = new Client(GRPC_ENDPOINT, X_TOKEN, undefined);

const req: SubscribeRequest = {
  accounts: {},
  slots: {},
  transactions: {
    raydiumLiquidityPoolV4: {
      vote: false,
      failed: false,
      signature: undefined,
      accountInclude: [PUBLIC_KEY_TO_LISTEN],
      accountExclude: [],
      accountRequired: [],
    },
  },
  transactionsStatus: {},
  entry: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.PROCESSED,
};

async function handleStream(client: Client, args: SubscribeRequest) {
  console.log(`Subscribing and starting stream...`);
  const stream = await client.subscribe();
  console.log(`Streaming data and printing transactions...`);

  const streamClosed = new Promise<void>((resolve, reject) => {
    stream.on("error", (err: any) => {
      if (err.code === 1 || err.message.includes("Cancelled")) {
        //this indicates stream was cancelled by user.
        console.log("✅ Stream cancelled by user");
        resolve();
      } else {
        console.error("❌ Stream error:", err);
        reject(err);
      }
    });

    stream.on("close", () => {
      console.log("Stream closed.");
      resolve();
    });
  });

  stream.on("data", (data) => {
    if (data.transaction) {
      console.log(
        "Received: ",
        bs58.encode(data.transaction.transaction.signature),
      );
    }
  });

  // Subscribe
  stream.write(args);

  // Cancel after timeout
  setTimeout(() => {
    console.log("Cancelling stream...");
    try {
      stream.cancel();
      /*
       * stream.end() or stream.destroy();
       * Cancels the stream from the client side and closes it.
       *
       * A "Cancelled on client" error (code 1) is emitted once this is called,
       * which will be caught in the stream.on("error") handler.
       * This is one of the ways to cancel the stream and clear your connections when using Shyft gRPCs.
       *
       * For more information on clearing connections, please check the FAQ section of gRPC docs
       * https://docs.shyft.to/solana-yellowstone-grpc/grpc-docs#solana-grpc-faq
       */
    } catch (error: any) {
      if (error.code !== "ERR_STREAM_PREMATURE_CLOSE") {
        console.error("Stream cancel error:", error);
      }
    }
  }, RUN_TIME);

  await streamClosed;
}

async function subscribeCommand(client: Client, args: SubscribeRequest) {
  await handleStream(client, args);
}

subscribeCommand(client, req);

Once the subscription stream is active, you can close it with the stream.cancel() method. For more details, see closing a connection in the "Getting Started" section.

By contrast, sending an empty subscribe request only unsubscribes from your currently subscribed channels. It does not close the connection, so you may still hit IP related limits because of it.

The complete code for closing a connection is available on Replit.
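
The error handler in the closing example treats a cancellation as a clean shutdown. A small defensive version of that check is sketched below. `isClientCancellation` is a hypothetical helper name. The value 1 is the standard gRPC status code for CANCELLED, and the message check mirrors the "Cancelled" text used in the example above.

```typescript
// Standard gRPC status code for CANCELLED
const GRPC_STATUS_CANCELLED = 1;

/**
 * Returns true when `err` looks like the error emitted after the client
 * cancels its own stream, and false for anything else (including values
 * that are not objects or that have no message).
 */
function isClientCancellation(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const { code, message } = err as { code?: unknown; message?: unknown };
  if (code === GRPC_STATUS_CANCELLED) return true;
  return typeof message === "string" && message.includes("Cancelled");
}
```

Inside `stream.on("error", ...)`, a check such as `if (isClientCancellation(err)) resolve(); else reject(err);` would avoid throwing when an error arrives without a message.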
