# Multivariate_Anomoly_Series6
# ─────────────────────────────────────────────
# │ MarketFragments.com | DNA & Market │
# │ info@marketfragments.com │
# │ www.marketfragments.com │
# ──────────────────────────────────────────────
# Time →
# │
# █ █ █│ █
# █ █ █ │ █ █
# █ █ █ │ █ █ █ ╭─╮
# █ █ █ │ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ ╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ █ █╭─╯ ╰─╮
# █ █ █ │ █ █ █ █ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ █ ╰─╮ ╭─╯
# █ █ █ │ █ ╰─────╯
# ──────┴──────────────────────────────────────────────────────────────
# T1 T2 T3 T4 T5 T6
#
# Name: multivariate_anomoly_series6 Anomaly Detection
script erf {
    # Approximates the error function of z (per the script name).
    #   z      : argument
    #   return : 1 - u * Exp(-z^2 + polynomial in u), negated when z < 0,
    #            where u = 1 / (1 + |z| / 2).
    input z = 0;

    def u = 1.0 / (1.0 + 0.5 * AbsValue(z));

    # Horner's method, unrolled from the innermost coefficient outwards.
    def poly9 = 0.17087277;
    def poly8 = -0.82215223 + u * poly9;
    def poly7 = 1.48851587 + u * poly8;
    def poly6 = -1.13520398 + u * poly7;
    def poly5 = 0.27886807 + u * poly6;
    def poly4 = -0.18628806 + u * poly5;
    def poly3 = 0.09678418 + u * poly4;
    def poly2 = 0.37409196 + u * poly3;
    def poly1 = 1.00002368 + u * poly2;

    def magnitude = 1 - u * Exp(-z * z - 1.26551223 + u * poly1);

    # The polynomial is evaluated on |z|; the sign of z is restored here.
    plot return = if z >= 0.0 then magnitude else -magnitude;
}
# Bar-data aliases shared by the rest of the study
def nan = Double.NaN;
def bn = BarNumber();

# Price fields
def O = open;
def H = high;
def L = low;
def C = close;
def range = H - L;

# Activity fields
def V = volume;
def tcnt = tick_count;

# Minimum price increment; falls back to .01 when TickSize() is NaN
def tickS = if IsNaN(TickSize()) then .01 else TickSize();
# Pivot logic: mark bars that set an Nx-bar extreme and bars that trade
# inside the range of a recent pivot bar.
input Nx = 10;

# A bar is a pivot when its low/high equals the value found at the offset
# returned by GetMinValueOffset / GetMaxValueOffset over Nx bars.
def plow = GetValue(L, GetMinValueOffset(L, Nx), Nx) == L;
def phigh = GetValue(H, GetMaxValueOffset(H, Nx), Nx) == H;

# Bars counted since the most recent pivot (the pivot bar itself counts as 1).
def plowcnt = if !plow then plowcnt[1] + 1 else 1;
def phighcnt = if !phigh then phighcnt[1] + 1 else 1;

# Low and high of the most recent pivot bar, carried forward until the next one.
def plowl = if !plow then plowl[1] else L;
def plowh = if !plow then plowh[1] else H;
def phighh = if !phigh then phighh[1] else H;
def phighl = if !phigh then phighl[1] else L;

# Does the bar's midpoint, low or high fall inside the pivot bar's range?
def touchesLowPivot = Between(hl2, plowl, plowh) or Between(L, plowl, plowh) or Between(H, plowl, plowh);
def touchesHighPivot = Between(hl2, phighl, phighh) or Between(L, phighl, phighh) or Between(H, phighl, phighh);

