Pakistan Ka Pehla Professional Trading Bot Course · Module 3

3.4 Hands-On Lab: Complete Data Pipeline Build

30 min · 5 code blocks · Quiz (4Q)


Assalam-o-alaikum, traders! I hope the previous lessons are clear by now.

This is the most important lesson in the whole module. Why? Because today we will build the exact data pipeline that my Polymarket Oracle, theta_sniper, uses in live production. No theory, no BS: straight code you can copy-paste into your own bot. This is the engine that filters out the garbage markets and puts only the golden opportunities in front of your AI.

COURSE: Pakistan Ka Pehla Professional Trading Bot Course

MODULE 3: Market Data Pipeline — How to Fetch Real-Time Data From the API

LESSON 3.4: Hands-On Lab: Complete Data Pipeline Build

Quick Review of Previous Lessons

So far we have learned three separate things:

  1. Scanner (scanner.py): fetching data for all active markets from the API.
  2. Filters: setting aside useless markets by applying conditions (expiry time, price, volume).
  3. Cache (db.py): not re-checking markets we have already analyzed, which saves API calls and AI spend.

Today we will join these three pieces into one single, powerful function: get_candidates(). This function is the heart of your bot. If it does its job well, the bot makes money. If it picks the wrong markets, even the AI gets confused and you take losses.

The Core Engine: get_candidates() Function

Alright, let's look at the code. This function is the core of our strategies/theta_sniper.py module. Its job is to pull out only the 10-15 "best" candidates, from thousands of markets, for our AI to spend its brainpower on.

Here is the function; we will now walk through it line by line.

```python
# This function is the first piece of your bot's "eyes" and "brain".
# It finds the best markets to analyze.

def get_candidates(max_hours=96, min_price=60, max_price=97, min_vol=3000):
    """Complete pipeline: Fetch → Filter → Cache → Score → Rank"""
    # 1. Fetch
    markets = fetch_active_markets(200)
    print(f'[PIPELINE] Fetched {len(markets)} markets')

    # 2. Filter
    expiring = filter_expiring_soon(markets, 24, max_hours)
    priced = filter_price_range(expiring, min_price, max_price)
    liquid = filter_volume(priced, min_vol)

    # 3. Cache check
    fresh = []
    for m in liquid:
        mid = m.get('conditionId', m.get('id'))
        if should_reanalyze(mid, m['_yes_price']):
            fresh.append(m)
            update_cache(mid, m['_yes_price'])

    # 4. Sort by soonest expiry (fastest capital rotation)
    fresh.sort(key=lambda x: x['_hours_to_expiry'])

    print(f'[PIPELINE] {len(markets)} → {len(liquid)} liquid → {len(fresh)} fresh candidates')
    return fresh[:15]  # Top 15 candidates for AI analysis
```

It looks simple, but there is solid logic behind every step. Let's break it down.

Step 1: Fetch - Pull All the Goods From the Market

```python
# 1. Fetch
markets = fetch_active_markets(200)
print(f'[PIPELINE] Fetched {len(markets)} markets')
```

This line goes straight to the function we wrote in scanner.py. We are telling the Polymarket API: "Bring me the top 200 active markets, sorted by volume."

  • fetch_active_markets(200): This function makes the API call. 200 is a good number: take fewer and you might miss a good opportunity; take more and the later steps waste time processing junk data.
  • print(): Logging in your bot is essential. You should know what is happening at every step. Here we log the total number of markets fetched.
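The scanner itself was built earlier in this module, so its code is not repeated here. As a reference point, here is a minimal sketch of what fetch_active_markets() could look like. Note that the Gamma API endpoint and the query parameter names are my assumptions for illustration; check Polymarket's API documentation (and your own scanner.py) before relying on them.

```python
# Hypothetical sketch of fetch_active_markets(), NOT the course's actual
# scanner.py code. The endpoint URL and query parameters are assumptions.
import json
import urllib.parse
import urllib.request

GAMMA_MARKETS_URL = "https://gamma-api.polymarket.com/markets"  # assumed endpoint

def build_market_query(limit=200):
    """Build the request URL for the top `limit` active markets by volume."""
    params = {
        "active": "true",    # only currently open markets
        "closed": "false",
        "limit": str(limit),
        "order": "volume",   # assumed sort field
        "ascending": "false",
    }
    return GAMMA_MARKETS_URL + "?" + urllib.parse.urlencode(params)

def fetch_active_markets(limit=200):
    """Fetch up to `limit` active markets and return them as a list of dicts."""
    with urllib.request.urlopen(build_market_query(limit), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

The key design point survives any API differences: one function owns the HTTP call, so if Polymarket changes its API you fix exactly one place.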

Step 2: Filter - Clean Out the Garbage

Now we have data for 200 markets. 95% of them are useless to us. One market expires in 6 months, another has $2 of volume, another is priced at 1 cent. Spending the AI's time and money on markets like these is foolish.

So we run a "filter chain": the output of the first filter becomes the input of the second.

```python
# 2. Filter
expiring = filter_expiring_soon(markets, 24, max_hours)
priced = filter_price_range(expiring, min_price, max_price)
liquid = filter_volume(priced, min_vol)
```

  1. filter_expiring_soon(markets, 24, max_hours):

    • Logic: Keep only the markets that expire within the next 24 to 96 hours (4 days).
    • Why? We are running a "theta decay" strategy, meaning we make money from time value. A market that expires soon will see its price move quickly toward 100 or 0, and the capital frees up faster for the next trade. Who wants to wait 6 months?
  2. filter_price_range(expiring, min_price, max_price):

    • Logic: Of the markets that remain, keep only those whose 'YES' price is between 60 cents and 97 cents.
    • Why? We bet on high-probability events. Above 60c means the market already considers the event "likely to happen". We don't go above 97c because the risk/reward there is poor: risking 97 cents to win 3 cents is not smart.
  3. filter_volume(priced, min_vol):

    • Logic: The final filter. Keep only markets with at least $3000 (roughly 8-9 lakh PKR) of trading volume.
    • Why? Liquidity is king! If a market has no volume, you simply won't be able to buy or sell your shares. You'll place a $500 order and the price will shoot through the roof (slippage). Low-volume markets are also easy to manipulate. We only play in markets where serious money is already at work.

After this step, the 200 markets will shrink to maybe 20-30. These are our "potential" opportunities.
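The three filter helpers themselves are not shown in this lesson. Here is a minimal sketch of how they could look, assuming (as the pipeline code suggests) that the scanner has already attached `_hours_to_expiry` and `_yes_price` (in cents) to each market dict, plus a `volume` field in dollars; field names other than those three are illustrative.

```python
# Minimal sketches of the three filters. Illustrative, not the course's
# exact helper code; assumes the scanner attached the underscored fields.

def filter_expiring_soon(markets, min_hours, max_hours):
    """Keep markets expiring between min_hours and max_hours from now."""
    return [m for m in markets if min_hours <= m["_hours_to_expiry"] <= max_hours]

def filter_price_range(markets, min_price, max_price):
    """Keep markets whose YES price (in cents) sits inside the band."""
    return [m for m in markets if min_price <= m["_yes_price"] <= max_price]

def filter_volume(markets, min_vol):
    """Keep markets with at least min_vol dollars of traded volume."""
    return [m for m in markets if m.get("volume", 0) >= min_vol]

# Chained exactly like the pipeline does, on synthetic sample data:
sample = [
    {"id": "A", "_hours_to_expiry": 48,  "_yes_price": 75, "volume": 12000},
    {"id": "B", "_hours_to_expiry": 400, "_yes_price": 80, "volume": 50000},  # too far out
    {"id": "C", "_hours_to_expiry": 30,  "_yes_price": 20, "volume": 9000},   # price too low
    {"id": "D", "_hours_to_expiry": 72,  "_yes_price": 90, "volume": 500},    # illiquid
]
liquid = filter_volume(filter_price_range(filter_expiring_soon(sample, 24, 96), 60, 97), 3000)
print([m["id"] for m in liquid])  # → ['A']
```

Notice that each filter is a pure function over a list: that makes the chain trivial to unit-test and to reorder.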

💡 Pro Tip: Dynamic Filtering

These min_price, max_price, and min_vol numbers are not carved in stone. In my production bot I adjust them according to market conditions. For example, if the market is very volatile (say, on election day), I raise min_price to 70c and lower max_price to 95c, which reduces risk. This is an advanced concept, but keep it in mind from now on: build your bot smart enough to change its own parameters based on the environment.
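The idea can be sketched in a few lines. The volatility proxy and the cutoff below are hypothetical, made up for illustration; only the two price bands (60-97 normal, 70-95 volatile) come from the tip above.

```python
# Illustrative only: pick a tighter price band when the market is choppy.
# The volatility measure and the 5-cent cutoff are hypothetical values.

def pick_price_band(recent_price_moves):
    """Return (min_price, max_price) in cents based on recent volatility."""
    # Crude volatility proxy: average absolute price move over recent scans.
    avg_move = sum(abs(m) for m in recent_price_moves) / len(recent_price_moves)
    if avg_move > 5:       # choppy market (e.g. election day)
        return 70, 95      # tighter band, less risk
    return 60, 97          # normal conditions

print(pick_price_band([1, -2, 1]))   # calm → (60, 97)
print(pick_price_band([8, -10, 7]))  # volatile → (70, 95)
```

The returned tuple plugs straight into get_candidates(min_price=..., max_price=...), so the pipeline itself never needs to change.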

Step 3: Cache Check - Ignore What You've Already Seen

Of the 20-30 markets that remain, it's quite possible our bot analyzed 15 of them just 5 minutes ago. If the price hasn't moved meaningfully, firing another expensive AI (Gemini/Haiku) API call at the same market is a waste. This is where our cache, the db.py system, earns its keep.

```python
# 3. Cache check
fresh = []
for m in liquid:
    mid = m.get('conditionId', m.get('id'))
    if should_reanalyze(mid, m['_yes_price']):
        fresh.append(m)
        update_cache(mid, m['_yes_price'])
```

  • Logic: For each market we call the should_reanalyze() function.
  • should_reanalyze(market_id, current_price): This function checks the database:
    1. Have we seen this market before?
    2. If yes, how much has the price moved since the last time we looked?

Preventing duplicate analysis ensures efficient resource usage and avoids redundant AI calls.
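The db.py code is not reproduced in this lesson, but a minimal version of the cache can be sketched with Python's built-in sqlite3 module. The table and column names follow the mini-challenge, and the 5% threshold matches the decision diagram; treat this as one possible implementation, not the course's actual db.py.

```python
# Minimal sqlite3 sketch of the analysis cache: new markets and markets whose
# price moved more than 5% get re-analyzed, everything else is skipped.
import sqlite3
import time

conn = sqlite3.connect(":memory:")  # use a file path for a persistent cache
conn.execute("""CREATE TABLE IF NOT EXISTS market_analysis_cache (
    market_id TEXT PRIMARY KEY,
    last_price REAL,
    last_analyzed_time REAL
)""")

def should_reanalyze(market_id, current_price, threshold_pct=5.0):
    """True if the market is new or its price moved more than threshold_pct."""
    row = conn.execute(
        "SELECT last_price FROM market_analysis_cache WHERE market_id = ?",
        (market_id,),
    ).fetchone()
    if row is None:
        return True  # never seen before → analyze it
    change_pct = abs(current_price - row[0]) / row[0] * 100
    return change_pct > threshold_pct

def update_cache(market_id, price):
    """Record the price we analyzed at, replacing any older entry."""
    conn.execute(
        "INSERT OR REPLACE INTO market_analysis_cache VALUES (?, ?, ?)",
        (market_id, price, time.time()),
    )
    conn.commit()

print(should_reanalyze("POL_123", 75))  # → True (new market)
update_cache("POL_123", 75)
print(should_reanalyze("POL_123", 76))  # → False (only ~1.3% move)
print(should_reanalyze("POL_123", 85))  # → True (~13% move)
```

An in-memory database is used here so the sketch is self-contained; in a real bot you would pass a file path so the cache survives restarts.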

📺 Recommended Videos & Resources

  • SQLite Database Design — Lightweight database for bots
    • Type: Official Website
    • Link description: Learn SQLite for persistent data storage
  • Python sqlite3 Module — Database operations in Python
    • Type: Python Official Docs
    • Link description: Reference for SQL queries and transactions
  • Query Optimization & Indexing — Making database faster
    • Type: YouTube
    • Link description: Search "SQL indexing query optimization"
  • State Management in Bots — Tracking bot state
    • Type: Wikipedia
    • Link description: Learn about state machines for trading bot logic
  • Change Detection Patterns — Identifying price movements
    • Type: YouTube
    • Link description: Search "change detection algorithms Python"

🎯 Mini-Challenge

5-Minute Practical Task: Create a simple SQLite table called "market_analysis_cache" with columns: market_id, last_price, last_analyzed_time. Write functions to: (1) store an analysis result, (2) retrieve the last analysis for a market, (3) check if re-analysis is needed (if price changed >5%). Test with sample data.

🖼️ Visual Reference

```text
📊 Market State Tracking & Re-analysis Decision
┌──────────────────────────────┐
│ New Market Detected          │
│ market_id: POL_123           │
│ current_price: 75%           │
└─────────────┬────────────────┘
              │
              ▼
   ┌──────────────────────┐
   │ Query Cache:         │
   │ Is market_123        │
   │ in database?         │
   └──────────────────────┘
        │          │
        │          └─────────────┐
        │ (Not in cache)         │ (In cache)
        ▼                        ▼
    ┌────────────┐      ┌──────────────┐
    │ NEW: Add   │      │ CHECK:       │
    │ & Analyze  │      │ Price moved  │
    │            │      │ > 5%?        │
    └────────────┘      └──────────────┘
                             │       │
                     ┌───────┘       └──────┐
                     │ (No)                 │ (Yes)
                     ▼                      ▼
                  ┌────────┐         ┌──────────────┐
                  │ SKIP   │         │ RE-ANALYZE   │
                  │ (Cache)│         │ (Fresh AI)   │
                  └────────┘         └──────────────┘
```

Lesson Summary

5 runnable code examples · 4-question knowledge check below

Quiz: Hands-On Lab: Complete Data Pipeline Build

4 questions to test your understanding. Score 60% or higher to pass.