PostgreSQL is one of the most popular relational databases. This guide shows you how to load SolArchive parquet files into PostgreSQL so you can integrate blockchain data with your existing data infrastructure.
First, install and configure PostgreSQL:
# Install PostgreSQL (macOS)
brew install postgresql@16
brew services start postgresql@16
# Create database
createdb solarchive
# Or use Docker
docker run -d \
--name postgres \
-e POSTGRES_DB=solarchive \
-e POSTGRES_PASSWORD=password \
-p 5432:5432 \
postgres:16
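Before creating any tables, it's worth confirming the server is reachable. A quick check with psql, assuming the credentials from the Docker command above:

# Should print the server version if the connection works
psql "postgresql://postgres:password@localhost:5432/solarchive" -c "SELECT version();"

Create simplified tables for transactions and tokens: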
-- Create transactions table
CREATE TABLE transactions (
signature TEXT PRIMARY KEY,
block_slot BIGINT NOT NULL,
block_timestamp TIMESTAMP NOT NULL,
fee BIGINT NOT NULL,
status TEXT NOT NULL,
err TEXT,
account_keys TEXT[] NOT NULL,
num_accounts INTEGER NOT NULL
);
-- Create index on timestamp for time-based queries
CREATE INDEX idx_transactions_timestamp ON transactions(block_timestamp);
CREATE INDEX idx_transactions_slot ON transactions(block_slot);
-- Create tokens table
CREATE TABLE tokens (
mint TEXT PRIMARY KEY,
symbol TEXT,
name TEXT,
decimals SMALLINT NOT NULL,
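-- supply is TEXT rather than BIGINT: u64 token supplies can exceed BIGINT's range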
supply TEXT,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_tokens_symbol ON tokens(symbol);
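You can sanity-check the schema before loading anything (again assuming the Docker credentials above):

# Describe the transactions table and its indexes
psql "postgresql://postgres:password@localhost:5432/solarchive" -c "\d transactions"

We'll use DuckDB to read the parquet files and psycopg2 to insert the rows into PostgreSQL: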
uv add duckdb psycopg2-binary

Here's the complete loading script:
import duckdb
import psycopg2
from psycopg2.extras import execute_batch
from pathlib import Path
# PostgreSQL connection
PG_CONN = psycopg2.connect(
host="localhost",
database="solarchive",
user="postgres",
password="password"
)
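# Note: credentials are hardcoded to match the Docker example above; in
# practice, read them from the environment or a ~/.pgpass file instead.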
def load_transactions_partition(partition_dir: Path):
"""Load transaction parquet files into PostgreSQL."""
parquet_files = sorted(partition_dir.glob("*.parquet"))
print(f"Loading {len(parquet_files)} files from {partition_dir.name}...")
# Use DuckDB to read parquet and convert to rows
con = duckdb.connect()
total_rows = 0
for parquet_file in parquet_files:
# Read simplified schema from parquet
query = f"""
SELECT
signature,
block_slot,
block_timestamp,
fee,
status,
err,
account_keys,
len(account_keys) as num_accounts -- DuckDB's list length function
FROM read_parquet('{parquet_file}')
"""
result = con.execute(query).fetchall()
# Insert into PostgreSQL in batches
cursor = PG_CONN.cursor()
execute_batch(cursor, """
INSERT INTO transactions
(signature, block_slot, block_timestamp, fee, status, err, account_keys, num_accounts)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (signature) DO NOTHING
""", result, page_size=1000)
PG_CONN.commit()
cursor.close()
total_rows += len(result)
print(f" ✓ {parquet_file.name}: {len(result):,} rows")
con.close()
print(f" Total: {total_rows:,} rows loaded\n")
def load_tokens_partition(partition_dir: Path):
"""Load token parquet files into PostgreSQL."""
parquet_files = sorted(partition_dir.glob("*.parquet"))
print(f"Loading {len(parquet_files)} files from {partition_dir.name}...")
con = duckdb.connect()
total_rows = 0
for parquet_file in parquet_files:
query = f"""
SELECT mint, symbol, name, decimals, supply, updated_at
FROM read_parquet('{parquet_file}')
"""
result = con.execute(query).fetchall()
cursor = PG_CONN.cursor()
execute_batch(cursor, """
INSERT INTO tokens (mint, symbol, name, decimals, supply, updated_at)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (mint) DO UPDATE SET
symbol = EXCLUDED.symbol,
name = EXCLUDED.name,
supply = EXCLUDED.supply,
updated_at = EXCLUDED.updated_at
""", result, page_size=1000)
PG_CONN.commit()
cursor.close()
total_rows += len(result)
print(f" ✓ {parquet_file.name}: {len(result):,} rows")
con.close()
print(f" Total: {total_rows:,} rows loaded\n")
def main():
"""Load SolArchive data into PostgreSQL."""
data_dir = Path("data")
# Load transactions
txs_dir = data_dir / "txs"
if txs_dir.exists():
print("Loading TRANSACTIONS dataset:")
partitions = sorted([d for d in txs_dir.iterdir() if d.is_dir()])
for partition_dir in partitions:
load_transactions_partition(partition_dir)
# Load tokens
tokens_dir = data_dir / "tokens"
if tokens_dir.exists():
print("Loading TOKENS dataset:")
partitions = sorted([d for d in tokens_dir.iterdir() if d.is_dir()])
for partition_dir in partitions:
load_tokens_partition(partition_dir)
# Show stats
cursor = PG_CONN.cursor()
cursor.execute("SELECT COUNT(*) FROM transactions")
tx_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM tokens")
token_count = cursor.fetchone()[0]
cursor.close()
print(f"\n✅ PostgreSQL loaded:")
print(f" Transactions: {tx_count:,} rows")
print(f" Tokens: {token_count:,} rows")
PG_CONN.close()
if __name__ == "__main__":
main()
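One performance note: execute_batch sends the per-row INSERTs in pages to cut round trips, but psycopg2's execute_values packs each page into a single multi-row INSERT, which is usually faster still. A drop-in sketch for the transactions insert above:

from psycopg2.extras import execute_values

# Same semantics as the execute_batch call, but each page becomes one
# multi-row INSERT statement instead of many single-row statements
execute_values(cursor, """
INSERT INTO transactions
(signature, block_slot, block_timestamp, fee, status, err, account_keys, num_accounts)
VALUES %s
ON CONFLICT (signature) DO NOTHING
""", result, page_size=1000)

Run it: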
uv run load_postgres.py

Once loaded, you can query with standard PostgreSQL:
-- Find wallet transactions in PostgreSQL
SELECT
block_timestamp,
signature,
fee / 1e9 as fee_sol,
status
FROM transactions
WHERE 'YOUR_WALLET_ADDRESS' = ANY(account_keys)
ORDER BY block_timestamp DESC
LIMIT 100;
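The = ANY(account_keys) filter above scans every row. If wallet lookups are a hot path, a GIN index on the array column plus the containment operator @> lets PostgreSQL use the index (a sketch; the index name is illustrative):

-- Index array membership lookups
CREATE INDEX idx_transactions_accounts ON transactions USING GIN (account_keys);

-- The @> (contains) operator can use the GIN index; = ANY() cannot
SELECT block_timestamp, signature, fee / 1e9 AS fee_sol, status
FROM transactions
WHERE account_keys @> ARRAY['YOUR_WALLET_ADDRESS']
ORDER BY block_timestamp DESC
LIMIT 100;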
Use PostgreSQL when:
- You need to join on-chain activity with existing application tables
- Queries are targeted lookups (a wallet, a signature, a time window) rather than full-table scans
- You want PostgreSQL's mature ecosystem of ORMs, BI connectors, and managed hosting

Use ClickHouse when:
- You're aggregating over the full dataset, hundreds of millions of rows or more
- Scan throughput and columnar compression matter more than row-level updates and constraints
With data in PostgreSQL, you can:
- Join wallet and token activity against your own application tables
- Serve indexed wallet and signature lookups from an API
- Power dashboards with plain SQL, like the sketch below
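For example, a daily activity summary of the kind a dashboard might poll (a sketch against the transactions table defined above):

-- Daily transaction counts and total fees in SOL
SELECT
date_trunc('day', block_timestamp) AS day,
COUNT(*) AS tx_count,
SUM(fee) / 1e9 AS total_fees_sol
FROM transactions
GROUP BY 1
ORDER BY 1 DESC
LIMIT 30;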