Congestion Detection Using Isolation Forest

# █ █ █│ █
# █ █ █ │ █ █
# █ █ █ │ █ █ █ ╭─╮
# █ █ █ │ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ █ █╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ ╰─────╯
# ──────┴──────────────────────────────────────────────────────────────
# T1 T2 T3 T4 T5 T6
#
# Name : Congestion Detection Using Isolation Forest
declare lower;
# Inputs for the Isolation Forest
input numTrees = 100; # Number of trees in the forest
input subSampleSize = 256; # Size of the subsample used for building each tree
input maxDepth = 10; # Maximum depth for isolation trees
input Data = close; # Data series to analyze
input movingAverageLength = 20; # Length for moving average and slope calculation
input atrLength = 14; # Length for ATR calculation
input congestionThreshold = 0.4; # Threshold for congestion detection
# --- Feature Engineering ---
def rollingATR = Average(TrueRange(high, close, low), atrLength);
def priceRange = high - low;
def movingAverage = Average(close, movingAverageLength);
def movingAverageSlope = (movingAverage - movingAverage[movingAverageLength]) / movingAverageLength;
# --- Feature Engineering ---
def feature = (priceRange + rollingATR + AbsValue(movingAverageSlope)) / 3;
# Ensure feature has valid values for all bars
def adjustedFeature = if !isnan(feature) then feature else adjustedFeature[1];
script RandomValueBetween {
input min = 0.0;
input max = 1.0;
plot value = min + (max - min) * Random();
}
# --- Isolation Forest Calculation ---
script IsolationTree {
input dataSeries = close;
input subSampleSize = 256;
input maxDepth = 10;
def randomIndex = Ceil(RandomValueBetween(min = 0, max = subSampleSize - 1));
def splitValue = GetValue(dataSeries, -randomIndex);
def depth = fold i = 0 to maxDepth with p = 0 while p < maxDepth do
if GetValue(dataSeries, -i) < splitValue then p + 1 else p;
plot finalDepth = depth;
}
script AvgTreeDepth {
input dataSeries = close;
input subSampleSize = 256;
input maxDepth = 10;
input numTrees = 100;
def totalDepth = fold tree = 1 to numTrees with d = 0 do
d + IsolationTree(dataSeries, subSampleSize, maxDepth);
plot return = totalDepth / numTrees;
}
# Calculate anomaly score
def avgDepth = AvgTreeDepth(adjustedFeature, subSampleSize, maxDepth, numTrees);
def anomalyScore = 1 - avgDepth / maxDepth;
# --- Normalized Anomaly Score ---
def smoothedAnomalyScore = CompoundValue(1, 0.2 * anomalyScore + 0.8 * smoothedAnomalyScore[1], anomalyScore);
# --- Congestion Detection ---
def isCongestion = smoothedAnomalyScore < congestionThreshold;
# --- Plot Results ---
plot AnomalyScorei = smoothedAnomalyScore;
AnomalyScorei.AssignValueColor(if isCongestion then Color.YELLOW else Color.BLUE);
def scorefilter = inertia(smoothedAnomalyScore,25);
def scorefilterFinal = inertia(scorefilter,5);
plot AnomalyScoreFilter = scorefilterFinal ;
AnomalyScoreFilter.AssignValueColor(if AnomalyScoreFilter <congestionThreshold then Color.YELLOW else Color.BLUE);
def nan = double.nan;
input rollingWindow = 14; # Rolling window for feature calculations
# Separate Metrics for Signal and Non-Signal Areas
def signalATR = if isCongestion then rollingATR else nan;
def nonSignalATR = if !isCongestion then rollingATR else nan;
def signalPriceRange = if isCongestion then priceRange else nan;
def nonSignalPriceRange = if !isCongestion then priceRange else nan;
# Count of Signal and Non-Signal Areas
def countSignal = if isCongestion then 1 else nan;
def countNonSignal = if !isCongestion then 1 else nan;
# Summing Values and Counts
def totalSignalATR = CompoundValue(1,if !isnan(signalATR) then totalSignalATR[1] + signalATR else totalSignalATR[1] , 0);
def totalNonSignalATR = CompoundValue(1, if !isnan(nonSignalATR) then totalNonSignalATR[1] + nonSignalATR else totalNonSignalATR[1], 0);
def totalSignalPriceRange = CompoundValue(1,if !isnan(signalPriceRange) then totalSignalPriceRange[1] + signalPriceRange else totalSignalPriceRange[1] , 0);
def totalNonSignalPriceRange = CompoundValue(1,if !isnan(nonSignalPriceRange) then totalNonSignalPriceRange[1] + nonSignalPriceRange else totalNonSignalPriceRange[1], 0);
def totalCountSignal = CompoundValue(1,if !isnan(countSignal) then totalCountSignal[1] + countSignal else totalCountSignal[1], 0);
def totalCountNonSignal = CompoundValue(1,if !isnan(countNonSignal) then totalCountNonSignal[1] + countNonSignal else totalCountNonSignal[1] , 0);
# Overall Averages
def avgSignalATR = if totalCountSignal > 0 then totalSignalATR / totalCountSignal else avgSignalATR [1];
def avgNonSignalATR = if totalCountNonSignal > 0 then totalNonSignalATR / totalCountNonSignal else avgNonSignalATR[1];
def avgSignalPriceRange = totalSignalPriceRange / totalCountSignal;
def avgNonSignalPriceRange = totalNonSignalPriceRange / totalCountNonSignal;
# Add Labels for Review
AddLabel(yes, "Avg ATR (Congestion): " + Round(avgSignalATR, 2), Color.GREEN);
AddLabel(yes, "Avg ATR (Non-Congestion): " + Round(avgNonSignalATR, 2), Color.RED);
AddLabel(yes, "Avg Price Range (Congestion): " + Round(avgSignalPriceRange, 2), Color.CYAN);
AddLabel(yes, "Avg Price Range (Non-Congestion): " + Round(avgNonSignalPriceRange, 2), Color.MAGENTA);

