Security Tool Integration Guides
Overview
Port Six integrates into your existing security stack. No new portal to check. Production-ready code examples for the most common platforms.
Splunk
Method 1: External Lookup (Recommended)
Use Splunk's external lookup capability to enrich events in real-time.
Step 1: Create External Lookup Script
# /opt/splunk/etc/apps/portsix/bin/portsix_lookup.py
import sys
import json
import requests
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.portsix.io/v1"
def enrich_ip(ip):
response = requests.get(
f"{BASE_URL}/ip/{ip}",
headers={"X-API-Key": API_KEY}
)
if response.status_code == 200:
data = response.json()
threat_intel = data.get("threat_intel") or {}
return {
"ip": ip,
"risk_score": data.get("risk_score", 0),
"tags": ",".join(threat_intel.get("tags", [])),
"confidence": threat_intel.get("confidence", 0)
}
return {"ip": ip, "risk_score": 0, "tags": "", "confidence": 0}
if __name__ == "__main__":
for line in sys.stdin:
ip = line.strip()
result = enrich_ip(ip)
print(json.dumps(result))Step 2: Configure transforms.conf
# /opt/splunk/etc/apps/portsix/local/transforms.conf
[portsix_ip_lookup]
external_cmd = portsix_lookup.py ip
fields_list = ip,risk_score,tags,confidenceStep 3: Use in SPL Queries
# Enrich firewall logs
index=firewall sourcetype=palo_alto
| lookup portsix_ip_lookup ip AS dest_ip OUTPUT risk_score, tags, confidence
| where risk_score > 60
| stats count by dest_ip, risk_score, tags
| sort -risk_scoreMethod 2: Saved Search with REST API
# Create saved search that runs every 5 minutes
index=firewall earliest=-5m
| dedup dest_ip
| eval api_url="https://api.portsix.io/api/v1/ip/".dest_ip
| map maxsearches=1000 search="| rest $api_url$ auth=YOUR_API_KEY"
| where risk_score > 60
| outputlookup portsix_threat_ips.csv
# Use the lookup in correlation searches
index=firewall
| lookup portsix_threat_ips.csv dest_ip
| where isnotnull(risk_score)
| stats count by dest_ip, user, applicationAutomated Alerting
# Create alert for critical threats
index=firewall earliest=-15m
| lookup portsix_ip_lookup ip AS dest_ip OUTPUT risk_score, tags, confidence
| where risk_score > 80
| stats count, values(tags) as threat_types by dest_ip, user
| sendemail to="[email protected]" subject="Critical Threat Detected"Elastic Stack (ELK)
Method 1: Logstash Filter Plugin
Step 1: Create Logstash Filter
# /etc/logstash/conf.d/30-portsix-enrichment.conf
filter {
if [dest_ip] {
http {
url => "https://api.portsix.io/api/v1/ip/%{dest_ip}"
headers => {
"X-API-Key" => "YOUR_API_KEY"
}
target_body => "portsix"
add_field => {
"threat_intel_risk_score" => "%{[portsix][risk_score]}"
"threat_intel_tags" => "%{[portsix][threat_intel][tags]}"
"threat_intel_confidence" => "%{[portsix][threat_intel][confidence]}"
}
}
}
}Step 2: Kibana Visualization
# Create index pattern matching enriched logs
GET /firewall-*/_search
{
"query": {
"bool": {
"must": [
{ "range": { "threat_intel_risk_score": { "gte": 60 }}}
]
}
},
"aggs": {
"top_threats": {
"terms": { "field": "dest_ip" },
"aggs": {
"avg_risk": { "avg": { "field": "threat_intel_risk_score" }}
}
}
}
}Method 2: Elasticsearch Ingest Pipeline
# Create ingest pipeline
PUT _ingest/pipeline/portsix-enrich
{
"description": "Enrich IPs with Port Six threat intelligence",
"processors": [
{
"script": {
"lang": "painless",
"source": """
def ip = ctx.dest_ip;
def apiUrl = 'https://api.portsix.io/api/v1/ip/' + ip;
def conn = apiUrl.toURL().openConnection();
conn.setRequestProperty("X-API-Key", "YOUR_API_KEY");
def response = new JsonSlurper().parse(conn.getInputStream());
ctx.threat_intel = response;
"""
}
}
]
}
# Apply to index
PUT /firewall-logs/_settings
{
"index.default_pipeline": "portsix-enrich"
}Palo Alto Firewall (EDL)
External Dynamic List (EDL)
Step 1: Generate Blocklist
#!/bin/bash
# /usr/local/bin/sync_portsix_blocklist.sh
API_KEY="YOUR_API_KEY"
OUTPUT_DIR="/var/www/html/edl"
# Fetch C2 infrastructure
curl -H "X-API-Key: $API_KEY" \
"https://api.portsix.io/api/v1/feeds/behavior/c2?limit=10000" \
| jq -r '.indicators[].value' > $OUTPUT_DIR/portsix_c2_ips.txt
# Fetch phishing domains
curl -H "X-API-Key: $API_KEY" \
"https://api.portsix.io/api/v1/feeds/behavior/phishing?limit=10000" \
| jq -r '.indicators[].value' > $OUTPUT_DIR/portsix_phishing_domains.txt
# Fetch ransomware infrastructure
curl -H "X-API-Key: $API_KEY" \
"https://api.portsix.io/api/v1/feeds/behavior/ransomware?limit=10000" \
| jq -r '.indicators[].value' > $OUTPUT_DIR/portsix_ransomware.txt
# Run every 6 hours
# 0 */6 * * * /usr/local/bin/sync_portsix_blocklist.shStep 2: Configure Palo Alto EDL
# Via GUI:
Objects > External Dynamic Lists > Add
Name: Port Six_C2_Blocklist
Type: IP List
Source: http://YOUR_SERVER/edl/portsix_c2_ips.txt
Check for updates: Every 6 hours
# Apply to security policy:
Policies > Security > Add Rule
Source: any
Destination: Port Six_C2_Blocklist
Action: Deny
Log: YesStep 3: Automated Response via API
#!/usr/bin/env python3
# Real-time blocking via Palo Alto API
import requests
import xml.etree.ElementTree as ET
PAN_HOST = "firewall.company.com"
PAN_API_KEY = "YOUR_PAN_API_KEY"
PORTSIX_API_KEY = "YOUR_PORTSIX_API_KEY"
def block_ip(ip, tags):
# Add to dynamic address group
xml_payload = f"""
<uid-message>
<type>update</type>
<payload>
<register>
<entry ip="{ip}">
<tag>
<member>portsix-blocked</member>
<member>{tags[0]}</member>
</tag>
</entry>
</register>
</payload>
</uid-message>
"""
response = requests.post(
f"https://{PAN_HOST}/api/",
params={
"type": "user-id",
"key": PAN_API_KEY,
"cmd": xml_payload
},
verify=False
)
return response.status_code == 200
# Query Port Six for new C2 IPs in last hour
response = requests.get(
"https://api.portsix.io/api/v1/intel/observables",
params={
"tag": "behavior:c2",
"first_seen_after": "2025-10-28T14:00:00Z"
},
headers={"X-API-Key": PORTSIX_API_KEY}
)
for observable in response.json()["results"]:
if observable["type"] == "ip":
block_ip(observable["value"], observable["tags"])
print(f"Blocked {observable['value']}")Microsoft Sentinel
Azure Logic App for Enrichment
Step 1: Create Logic App
{
"definition": {
"$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
"triggers": {
"When_a_new_sentinel_incident_is_created": {
"type": "ApiConnectionWebhook",
"inputs": {
"host": {
"connection": {
"name": "@parameters('$connections')['azuresentinel']['connectionId']"
}
},
"path": "/incident-creation"
}
}
},
"actions": {
"Parse_IPs_from_incident": {
"type": "ParseJson",
"inputs": {
"content": "@triggerBody()?['properties']?['relatedEntities']",
"schema": {}
}
},
"For_each_IP": {
"type": "Foreach",
"foreach": "@body('Parse_IPs_from_incident')?['entities']",
"actions": {
"Enrich_with_Port Six": {
"type": "Http",
"inputs": {
"method": "GET",
"uri": "https://api.portsix.io/api/v1/ip/@{items('For_each_IP')?['address']}",
"headers": {
"X-API-Key": "YOUR_API_KEY"
}
}
},
"Add_comment_to_incident": {
"type": "ApiConnection",
"inputs": {
"host": {
"connection": {
"name": "@parameters('$connections')['azuresentinel']['connectionId']"
}
},
"method": "post",
"path": "/Incidents/Comment",
"body": {
"incidentArmId": "@triggerBody()?['id']",
"message": "Port Six Enrichment:\n Risk Score: @{body('Enrich_with_Port Six')?['risk_score']}\n Tags: @{join(body('Enrich_with_Port Six')?['tags'], ', ')}"
}
}
}
}
}
}
}
}KQL Query for Correlation
// Create watchlist from Port Six data
// Upload CSV from: https://api.portsix.io/api/v1/intel/observables?format=csv&limit=100000
// Correlate Sentinel logs with Port Six watchlist
CommonSecurityLog
| where TimeGenerated > ago(1h)
| join kind=inner (
_GetWatchlist('Port Six-Malicious-IPs')
| project SearchKey, Tags, RiskScore
) on $left.DestinationIP == $right.SearchKey
| where RiskScore > 70
| summarize count(), make_set(Tags) by DestinationIP, DeviceVendor
| order by count_ descChronicle SIEM
UDM Enrichment Rule
# Chronicle enrichment list
# Upload via: Settings > Reference Lists > Create
rule portsix_ip_enrichment {
meta:
author = "Security Team"
description = "Enrich IPs with Port Six threat intel"
events:
$e.metadata.event_type = "NETWORK_CONNECTION"
$ip = $e.target.ip
match:
$ip in %portsix_malicious_ips
outcome:
$risk_score = max(
if($ip in %portsix_critical, 90) +
if($ip in %portsix_high, 70) +
if($ip in %portsix_medium, 50)
)
$e.metadata.threat.severity = $risk_score
$e.security_result.threat_name = "Port Six Threat Intelligence Match"
condition:
$e
}YARA-L Detection Rule
rule portsix_c2_connection {
meta:
author = "Security Team"
description = "Detect connections to Port Six-identified C2 servers"
severity = "HIGH"
events:
$network.metadata.event_type = "NETWORK_CONNECTION"
$network.target.ip = $c2_ip
$c2_ip in %portsix_c2_list
match:
$c2_ip over 1h
outcome:
$risk_score = 90
$severity = "HIGH"
$alert_message = "Connection to known C2 server: " + $c2_ip
condition:
$network
}API Rate Limiting & Best Practices
Rate Limit Headers
HTTP/1.1 200 OK
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 997
X-RateLimit-Reset: 1698854400
# When rate limited:
HTTP/1.1 429 Too Many Requests
Retry-After: 3600
X-RateLimit-Reset: 1698854400Exponential Backoff
import time
import requests
def api_call_with_retry(url, max_retries=3):
for attempt in range(max_retries):
try:
response = requests.get(url)
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 60))
time.sleep(retry_after)
continue
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
raise Exception("Max retries exceeded")Caching Recommendations
# Redis caching for frequently queried IPs
import redis
import json
cache = redis.Redis(host='localhost', port=6379, db=0)
def get_enrichment(ip):
# Check cache first (TTL: 1 hour)
cached = cache.get(f"portsix:ip:{ip}")
if cached:
return json.loads(cached)
# Fetch from API
response = requests.get(
f"https://api.portsix.io/api/v1/ip/{ip}",
headers={"X-API-Key": API_KEY}
)
data = response.json()
# Cache for 1 hour
cache.setex(f"portsix:ip:{ip}", 3600, json.dumps(data))
return dataProcessing Multiple IPs
# Process multiple IPs with rate limiting
import requests
import time
def enrich_ips(ips):
results = []
for ip in ips:
response = requests.get(
f"https://api.portsix.io/api/v1/ip/{ip}",
headers={"X-API-Key": API_KEY}
)
results.append(response.json())
# Respect rate limits (adjust based on your tier)
time.sleep(0.1)
return resultsTesting & Validation
Health Check
# Verify API connectivity
curl -H "X-API-Key: YOUR_API_KEY" \
https://api.portsix.io/api/v1/health
# Expected response:
{
"status": "ok",
"version": "1.0",
"rate_limit": {
"limit": 1000,
"remaining": 999,
"reset": 1700000000
}
}Integration Testing
#!/bin/bash
# Test integration with known threat IP
TEST_IP="185.220.101.45" # Known TOR exit node
echo "Testing Port Six enrichment..."
RESULT=$(curl -s -H "X-API-Key: $API_KEY" \
"https://api.portsix.io/api/v1/ip/$TEST_IP")
RISK_SCORE=$(echo $RESULT | jq -r '.risk_score // 0')
HAS_THREAT_INTEL=$(echo $RESULT | jq -r '.threat_intel != null')
if [ "$HAS_THREAT_INTEL" = "true" ] && [ "$RISK_SCORE" -gt 0 ]; then
echo "Integration working correctly"
echo " Risk Score: $RISK_SCORE"
echo " Tags: $(echo $RESULT | jq -r '.threat_intel.tags // []')"
exit 0
else
echo "Integration test failed - no threat intel returned"
exit 1
fiSupport & Resources
- API Reference: Full endpoint documentation
- Use Cases: Real-world integration scenarios
- Pricing: Plans and credit tiers
- Support: [email protected]