# Proximity flags: within 4 bars of the pivot and overlapping its range.
def islowr = if plowcnt <= 4 and touchesLowPivot then 1 else 0;
def ishighr = if phighcnt <= 4 and touchesHighPivot then 1 else 0;
# Three volume-based weights applied to the bar's closing location.
def trFloor = Max(TrueRange(H, C, L), tickS); # true range, floored at one tick
def Shares = V / tcnt;                        # volume per tick
def isVolume = V;                             # raw volume
def TrRange = V / trFloor;                    # volume per unit of true range
def TickTrRange = Shares / trFloor;           # volume per tick per unit of true range

# Zero-range bars would divide by zero below, so they reuse a held value:
# the reading from the bar just before the flat stretch began.
def flat = range == 0;
def flatStart = flat and range[1] != 0;

# Series 1: weighted by TrRange
def buyingx1 = erf(TrRange * (C - L) / range);
def buyinga1 = if flatStart then buyingx1[1] else buyinga1[1];
def buying1 = if !flat then buyingx1 else buyinga1;
def sellingx1 = erf(TrRange * (H - C) / range);
def sellinga1 = if flatStart then sellingx1[1] else sellinga1[1];
def selling1 = if !flat then sellingx1 else sellinga1;

# Series 2: weighted by TickTrRange
def buyingx2 = erf(TickTrRange * (C - L) / range);
def buyinga2 = if flatStart then buyingx2[1] else buyinga2[1];
def buying2 = if !flat then buyingx2 else buyinga2;
def sellingx2 = erf(TickTrRange * (H - C) / range);
def sellinga2 = if flatStart then sellingx2[1] else sellinga2[1];
def selling2 = if !flat then sellingx2 else sellinga2;

# Series 3: weighted by raw volume
def buyingx3 = erf(isVolume * (C - L) / range);
def buyinga3 = if flatStart then buyingx3[1] else buyinga3[1];
def buying3 = if !flat then buyingx3 else buyinga3;
def sellingx3 = erf(isVolume * (H - C) / range);
def sellinga3 = if flatStart then sellingx3[1] else sellinga3[1];
def selling3 = if !flat then sellingx3 else sellinga3;
# Rolling Gaussian parameters (mean and standard deviation) for each of the
# six buying/selling series, plus positional aliases series1..series6.
input length = 500; # window size in bars; adjust as desired

# Pair 1
def series1 = buying1;
def series2 = selling1;
def buyingMean1 = Average(buying1, length);
def buyingStDev1 = StDev(buying1, length);
def sellingMean1 = Average(selling1, length);
def sellingStDev1 = StDev(selling1, length);

# Pair 2
def series3 = buying2;
def series4 = selling2;
def buyingMean2 = Average(buying2, length);
def buyingStDev2 = StDev(buying2, length);
def sellingMean2 = Average(selling2, length);
def sellingStDev2 = StDev(selling2, length);

