Remove TelemetryPublisherFirestore and refactor Telemetry module.

This commit is contained in:
Denis-Cosmin Nutiu 2021-10-03 23:37:53 +03:00
parent b52f292db1
commit df4dda2b7a
23 changed files with 123 additions and 498 deletions

View file

@ -1,4 +1,4 @@
using System.Collections.Generic; using Newtonsoft.Json.Linq;
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Sensors.Abstractions namespace NucuCar.Sensors.Abstractions
@ -13,7 +13,7 @@ namespace NucuCar.Sensors.Abstractions
{ {
protected bool TelemetryEnabled; protected bool TelemetryEnabled;
public abstract string GetIdentifier(); public abstract string GetIdentifier();
public abstract Dictionary<string, object> GetTelemetryJson(); public abstract JObject GetTelemetryJson();
public abstract bool IsTelemetryEnabled(); public abstract bool IsTelemetryEnabled();
} }
} }

View file

@ -5,6 +5,7 @@ using System.Threading.Tasks;
using Iot.Device.Bmxx80; using Iot.Device.Bmxx80;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using NucuCar.Sensors.Abstractions; using NucuCar.Sensors.Abstractions;
using UnitsNet; using UnitsNet;
using Bme680 = Iot.Device.Bmxx80.Bme680; using Bme680 = Iot.Device.Bmxx80.Bme680;
@ -153,14 +154,14 @@ namespace NucuCar.Sensors.Modules.BME680
return "Environment"; return "Environment";
} }
public override Dictionary<string, object> GetTelemetryJson() public override JObject GetTelemetryJson()
{ {
Dictionary<string, object> returnValue = null; JObject jsonObject = null;
if (_lastMeasurement != null && TelemetryEnabled) if (_lastMeasurement != null && TelemetryEnabled)
{ {
returnValue = new Dictionary<string, object> jsonObject = new JObject()
{ {
["sensor_state"] = CurrentState, ["sensor_state"] = GetState().ToString(),
["temperature"] = _lastMeasurement.Temperature, ["temperature"] = _lastMeasurement.Temperature,
["humidity"] = _lastMeasurement.Humidity, ["humidity"] = _lastMeasurement.Humidity,
["pressure"] = _lastMeasurement.Pressure, ["pressure"] = _lastMeasurement.Pressure,
@ -168,7 +169,7 @@ namespace NucuCar.Sensors.Modules.BME680
}; };
} }
return returnValue; return jsonObject;
} }
public override bool IsTelemetryEnabled() public override bool IsTelemetryEnabled()

View file

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using NucuCar.Sensors.Abstractions; using NucuCar.Sensors.Abstractions;
namespace NucuCar.Sensors.Modules.CpuTemperature namespace NucuCar.Sensors.Modules.CpuTemperature
@ -84,14 +85,14 @@ namespace NucuCar.Sensors.Modules.CpuTemperature
return "CpuTemperature"; return "CpuTemperature";
} }
public override Dictionary<string, object> GetTelemetryJson() public override JObject GetTelemetryJson()
{ {
Dictionary<string, object> returnValue = null; JObject returnValue = null;
if (!double.IsNaN(_lastTemperatureCelsius) && TelemetryEnabled) if (!double.IsNaN(_lastTemperatureCelsius) && TelemetryEnabled)
{ {
returnValue = new Dictionary<string, object> returnValue = new JObject
{ {
["sensor_state"] = CurrentState, ["sensor_state"] = GetState().ToString(),
["cpu_temperature"] = _lastTemperatureCelsius, ["cpu_temperature"] = _lastTemperatureCelsius,
}; };
} }

View file

@ -1,7 +1,9 @@
using System.Collections.Generic; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using NucuCar.Sensors.Abstractions; using NucuCar.Sensors.Abstractions;
namespace NucuCar.Sensors.Modules.Heartbeat namespace NucuCar.Sensors.Modules.Heartbeat
@ -59,15 +61,17 @@ namespace NucuCar.Sensors.Modules.Heartbeat
return "Heartbeat"; return "Heartbeat";
} }
public override Dictionary<string, object> GetTelemetryJson() public override JObject GetTelemetryJson()
{ {
var returnValue = new Dictionary<string, object> if (TelemetryEnabled)
{
return new JObject
{ {
["sensor_state"] = CurrentState, ["sensor_state"] = GetState().ToString(),
["value"] = 1, ["last_seen"] = DateTime.UtcNow,
}; };
}
return returnValue; return null;
} }
public override bool IsTelemetryEnabled() public override bool IsTelemetryEnabled()

