malkav

Kusto Ingestion queue

Hey all, does anyone know this? I'm trying to queue a list of models for ingestion into a Kusto cluster (ADX). My idea is to turn each data model into a string and ingest it from a MemoryStream, though I realize that might not be optimal for the system. Queuing from a local database might be better, but that introduces latency I'd like to avoid, so for now MemoryStream is my go-to option. However, I need to know whether I can queue all items in one go. For example:
```csharp
foreach (T model in models) {
    // ToString() must be called; a bare method group can't be interpolated
    stringStream.Append($"[{model.ToString()}]");
}
```
Here I've overridden ToString() on the models like this:
```csharp
public override string ToString() {
    return $"{prop1},{prop2},...";
}
```
Is this the right way to do it, or should I queue each item individually?
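The loop above can produce one payload for all models instead of one stream per model. For CSV-formatted ingestion, records are normally separated by line breaks rather than wrapped in brackets, and fields that contain commas or quotes need quoting. A minimal sketch, assuming the table's ingestion properties use the CSV format; `SampleModel`, `CsvPayload`, `Build`, and `Escape` are hypothetical names for illustration, not Kusto SDK types:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical model standing in for the poster's T (prop1, prop2, ...).
public sealed record SampleModel(string Name, int Count)
{
    // One CSV record: comma-separated fields, no surrounding brackets.
    public override string ToString() => $"{CsvPayload.Escape(Name)},{Count}";
}

public static class CsvPayload
{
    // Quote a field if it contains a comma, quote, or line break (RFC 4180 style).
    public static string Escape(string field)
    {
        if (field.IndexOfAny(new[] { ',', '"', '\n', '\r' }) < 0) return field;
        return "\"" + field.Replace("\"", "\"\"") + "\"";
    }

    // Join all models into one payload, one record per line, and wrap it in a stream.
    // The caller owns the returned stream.
    public static MemoryStream Build<T>(IEnumerable<T> models)
    {
        var sb = new StringBuilder();
        foreach (T model in models)
        {
            sb.Append(model?.ToString()).Append('\n');
        }
        return new MemoryStream(Encoding.UTF8.GetBytes(sb.ToString()));
    }
}
```

The returned stream can then be passed to a single `IngestFromStreamAsync` call, so the caller waits for one ingestion instead of one per model.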
malkav
For reference, this is my current Create method (called Queue):
```csharp
public async Task<string> Queue<T>(IEnumerable<T> models)
{
    using (IDataReader? response = await _kustoClient.ExecuteQueryAsync(_database, $"{_table} | count", null))
    {
        Console.WriteLine("\nNumber of rows in " + _table + " BEFORE ingestion:");
        PrintResultsAsValueList(response);
    }

    foreach (T model in models)
    {
        Console.WriteLine("\nIngesting data from stream of models");
        MemoryStream stream = new(System.Text.Encoding.UTF8.GetBytes(model.ToString()));
        _ = await _kustoIngestClient
            .IngestFromStreamAsync(stream, _ingestProps, new StreamSourceOptions { Size = stream.Length });

        Console.WriteLine("\nWaiting 30 seconds for ingestion to complete");
        // await Task.Delay instead of Thread.Sleep so the async method doesn't block a thread
        await Task.Delay(TimeSpan.FromSeconds(30));

        using (IDataReader? response = await _kustoClient.ExecuteQueryAsync(_database, $"{_table} | count", null))
        {
            Console.WriteLine("\nNumber of rows in " + _table + " AFTER ingestion:");
            PrintResultsAsValueList(response);
        }
    }

    using (IDataReader? response = await _kustoClient.ExecuteQueryAsync(_database,
        $"{_table} | top {models.Count()} by ingestion_time()", null))
    {
        Console.WriteLine($"\nLast {models.Count()} ingested rows:");
        PrintResultsAsValueList(response);
    }

    return "";
}
```
Also, is the MemoryStream's memory released automatically once it has been used for ingestion, or do I still have to dispose of it myself?
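On the disposal question: `MemoryStream` holds only managed memory, and its .NET documentation notes that it has no real resources to dispose, so the buffer is reclaimed by the garbage collector once nothing references it. Wrapping it in `using` is still harmless, since calling `Dispose` more than once is safe. The Kusto SDK's `StreamSourceOptions` also appears to expose a `LeaveOpen` setting that controls whether the client closes the stream after ingestion; check the SDK reference for its default in your version. The sketch below ingests a prebuilt payload once, disposes the stream itself, and waits with `await Task.Delay` rather than `Thread.Sleep`. `QueueSketch`, `IngestOnceAsync`, and the `IngestStreamAsync` delegate are hypothetical; the delegate stands in for the real `IngestFromStreamAsync` call so the sketch runs without the SDK:

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class QueueSketch
{
    // Hypothetical stand-in for _kustoIngestClient.IngestFromStreamAsync(stream, _ingestProps, options).
    public delegate Task IngestStreamAsync(Stream stream, CancellationToken ct);

    // Ingest an already-built payload with a single call and return its size in bytes.
    public static async Task<long> IngestOnceAsync(
        string payload, IngestStreamAsync ingest, TimeSpan settleDelay, CancellationToken ct = default)
    {
        // `using` disposes the stream when the method exits; if the ingest call
        // already closed it, the second Dispose is a no-op.
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(payload));
        long size = stream.Length;
        await ingest(stream, ct);

        // Non-blocking wait, unlike Thread.Sleep inside an async method.
        await Task.Delay(settleDelay, ct);
        return size;
    }
}
```

Building one payload first and calling the ingest client once also means the 30-second wait happens once per batch rather than once per model.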