# Pair 3
def series5 = buying3;
def series6 = selling3;
def buyingMean3 = Average(buying3, length);
def buyingStDev3 = StDev(buying3, length);
def sellingMean3 = Average(selling3, length);
def sellingStDev3 = StDev(selling3, length);
# CovarianceMatrix sub-script
# Pools up to three (x, y) pairs -- (series1, series2), (series3, series4),
# (series5, series6) -- over the last `length` bars and exposes:
#   Covariance  : summed cross-products of deviations from the window mean / length
#   VarianceX   : summed squared deviations of series1, series3, series5 / length
#   VarianceY   : summed squared deviations of series2, series4, series6 / length
#   Correlation : Covariance normalised by the two variances (NaN when either is zero)
# A pair whose two members are constants contributes nothing, because a
# constant minus its own average is zero.
script CovarianceMatrix {
    input series1 = 0;
    input series2 = 1;
    input series3 = 2;
    input series4 = 3;
    input series5 = 4;
    input series6 = 5;
    input length = 10;

    # Window means do not depend on the fold index, so compute them once per bar.
    def m1 = Average(series1, length);
    def m2 = Average(series2, length);
    def m3 = Average(series3, length);
    def m4 = Average(series4, length);
    def m5 = Average(series5, length);
    def m6 = Average(series6, length);

    # fold's upper bound is exclusive: "0 to length" visits offsets
    # 0 .. length - 1, i.e. exactly `length` bars, matching the divisor below.
    def sumXY = fold i = 0 to length with accXY do accXY +
        (
            (series1[i] - m1) * (series2[i] - m2) +
            (series3[i] - m3) * (series4[i] - m4) +
            (series5[i] - m5) * (series6[i] - m6)
        );
    def sumXX = fold i2 = 0 to length with accXX do accXX +
        (
            Sqr(series1[i2] - m1) +
            Sqr(series3[i2] - m3) +
            Sqr(series5[i2] - m5)
        );
    def sumYY = fold i3 = 0 to length with accYY do accYY +
        (
            Sqr(series2[i3] - m2) +
            Sqr(series4[i3] - m4) +
            Sqr(series6[i3] - m6)
        );

    def denom = Sqrt(sumXX) * Sqrt(sumYY);

    # Covariance stays the first plot so that a call without a plot name
    # keeps returning it. The other outputs are plots (not defs) because
    # callers read them as CovarianceMatrix(...).VarianceX etc.
    plot Covariance = sumXY / length;
    plot VarianceX = sumXX / length;
    plot VarianceY = sumYY / length;
    plot Correlation = if denom != 0 then sumXY / denom else Double.NaN;
}
# Covariance across all three buying/selling pairs (default plot of
# CovarianceMatrix); NaN until every series, mean and standard deviation
# has a value.
def seriesReady = !IsNaN(buying1) and !IsNaN(selling1) and
                  !IsNaN(buying2) and !IsNaN(selling2) and
                  !IsNaN(buying3) and !IsNaN(selling3);
def meansReady = !IsNaN(buyingMean1) and !IsNaN(sellingMean1) and
                 !IsNaN(buyingMean2) and !IsNaN(sellingMean2) and
                 !IsNaN(buyingMean3) and !IsNaN(sellingMean3);
def devsReady = !IsNaN(buyingStDev1) and !IsNaN(sellingStDev1) and
                !IsNaN(buyingStDev2) and !IsNaN(sellingStDev2) and
                !IsNaN(buyingStDev3) and !IsNaN(sellingStDev3);
def covMatrix = if seriesReady and meansReady and devsReady
    then CovarianceMatrix(buying1, selling1, buying2, selling2, buying3, selling3, length)
    else Double.NaN;
# Access the covariance statistics one buying/selling pair at a time.
# CovarianceMatrix takes its inputs positionally as
# (series1, series2, series3, series4, series5, series6, length).
# The two unused pairs are padded with constant 0 so that `length` reaches
# the seventh slot; with only three arguments it would be read as series3
# and the window would silently fall back to the script default.
# Constant pairs add nothing to the sums (constant minus its average is 0).

# Pair 1: series1 / series2
def cov1 = CovarianceMatrix(series1, series2, 0, 0, 0, 0, length).Covariance;
def varianceX1_ = CovarianceMatrix(series1, series2, 0, 0, 0, 0, length).VarianceX;
def varianceY1_ = CovarianceMatrix(series1, series2, 0, 0, 0, 0, length).VarianceY;
def correlation1_ = CovarianceMatrix(series1, series2, 0, 0, 0, 0, length).Correlation;

# Pair 2: series3 / series4
def cov2 = CovarianceMatrix(series3, series4, 0, 0, 0, 0, length).Covariance;
def varianceX2_ = CovarianceMatrix(series3, series4, 0, 0, 0, 0, length).VarianceX;
def varianceY2_ = CovarianceMatrix(series3, series4, 0, 0, 0, 0, length).VarianceY;
def correlation2_ = CovarianceMatrix(series3, series4, 0, 0, 0, 0, length).Correlation;