View file

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using NucuCar.Sensors.Abstractions; using NucuCar.Sensors.Abstractions;
using PMS5003; using PMS5003;
using PMS5003.Exceptions; using PMS5003.Exceptions;
@ -124,15 +125,15 @@ namespace NucuCar.Sensors.Modules.PMS5003
return "Pms5003"; return "Pms5003";
} }
public override Dictionary<string, object> GetTelemetryJson() public override JObject GetTelemetryJson()
{ {
Dictionary<string, object> returnValue = null; JObject returnValue = null;
if (_pms5003Data != null && TelemetryEnabled) if (_pms5003Data != null && TelemetryEnabled)
{ {
// The telemetry handled by FirebaseRestTranslator wants the values to be int or double. // The telemetry handled by FirebaseRestTranslator wants the values to be int or double.
returnValue = new Dictionary<string, object> returnValue = new JObject()
{ {
["sensor_state"] = GetState(), ["sensor_state"] = GetState().ToString(),
["Pm1Atmospheric"] = _pms5003Data.Pm1Atmospheric, ["Pm1Atmospheric"] = _pms5003Data.Pm1Atmospheric,
["Pm1Standard"] = _pms5003Data.Pm1Standard, ["Pm1Standard"] = _pms5003Data.Pm1Standard,
["Pm10Atmospheric"] = _pms5003Data.Pm10Atmospheric, ["Pm10Atmospheric"] = _pms5003Data.Pm10Atmospheric,

View file

@ -7,6 +7,7 @@ using NucuCar.Sensors.Modules.Heartbeat;
using NucuCar.Sensors.Modules.PMS5003; using NucuCar.Sensors.Modules.PMS5003;
using NucuCar.Telemetry; using NucuCar.Telemetry;
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Abstractions;
using NucuCar.Telemetry.Publishers;
namespace NucuCar.Sensors namespace NucuCar.Sensors
{ {
@ -21,14 +22,14 @@ namespace NucuCar.Sensors
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) => .ConfigureServices((hostContext, services) =>
{ {
services.Configure<TelemetryConfig>(hostContext.Configuration.GetSection("Telemetry")); services.Configure<Config>(hostContext.Configuration.GetSection("Telemetry"));
services.Configure<Bme680Config>(hostContext.Configuration.GetSection("Bme680Sensor")); services.Configure<Bme680Config>(hostContext.Configuration.GetSection("Bme680Sensor"));
services.Configure<CpuTempConfig>(hostContext.Configuration.GetSection("CpuTemperatureSensor")); services.Configure<CpuTempConfig>(hostContext.Configuration.GetSection("CpuTemperatureSensor"));
services.Configure<HeartbeatConfig>(hostContext.Configuration.GetSection("HeartbeatSensor")); services.Configure<HeartbeatConfig>(hostContext.Configuration.GetSection("HeartbeatSensor"));
services.Configure<Pms5003Config>(hostContext.Configuration.GetSection("Pms5003Sensor")); services.Configure<Pms5003Config>(hostContext.Configuration.GetSection("Pms5003Sensor"));
// Singletons // Singletons
services.AddSingleton<ITelemetryPublisher, TelemetryPublisherProxy>(); services.AddSingleton<ITelemetryPublisher, PublisherProxy>();
services.AddSingleton<ISensor<Bme680Sensor>, Bme680Sensor>(); services.AddSingleton<ISensor<Bme680Sensor>, Bme680Sensor>();
services.AddSingleton<ISensor<CpuTempSensor>, CpuTempSensor>(); services.AddSingleton<ISensor<CpuTempSensor>, CpuTempSensor>();
services.AddSingleton<ISensor<HeartbeatSensor>, HeartbeatSensor>(); services.AddSingleton<ISensor<HeartbeatSensor>, HeartbeatSensor>();

View file

@ -14,7 +14,7 @@
"Telemetry": true "Telemetry": true
}, },
"HeartbeatSensor": { "HeartbeatSensor": {
"Enabled": false, "Enabled": true,
"Telemetry": true "Telemetry": true
}, },
"Pms5003Sensor": { "Pms5003Sensor": {

View file

@ -1,10 +1,11 @@
using System.Collections.Generic; using Newtonsoft.Json.Linq;
using NucuCar.Telemetry.Publishers;
namespace NucuCar.Telemetry.Abstractions namespace NucuCar.Telemetry.Abstractions
{ {
/// <summary> /// <summary>
/// Interface that specifies that the component implementing it is willing to provide telemetry data and can be /// Interface that specifies that the component implementing it is willing to provide telemetry data and can be
/// registered to a publisher such as <see cref="TelemetryPublisher"/>. /// registered to a publisher such as <see cref="BasePublisher"/>.
/// </summary> /// </summary>
public interface ITelemeter public interface ITelemeter
{ {
@ -14,14 +15,13 @@ namespace NucuCar.Telemetry.Abstractions
/// <returns>An identifier for the telemetry source.</returns> /// <returns>An identifier for the telemetry source.</returns>
string GetIdentifier(); string GetIdentifier();
// TODO: Perhaps here it's better if we return a string or a json object from Newtonsoft.
/// <summary> /// <summary>
/// This function should return a dictionary containing the telemetry data. /// This function should return a dictionary containing the telemetry data.
/// When implementing this function you should return null if the telemetry is disabled. /// When implementing this function you should return null if the telemetry is disabled.
/// See: <see cref="IsTelemetryEnabled"/> /// See: <see cref="IsTelemetryEnabled"/>
/// </summary> /// </summary>
/// <returns>The telemetry data. It should be JSON serializable.</returns> /// <returns>The telemetry data as a Newtonsoft JObject.</returns>
Dictionary<string, object> GetTelemetryJson(); JObject GetTelemetryJson();
/// <summary> /// <summary>
/// This function should return whether the sensor has telemetry enabled or not. /// This function should return whether the sensor has telemetry enabled or not.

View file

@ -1,14 +1,14 @@
// ReSharper disable UnusedAutoPropertyAccessor.Global // ReSharper disable UnusedAutoPropertyAccessor.Global
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Publishers;
namespace NucuCar.Telemetry namespace NucuCar.Telemetry
{ {
public class TelemetryConfig public class Config
{ {
/// <summary> /// <summary>
/// The Publisher is used by <see cref="TelemetryPublisherFactory"/> to instantiate /// The Publisher is used by <see cref="PublisherFactory"/> to instantiate
/// the correct <see cref="TelemetryPublisher"/>. For available types see <see cref="TelemetryPublisherType"/> /// the correct <see cref="BasePublisher"/>. For available types see <see cref="PublisherType"/>
/// </summary> /// </summary>
public string Publisher { get; set; } public string Publisher { get; set; }

View file

@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
// ReSharper disable UnusedAutoPropertyAccessor.Global
// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global
namespace NucuCar.Telemetry
{
/**
* <see cref="DataAggregate"/> is an entity holding the telemetry data body.
* It contains the telemetry data from all the telemeters.
*/
public class DataAggregate
{
public string Source { get; set; }
public DateTime Timestamp { get; set; }
public List<JObject> Data { get; set; }
public DataAggregate(string source, List<JObject> data)
{
Source = source;
Data = data;
Timestamp = DateTime.UtcNow;
}
}
}

View file

@ -3,55 +3,55 @@ using Microsoft.Extensions.Logging;
using NucuCar.Core.Utilities; using NucuCar.Core.Utilities;
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Abstractions;
using NucuCar.Telemetry.Publishers; using NucuCar.Telemetry.Publishers;
using Console = NucuCar.Telemetry.Publishers.Console;
namespace NucuCar.Telemetry namespace NucuCar.Telemetry
{ {
/// <summary> /// <summary>
/// The TelemetryPublisherFactory is used instantiate TelemetryPublishers. /// The PublisherFactory is used instantiate TelemetryPublishers.
/// </summary> /// </summary>
public static class TelemetryPublisherFactory public static class PublisherFactory
{ {
/// <summary> /// <summary>
/// Creates an instance of <see cref="TelemetryPublisher"/>. See <see cref="TelemetryPublisherType"/> /// Creates an instance of <see cref="BasePublisher"/>. See <see cref="PublisherType"/>
/// </summary> /// </summary>
/// <param name="type">The type of the publisher. <see cref="TelemetryPublisherType"/> </param> /// <param name="type">The type of the publisher. <see cref="PublisherType"/> </param>
/// <param name="connectionString">Device connection string for the telemetry publisher.</param> /// <param name="connectionString">Device connection string for the telemetry publisher.</param>
/// <param name="telemetrySource">String that is used to identify the source of the telemetry data.</param> /// <param name="telemetrySource">String that is used to identify the source of the telemetry data.</param>
/// <param name="logger">An <see cref="ILogger"/> logger instance. </param> /// <param name="logger">An <see cref="ILogger"/> logger instance. </param>
/// <returns>A <see cref="TelemetryPublisher"/> instance.</returns> /// <returns>A <see cref="BasePublisher"/> instance.</returns>
public static ITelemetryPublisher Create(string type, string connectionString, public static ITelemetryPublisher Create(string type, string connectionString,
string telemetrySource, ILogger logger) string telemetrySource, ILogger logger)
{ {
Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString); Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString);
Guard.ArgumentNotNullOrWhiteSpace(nameof(telemetrySource), telemetrySource); Guard.ArgumentNotNullOrWhiteSpace(nameof(telemetrySource), telemetrySource);
Guard.ArgumentNotNull(nameof(logger), logger); Guard.ArgumentNotNull(nameof(logger), logger);
var opts = new TelemetryPublisherOptions var opts = new PublisherOptions
{ConnectionString = connectionString, TelemetrySource = telemetrySource, Logger = logger}; {ConnectionString = connectionString, TelemetrySource = telemetrySource, Logger = logger};
return SpawnPublisher(type, opts); return SpawnPublisher(type, opts);
} }
/// <summary> /// <summary>
/// Creates an instance of <see cref="TelemetryPublisher"/>. /// Creates an instance of <see cref="BasePublisher"/>.
/// </summary> /// </summary>
/// <param name="type">The type of the publisher. See <see cref="TelemetryPublisherType"/> </param> /// <param name="type">The type of the publisher. See <see cref="PublisherType"/> </param>
/// <param name="connectionString">The device connection string for the selected publisher.</param> /// <param name="connectionString">The device connection string for the selected publisher.</param>
/// <returns>A <see cref="TelemetryPublisher"/> instance.</returns> /// <returns>A <see cref="BasePublisher"/> instance.</returns>
public static ITelemetryPublisher CreateFromConnectionString(string type, string connectionString) public static ITelemetryPublisher CreateFromConnectionString(string type, string connectionString)
{ {
Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString); Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString);
var opts = new TelemetryPublisherOptions() var opts = new PublisherOptions()
{ConnectionString = connectionString, TelemetrySource = "NucuCar.Sensors"}; {ConnectionString = connectionString, TelemetrySource = "NucuCar.Sensors"};
return SpawnPublisher(type, opts); return SpawnPublisher(type, opts);
} }
private static ITelemetryPublisher SpawnPublisher(string type, TelemetryPublisherOptions opts) private static ITelemetryPublisher SpawnPublisher(string type, PublisherOptions opts)
{ {
return type switch return type switch
{ {
TelemetryPublisherType.Azure => new TelemetryPublisherAzure(opts), PublisherType.Azure => new Azure(opts),
TelemetryPublisherType.Disk => new TelemetryPublisherDisk(opts), PublisherType.Disk => new Disk(opts),
TelemetryPublisherType.Firestore => new TelemetryPublisherFirestore(opts), PublisherType.Console => new Console(opts),
TelemetryPublisherType.Console => new TelemetryPublisherConsole(opts),
_ => throw new ArgumentException($"Invalid TelemetryPublisher type: {type}.") _ => throw new ArgumentException($"Invalid TelemetryPublisher type: {type}.")
}; };
} }

View file

@ -1,12 +1,12 @@
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Publishers;
namespace NucuCar.Telemetry namespace NucuCar.Telemetry
{ {
/// <summary> /// <summary>
/// This class contains options for the <see cref="TelemetryPublisher"/>. /// This class contains options for the <see cref="BasePublisher"/>.
/// </summary> /// </summary>
public class TelemetryPublisherOptions public class PublisherOptions
{ {
/// <summary> /// <summary>
/// The ConnectionString used by the publisher to connect to the cloud service. /// The ConnectionString used by the publisher to connect to the cloud service.

View file

@ -7,7 +7,7 @@ using NucuCar.Telemetry.Abstractions;
// ReSharper disable ClassWithVirtualMembersNeverInherited.Global // ReSharper disable ClassWithVirtualMembersNeverInherited.Global
namespace NucuCar.Telemetry namespace NucuCar.Telemetry
{ {
public class TelemetryPublisherProxy : ITelemetryPublisher public class PublisherProxy : ITelemetryPublisher
{ {
// TODO: Add support for chaining publishers. // TODO: Add support for chaining publishers.
private ITelemetryPublisher Publisher { get; } private ITelemetryPublisher Publisher { get; }
@ -16,15 +16,15 @@ namespace NucuCar.Telemetry
/// Class used together with the DI, holds a Publisher instance that's being create by options from /// Class used together with the DI, holds a Publisher instance that's being create by options from
/// TelemetryConfig. /// TelemetryConfig.
/// </summary> /// </summary>
public TelemetryPublisherProxy() public PublisherProxy()
{ {
} }
public TelemetryPublisherProxy(ILogger<TelemetryPublisherProxy> logger, IOptions<TelemetryConfig> options) public PublisherProxy(ILogger<PublisherProxy> logger, IOptions<Config> options)
{ {
if (options.Value.ServiceEnabled) if (options.Value.ServiceEnabled)
{ {
Publisher = TelemetryPublisherFactory.Create(options.Value.Publisher, options.Value.ConnectionString, Publisher = PublisherFactory.Create(options.Value.Publisher, options.Value.ConnectionString,
"NucuCar.Sensors", logger); "NucuCar.Sensors", logger);
} }
else else

View file

@ -1,11 +1,11 @@
using NucuCar.Telemetry.Abstractions; using NucuCar.Telemetry.Publishers;
namespace NucuCar.Telemetry namespace NucuCar.Telemetry
{ {
/// <summary> /// <summary>
/// TelemetryPublisherType holds constants for instantiating <see cref="TelemetryPublisher"/>, /// TelemetryPublisherType holds constants for instantiating <see cref="BasePublisher"/>,
/// </summary> /// </summary>
public static class TelemetryPublisherType public static class PublisherType
{ {
public const string Azure = "Azure"; public const string Azure = "Azure";
public const string Disk = "Disk"; public const string Disk = "Disk";

View file

@ -5,22 +5,21 @@ using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client; using Microsoft.Azure.Devices.Client;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Telemetry.Publishers namespace NucuCar.Telemetry.Publishers
{ {
/// <summary> /// <summary>
/// Constructs an instance of <see cref="TelemetryPublisherAzure"/>. It is used to publish telemetry to Microsoft /// Constructs an instance of <see cref="Azure"/>. It is used to publish telemetry to Microsoft
/// Azure IotHub /// Azure IotHub
/// <remarks> /// <remarks>
/// The connection string can be found in your Azure IotHub. /// The connection string can be found in your Azure IotHub.
/// </remarks> /// </remarks>
/// </summary> /// </summary>
public class TelemetryPublisherAzure : TelemetryPublisher public class Azure : BasePublisher
{ {
protected readonly DeviceClient DeviceClient; protected readonly DeviceClient DeviceClient;
public TelemetryPublisherAzure(TelemetryPublisherOptions opts) : base(opts) public Azure(PublisherOptions opts) : base(opts)
{ {
try try
{ {

View file

@ -3,13 +3,15 @@ using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Telemetry.Abstractions namespace NucuCar.Telemetry.Publishers
{ {
/// <summary> /// <summary>
/// The TelemetryPublisher is an abstract class, which provides a base for implementing telemetry publishers. /// The TelemetryPublisher is an abstract class, which provides a base for implementing telemetry publishers.
/// </summary> /// </summary>
public abstract class TelemetryPublisher : IDisposable, ITelemetryPublisher public abstract class BasePublisher : IDisposable, ITelemetryPublisher
{ {
/// <summary> /// <summary>
/// Raw connection string that is used to connect to the cloud service. Should be parsed if required. /// Raw connection string that is used to connect to the cloud service. Should be parsed if required.
@ -35,16 +37,16 @@ namespace NucuCar.Telemetry.Abstractions
/// <summary> /// <summary>
/// Parameter less constructor, mainly used for testing. /// Parameter less constructor, mainly used for testing.
/// </summary> /// </summary>
public TelemetryPublisher() public BasePublisher()
{ {
RegisteredTelemeters = new List<ITelemeter>(10); RegisteredTelemeters = new List<ITelemeter>(10);
} }
/// <summary> /// <summary>
/// Constructor for <see cref="TelemetryPublisher"/>. /// Constructor for <see cref="BasePublisher"/>.
/// </summary> /// </summary>
/// <param name="opts">TelemetryPublisher options, see: <see cref="TelemetryPublisherOptions"/></param> /// <param name="opts">TelemetryPublisher options, see: <see cref="PublisherOptions"/></param>
protected TelemetryPublisher(TelemetryPublisherOptions opts) protected BasePublisher(PublisherOptions opts)
{ {
ConnectionString = opts.ConnectionString; ConnectionString = opts.ConnectionString;
TelemetrySource = opts.TelemetrySource; TelemetrySource = opts.TelemetrySource;
@ -97,10 +99,11 @@ namespace NucuCar.Telemetry.Abstractions
/// Iterates through the registered telemeters and returns the telemetry data as dictionary. /// Iterates through the registered telemeters and returns the telemetry data as dictionary.
/// It also adds metadata information such as: source and timestamp. /// It also adds metadata information such as: source and timestamp.
/// </summary> /// </summary>
/// <returns>A dictionary containing all telemetry data.</returns> /// <returns>A dictionary containing all telemetry data. <see cref="DataAggregate"/></returns>
protected virtual Dictionary<string, object> GetTelemetry() protected virtual DataAggregate GetTelemetry()
{ {
var data = new List<Dictionary<string, object>>(); var source = TelemetrySource ?? nameof(BasePublisher);
var allTelemetryData = new List<JObject>();
foreach (var telemeter in RegisteredTelemeters) foreach (var telemeter in RegisteredTelemeters)
{ {
var telemetryData = telemeter.GetTelemetryJson(); var telemetryData = telemeter.GetTelemetryJson();
@ -110,17 +113,10 @@ namespace NucuCar.Telemetry.Abstractions
continue; continue;
} }
telemetryData["_id"] = telemeter.GetIdentifier(); telemetryData["sensor_name"] = telemeter.GetIdentifier();
data.Add(telemetryData); allTelemetryData.Add(telemetryData);
} }
return new DataAggregate(source, allTelemetryData);
var metadata = new Dictionary<string, object>
{
["source"] = TelemetrySource ?? nameof(TelemetryPublisher),
["timestamp"] = DateTime.UtcNow,
["data"] = data.ToArray()
};
return metadata;
} }
} }
} }

View file

@ -2,14 +2,13 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Telemetry.Publishers namespace NucuCar.Telemetry.Publishers
{ {
public class TelemetryPublisherConsole : TelemetryPublisher public class Console : BasePublisher
{ {
public TelemetryPublisherConsole(TelemetryPublisherOptions opts) : base(opts) public Console(PublisherOptions opts) : base(opts)
{ {
} }

View file

@ -7,20 +7,19 @@ using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Newtonsoft.Json; using Newtonsoft.Json;
using NucuCar.Core.Utilities; using NucuCar.Core.Utilities;
using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Telemetry.Publishers namespace NucuCar.Telemetry.Publishers
{ {
/// <summary> /// <summary>
/// The TelemetryPublisherDisk is used to publish telemetry data to a file on the disk. /// The TelemetryPublisherDisk is used to publish telemetry data to a file on the disk.
/// </summary> /// </summary>
public class TelemetryPublisherDisk : TelemetryPublisher public class Disk : BasePublisher
{ {
private readonly FileStream _fileStream; private readonly FileStream _fileStream;
private readonly string _separator; private readonly string _separator;
/// <summary> /// <summary>
/// Constructs an instance of <see cref="TelemetryPublisherDisk"/>. /// Constructs an instance of <see cref="Disk"/>.
/// <remarks> /// <remarks>
/// The connection string must contain the following options: /// The connection string must contain the following options:
/// Filename (optional) - The path of the filename in which to log telemetry data. /// Filename (optional) - The path of the filename in which to log telemetry data.
@ -30,7 +29,7 @@ namespace NucuCar.Telemetry.Publishers
/// </remarks> /// </remarks>
/// </summary> /// </summary>
/// <param name="opts"></param> /// <param name="opts"></param>
public TelemetryPublisherDisk(TelemetryPublisherOptions opts) : base(opts) public Disk(PublisherOptions opts) : base(opts)
{ {
var connectionStringParams = ConnectionStringParser.Parse(opts.ConnectionString); var connectionStringParams = ConnectionStringParser.Parse(opts.ConnectionString);
var fileName = connectionStringParams.GetValueOrDefault("FileName", "telemetry"); var fileName = connectionStringParams.GetValueOrDefault("FileName", "telemetry");

View file

@ -1,183 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NucuCar.Core.Http;
using NucuCar.Core.Utilities;
using NucuCar.Telemetry.Abstractions;
namespace NucuCar.Telemetry.Publishers
{
/// <summary>
/// This class is used to publish the telemetry data to Google's Cloud Firestore.
/// Requires the environment variable: GOOGLE_APPLICATION_CREDENTIALS to be set.
/// See: https://cloud.google.com/docs/authentication/getting-started
/// or Firebase > Project Settings > Service Accounts (Authentication is not implemented!)
/// <remarks>
/// The connection string has the following parameters:
/// ProjectId (required) — The string for the Firestore project id.
/// CollectionName (required) — The string for the Firestore collection name.
/// WebApiKey (optional) — The web api key of the firebase project.
/// WebApiEmail (optional) — An email to use when requesting id tokens.
/// WebApiPassword (optional) — The password to use when requesting id tokens.
/// Timeout (optional) — The number in milliseconds in which to timeout if publishing fails. Default: 10000
/// </remarks>
/// </summary>
public class TelemetryPublisherFirestore : TelemetryPublisher
{
protected MinimalHttpClient HttpClient;
private string _idToken;
private DateTime _authorizationExpiryTime;
// Variables used for authentication
private readonly string _webEmail;
private readonly string _webPassword;
private readonly string _webApiKey;
public TelemetryPublisherFirestore(TelemetryPublisherOptions opts) : base(opts)
{
// Parse Options
var options = ConnectionStringParser.Parse(opts.ConnectionString);
if (!options.TryGetValue("ProjectId", out var firestoreProjectId))
{
Logger?.LogCritical(
"Can't start {Name}! Malformed connection string! Missing ProjectId!",
nameof(TelemetryPublisherFirestore));
throw new ArgumentException("Malformed connection string!");
}
if (!options.TryGetValue("CollectionName", out var firestoreCollection))
{
Logger?.LogCritical(
"Can't start {Name}! Malformed connection string! Missing CollectionName!",
nameof(TelemetryPublisherFirestore));
throw new ArgumentException("Malformed connection string!");
}
var timeout = int.Parse(options.GetValueOrDefault("Timeout", "10000") ?? "10000");
_webApiKey = options.GetValueOrDefault("WebApiKey", null);
_webEmail = options.GetValueOrDefault("WebApiEmail", null);
_webPassword = options.GetValueOrDefault("WebApiPassword", null);
// Setup HttpClient
var requestUrl = $"https://firestore.googleapis.com/v1/projects/{firestoreProjectId}/" +
$"databases/(default)/documents/{firestoreCollection}/";
HttpClient = new MinimalHttpClient(requestUrl) {Timeout = timeout, Logger = Logger};
Logger?.LogInformation("Initialized {Name}", nameof(TelemetryPublisherFirestore));
Logger?.LogInformation("ProjectId: {FirestoreProjectId}; CollectionName: {FirestoreCollection}",
firestoreProjectId, firestoreCollection);
}
private async Task SetupAuthorization()
{
HttpClient.ClearAuthorizationHeader();
// https://cloud.google.com/identity-platform/docs/use-rest-api#section-sign-in-email-password
var requestUrl = $"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={_webApiKey}";
var data = new Dictionary<string, object>()
{
["email"] = _webEmail,
["password"] = _webPassword,
["returnSecureToken"] = true
};
var response = await HttpClient.PostAsync(requestUrl, data);
if (response?.StatusCode == HttpStatusCode.OK)
{
Logger?.LogInformation("Firestore authentication OK!");
var jsonContent = await response.GetJson();
_idToken = jsonContent.GetProperty("idToken").ToString();
// Setup next expire.
var expiresIn = double.Parse(jsonContent.GetProperty("expiresIn").ToString());
_authorizationExpiryTime = DateTime.UtcNow.AddSeconds(expiresIn);
HttpClient.Authorization(_idToken);
}
else
{
Logger?.LogError("Firestore authentication request failed! {StatusCode}!", response?.StatusCode);
if (response != null)
{
var contentBody = await response.Content.ReadAsStringAsync();
Logger?.LogDebug("{Body}", contentBody);
}
}
}
private async Task CheckAndSetupAuthorization()
{
// If there are no credentials or partial credentials supplies there must be no authorization.
if (_webApiKey == null || _webEmail == null || _webPassword == null)
{
return;
}
// Check if the token is about to expire in the next 15 minutes.
if (DateTime.UtcNow.AddMinutes(15) < _authorizationExpiryTime)
{
return;
}
await SetupAuthorization();
}
public override async Task PublishAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
var data = FirebaseRestTranslator.Translator.Translate(null, GetTelemetry());
HttpResponseMessage responseMessage = null;
try
{
await CheckAndSetupAuthorization();
responseMessage = await HttpClient.PostAsync("", data);
}
// ArgumentException occurs during json serialization errors.
catch (ArgumentException e)
{
Logger?.LogWarning("{Message}", e.Message);
}
switch (responseMessage?.StatusCode)
{
case HttpStatusCode.OK:
Logger?.LogInformation("Published data to Firestore!");
break;
case HttpStatusCode.Forbidden:
case HttpStatusCode.Unauthorized:
{
Logger?.LogError("Failed to publish telemetry data! {StatusCode}. Retrying...",
responseMessage.StatusCode);
await SetupAuthorization();
responseMessage = await HttpClient.PostAsync("", data);
if (responseMessage != null && responseMessage.IsSuccessStatusCode)
{
Logger?.LogInformation("Published data to Firestore on retry!");
}
else
{
Logger?.LogError("Failed to publish telemetry data! {StatusCode}", responseMessage?.StatusCode);
}
break;
}
default:
Logger?.LogError("Failed to publish telemetry data! {StatusCode}", responseMessage?.StatusCode);
break;
}
}
public override void Dispose()
{
}
}
}

View file

@ -69,21 +69,3 @@ See the source code for comments on the ConnectionString.
You will need to parse the file by yourself. You will need to parse the file by yourself.
--- ---
## Firebase Firestore Database
### Publisher
Publishes telemetry on the firestore.
The `Telemetry:Publisher` must be set to: Firestore
Example connection string:
`ProjectId=nucuhub;CollectionName=sensors-telemetry-test;Timeout=1000`
If you want to use Authentication you can do so by providing the following keys
in the connection string: WebApiEmail, WebApiPassword, WebApiKey.
### Reader
You will need use a firebase client or rest API.

View file

@ -18,7 +18,7 @@ namespace NucuCar.Telemetry
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly ITelemetryPublisher _telemetryPublisher; private readonly ITelemetryPublisher _telemetryPublisher;
public TelemetryWorker(ILogger<TelemetryWorker> logger, IOptions<TelemetryConfig> options, public TelemetryWorker(ILogger<TelemetryWorker> logger, IOptions<Config> options,
ITelemetryPublisher telemetryPublisherProxy) ITelemetryPublisher telemetryPublisherProxy)
{ {
_logger = logger; _logger = logger;

View file

@ -13,8 +13,8 @@ namespace NucuCar.UnitTests.NucuCar.Telemetry
const string connectionString = const string connectionString =
"HostName=something.azure-devices.net;DeviceId=something;SharedAccessKey=test"; "HostName=something.azure-devices.net;DeviceId=something;SharedAccessKey=test";
var telemetryPublisher = var telemetryPublisher =
TelemetryPublisherFactory.CreateFromConnectionString(TelemetryPublisherType.Azure, connectionString); PublisherFactory.CreateFromConnectionString(PublisherType.Azure, connectionString);
Assert.IsType<TelemetryPublisherAzure>(telemetryPublisher); Assert.IsType<Azure>(telemetryPublisher);
} }
[Fact] [Fact]
@ -23,18 +23,8 @@ namespace NucuCar.UnitTests.NucuCar.Telemetry
const string connectionString = const string connectionString =
"Filename=test;BufferSize=4096"; "Filename=test;BufferSize=4096";
var telemetryPublisher = var telemetryPublisher =
TelemetryPublisherFactory.CreateFromConnectionString(TelemetryPublisherType.Disk, connectionString); PublisherFactory.CreateFromConnectionString(PublisherType.Disk, connectionString);
Assert.IsType<TelemetryPublisherDisk>(telemetryPublisher); Assert.IsType<Disk>(telemetryPublisher);
}
[Fact]
private void Test_Build_TelemetryPublisherFiresstore()
{
const string connectionString =
"ProjectId=test;CollectionName=test";
var telemetryPublisher =
TelemetryPublisherFactory.CreateFromConnectionString(TelemetryPublisherType.Firestore, connectionString);
Assert.IsType<TelemetryPublisherFirestore>(telemetryPublisher);
} }
[Fact] [Fact]
@ -42,7 +32,7 @@ namespace NucuCar.UnitTests.NucuCar.Telemetry
{ {
Assert.Throws<ArgumentException>(() => Assert.Throws<ArgumentException>(() =>
{ {
TelemetryPublisherFactory.CreateFromConnectionString("_1", "a=b"); PublisherFactory.CreateFromConnectionString("_1", "a=b");
}); });
} }
} }

View file

@ -1,192 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using NucuCar.Core.Http;
using NucuCar.Telemetry;
using NucuCar.Telemetry.Publishers;
using Xunit;
namespace NucuCar.UnitTests.NucuCar.Telemetry
{
/// <summary>
/// Class used to test the TelemetryPublisherFirestore by mocking the GetTelemetry method and HttpClient field.
/// </summary>
internal class MockTelemetryPublisherFirestore : TelemetryPublisherFirestore
{
private Dictionary<string, object> _mockData;
public MockTelemetryPublisherFirestore(TelemetryPublisherOptions opts) : base(opts)
{
_mockData = new Dictionary<string, object>();
}
public void SetHttpClient(MinimalHttpClient client)
{
HttpClient = client;
}
public void SetMockData(Dictionary<string, object> data)
{
_mockData = data;
}
protected override Dictionary<string, object> GetTelemetry()
{
return _mockData;
}
}
public class TelemetryPublisherFirestoreTest
{
[Fact]
private void Test_Construct_BadProjectId()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString = "ProjectIdBAD=test;CollectionName=test"
};
// Run & Assert
Assert.Throws<ArgumentException>(() => { new MockTelemetryPublisherFirestore(opts); });
}
[Fact]
private void Test_Construct_BadCollectionName()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString = "ProjectId=test;CollectionNameBAD=test"
};
// Run & Assert
Assert.Throws<ArgumentException>(() => { new MockTelemetryPublisherFirestore(opts); });
}
[Fact]
private async Task Test_PublishAsync_OK()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString = "ProjectId=test;CollectionName=test;WebApiKey=TAPIKEY;WebApiEmail=t@emai.com;WebApiPassword=tpass"
};
var publisher = new MockTelemetryPublisherFirestore(opts);
var mockHttpClient = new MockMinimalHttpClient("http://testing.com");
var authResponse = new HttpResponseMessage(HttpStatusCode.OK)
{Content = new StringContent("{\"idToken\": \"1\",\"expiresIn\": \"3600\"}")};
mockHttpClient.SendAsyncResponses.Add(authResponse);
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK));
publisher.SetHttpClient(mockHttpClient);
publisher.SetMockData(new Dictionary<string, object> {["testData"] = 1});
// Run
await publisher.PublishAsync(CancellationToken.None);
// Assert
var request = mockHttpClient.SendAsyncArgCalls[1];
Assert.Equal(HttpMethod.Post, request.Method);
Assert.Equal(new Uri("http://testing.com"), request.RequestUri);
Assert.Equal("{\"fields\":{\"testData\":{\"integerValue\":1}}}",
request.Content.ReadAsStringAsync().GetAwaiter().GetResult());
}
[Fact]
private async Task Test_PublishAsync_InvalidJson()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString = "ProjectId=test;CollectionName=test;WebApiKey=TAPIKEY;WebApiEmail=t@emai.com;WebApiPassword=tpass"
};
var publisher = new MockTelemetryPublisherFirestore(opts);
var mockHttpClient = new MockMinimalHttpClient("http://testing.com");
var authResponse = new HttpResponseMessage(HttpStatusCode.OK)
{Content = new StringContent("{\"idToken\": \"1\",\"expiresIn\": \"3600\"}")};
mockHttpClient.SendAsyncResponses.Add(authResponse);
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK));
publisher.SetHttpClient(mockHttpClient);
publisher.SetMockData(new Dictionary<string, object> {["testData"] = double.PositiveInfinity});
// Run
await publisher.PublishAsync(CancellationToken.None);
// Assert only auth request made.
Assert.Single(mockHttpClient.SendAsyncArgCalls);
}
[Fact]
private async Task Test_PublishAsync_Cancel()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString = "ProjectId=test;CollectionName=test"
};
var publisher = new MockTelemetryPublisherFirestore(opts);
var mockHttpClient = new MockMinimalHttpClient("http://testing.com");
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK));
publisher.SetHttpClient(mockHttpClient);
publisher.SetMockData(new Dictionary<string, object> {["testData"] = 1});
var cts = new CancellationTokenSource();
cts.Cancel();
// Run
await publisher.PublishAsync(cts.Token);
// Assert
Assert.Empty(mockHttpClient.SendAsyncArgCalls);
}
[Fact]
private async Task Test_PublishAsync_Authorization_Refresh()
{
// Setup
var opts = new TelemetryPublisherOptions()
{
ConnectionString =
"ProjectId=test;CollectionName=test;WebApiKey=TAPIKEY;WebApiEmail=t@emai.com;WebApiPassword=tpass"
};
var publisher = new MockTelemetryPublisherFirestore(opts);
var mockHttpClient = new MockMinimalHttpClient("http://testing.com");
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK)
{Content = new StringContent("{\"idToken\": \"1\",\"expiresIn\": \"0\"}")});
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK));
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK)
{Content = new StringContent("{\"idToken\": \"1\",\"expiresIn\": \"3600\"}")});
mockHttpClient.SendAsyncResponses.Add(new HttpResponseMessage(HttpStatusCode.OK));
publisher.SetHttpClient(mockHttpClient);
publisher.SetMockData(new Dictionary<string, object> {["testData"] = 1});
// Run
await publisher.PublishAsync(CancellationToken.None);
await publisher.PublishAsync(CancellationToken.None);
// Assert
Assert.Equal(4, mockHttpClient.SendAsyncArgCalls.Count);
// 1st request auth
Assert.Equal(HttpMethod.Post, mockHttpClient.SendAsyncArgCalls[0].Method);
Assert.Equal("https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key=TAPIKEY",
mockHttpClient.SendAsyncArgCalls[0].RequestUri.ToString());
// 2st request payload
Assert.Equal(HttpMethod.Post, mockHttpClient.SendAsyncArgCalls[1].Method);
// 3rd request auth
Assert.Equal(HttpMethod.Post, mockHttpClient.SendAsyncArgCalls[2].Method);
Assert.Equal("https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key=TAPIKEY",
mockHttpClient.SendAsyncArgCalls[2].RequestUri.ToString());
// 4th request payload
Assert.Equal(HttpMethod.Post, mockHttpClient.SendAsyncArgCalls[1].Method);
}
}
}