# Pair 3: series5 / series6
def cov3 = CovarianceMatrix(series5, series6, 0, 0, 0, 0, length).Covariance;
def varianceX3_ = CovarianceMatrix(series5, series6, 0, 0, 0, 0, length).VarianceX;
def varianceY3_ = CovarianceMatrix(series5, series6, 0, 0, 0, 0, length).VarianceY;
def correlation3_ = CovarianceMatrix(series5, series6, 0, 0, 0, 0, length).Correlation;
# Regularization strength; adjust to taste.
def lambda = 0.1;

# Regularized covariance per pair: cov + lambda * varianceX * varianceY.
# When any of the three inputs is NaN the previous bar's value is reused.
def covRegularized1 = if IsNaN(cov1) or IsNaN(varianceX1_) or IsNaN(varianceY1_)
    then covRegularized1[1]
    else cov1 + lambda * varianceX1_ * varianceY1_;
def covRegularized2 = if IsNaN(cov2) or IsNaN(varianceX2_) or IsNaN(varianceY2_)
    then covRegularized2[1]
    else cov2 + lambda * varianceX2_ * varianceY2_;
def covRegularized3 = if IsNaN(cov3) or IsNaN(varianceX3_) or IsNaN(varianceY3_)
    then covRegularized3[1]
    else cov3 + lambda * varianceX3_ * varianceY3_;
# Gaussian likelihood of six readings organised as three correlated pairs:
# (data1, data2), (data3, data4), (data5, data6). Each pair has its own means,
# standard deviations and covariance; no cross-pair covariance is modelled.
#   dataN / meanN / stdevN : reading, mean and standard deviation of variable N
#   covarianceK            : covariance of pair K (converted to a correlation here)
#   CorrelationThreshold   : accepted for caller compatibility; not used in the calculation
#   return                 : normalization * Exp(exponent)
script MultivariateGaussianProbability {
    input data1 = 0;
    input data2 = 0;
    input data3 = 0;
    input data4 = 0;
    input data5 = 0;
    input data6 = 0;
    input mean1 = 0;
    input mean2 = 0;
    input mean3 = 0;
    input mean4 = 0;
    input mean5 = 0;
    input mean6 = 0;
    input stdev1 = 0;
    input stdev2 = 0;
    input stdev3 = 0;
    input stdev4 = 0;
    input stdev5 = 0;
    input stdev6 = 0;
    input covariance1 = 0;
    input covariance2 = 0;
    input covariance3 = 0;
    input CorrelationThreshold = 0.5;

    # Standardized deviations from the mean.
    def z1 = (data1 - mean1) / stdev1;
    def z2 = (data2 - mean2) / stdev2;
    def z3 = (data3 - mean3) / stdev3;
    def z4 = (data4 - mean4) / stdev4;
    def z5 = (data5 - mean5) / stdev5;
    def z6 = (data6 - mean6) / stdev6;

    # Per-pair correlation coefficient and its complement 1 - rho^2.
    def correlation1 = covariance1 / (stdev1 * stdev2);
    def correlation2 = covariance2 / (stdev3 * stdev4);
    def correlation3 = covariance3 / (stdev5 * stdev6);
    def oneMinusRsq1 = 1 - Sqr(correlation1);
    def oneMinusRsq2 = 1 - Sqr(correlation2);
    def oneMinusRsq3 = 1 - Sqr(correlation3);

    # Bivariate-normal quadratic form for each pair:
    #   (zx^2 + zy^2 - 2 * rho * zx * zy) / (1 - rho^2)
    # The cross term is subtracted for every pair, and each pair is scaled
    # by its own 1 / (1 - rho^2).
    def quad1 = (Sqr(z1) + Sqr(z2) - 2 * correlation1 * z1 * z2) / oneMinusRsq1;
    def quad2 = (Sqr(z3) + Sqr(z4) - 2 * correlation2 * z3 * z4) / oneMinusRsq2;
    def quad3 = (Sqr(z5) + Sqr(z6) - 2 * correlation3 * z5 * z6) / oneMinusRsq3;
    def exponent = -0.5 * (quad1 + quad2 + quad3);

    # NOTE(review): a full six-variable density would carry (2 * pi)^3 here.
    # The constant is kept at 2 * pi so the result stays on the scale that
    # callers' fixed thresholds were chosen against -- confirm before changing.
    def normalization = 1 / (2 * Double.Pi * stdev1 * stdev2 * stdev3 * stdev4 * stdev5 * stdev6 *
        Sqrt(oneMinusRsq1) * Sqrt(oneMinusRsq2) * Sqrt(oneMinusRsq3));

    plot return = normalization * Exp(exponent);
}
# Likelihood of the current bar's six readings under the Gaussian fitted
# with the regularized covariances.
def probabilityRegularized = MultivariateGaussianProbability(
    buying1, selling1, buying2, selling2, buying3, selling3,
    buyingMean1, sellingMean1, buyingMean2, sellingMean2, buyingMean3, sellingMean3,
    buyingStDev1, sellingStDev1, buyingStDev2, sellingStDev2, buyingStDev3, sellingStDev3,
    covRegularized1, covRegularized2, covRegularized3, .5
);

# Bars whose likelihood falls below this cut-off are treated as anomalies
# (adjust as needed).
def anomalyThreshold = 0.03;
def isAnomaly = probabilityRegularized < anomalyThreshold;

# Plot the likelihood itself.
plot ProbabilityPlot = probabilityRegularized;
ProbabilityPlot.SetLineWeight(2);

# Colouring: gray when normal; for anomalies, green at or near a pivot low,
# red at or near a pivot high, yellow otherwise.
ProbabilityPlot.AssignValueColor(
    if !isAnomaly then Color.GRAY
    else if (plow or islowr) then Color.GREEN
    else if (phigh or ishighr) then Color.RED
    else Color.YELLOW);

# Optional: highlight anomalous bars on the chart background
#AssignBackgroundColor(if isAnomaly then Color.YELLOW else Color.CURRENT);
AddLabel(yes, "length:" + length, Color.WHITE);
# Cross-validation summary labels
def trainSize = 0.8; # share of bars counted as training (0.8 = 80% training, 20% testing)
def totalBars = BarNumber();
# Round() keeps 2 decimals unless told otherwise; bar counts must be whole numbers.
def trainBars = Round(totalBars * trainSize, 0);
def testBars = totalBars - trainBars;

# Plot performance labels
AddLabel(1, "Train Bars: " + trainBars + " | Test Bars: " + testBars, Color.LIGHT_GRAY);

# Evaluate the first buying/selling pair on its own.
def cvBuy = buying1;
def cvSell = selling1;
# MultivariateGaussianProbability is positional:
#   6 data values, 6 means, 6 standard deviations, 3 covariances, threshold.
# The two unused pairs are filled with neutral values (data = mean = 0,
# stdev = 1, covariance = 0) so they contribute a factor of 1 and every
# argument lands in its intended slot.
# NOTE(review): the pair's own standard deviations and regularized covariance
# are passed, mirroring the main probability call -- confirm this matches the
# intended model.
def cvProb = MultivariateGaussianProbability(
    cvBuy, cvSell, 0, 0, 0, 0,
    buyingMean1, sellingMean1, 0, 0, 0, 0,
    buyingStDev1, sellingStDev1, 1, 1, 1, 1,
    covRegularized1, 0, 0, .5
);
def cvAnomaly = if cvProb < .06 then 1 else 0;
# Ratio of flagged bars to test bars; guarded because testBars is 0 on the
# earliest bars of the chart.
def cvAccuracy = if testBars > 0 then TotalSum(cvAnomaly) / testBars else 0;
AddLabel(1, "Cross-validation Accuracy: " + Round(cvAccuracy * 100, 2) + "%", Color.LIGHT_GRAY);

