Skip to content

Commit 6def7e9

Browse files
committed
Commit benchmark code
1 parent 73d07a2 commit 6def7e9

File tree

1 file changed

+206
-35
lines changed

1 file changed

+206
-35
lines changed

src/Playground/Program.cs

Lines changed: 206 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,219 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information.
44

5-
using System.Diagnostics.CodeAnalysis;
5+
using System.Diagnostics;
6+
using System.Text.Json;
7+
using System.Text.Json.Serialization;
68

79
using Elastic.Clients.Elasticsearch;
10+
using Elastic.Clients.Elasticsearch.Mapping;
811
using Elastic.Clients.Elasticsearch.Serialization;
912
using Elastic.Transport;
10-
using Elastic.Transport.Extensions;
11-
12-
using Playground;
13-
14-
var pool = new SingleNodePool(new Uri("https://primary.es.europe-west3.gcp.cloud.es.io"));
15-
var settings = new ElasticsearchClientSettings(pool,
16-
sourceSerializer: (_, settings) =>
17-
new DefaultSourceSerializer(settings, PlaygroundJsonSerializerContext.Default)
18-
)
19-
.Authentication(new BasicAuthentication("elastic", "Oov35Wtxj5DzpZNzYAzFb0KZ"))
20-
.DisableDirectStreaming()
21-
.EnableDebugMode(cd =>
22-
{
23-
//var request = System.Text.Encoding.Default.GetString(cd.RequestBodyInBytes);
24-
Console.WriteLine(cd.DebugInformation);
25-
});
2613

27-
var client = new ElasticsearchClient(settings);
14+
var baseOptions = new VectorIngestOptions
15+
{
16+
DatasetFile = "C:\\Users\\Florian\\Desktop\\bench\\open_ai_corpus-initial-indexing-1k.json",
17+
Repetitions = 1,
18+
MaxDegreeOfParallelism = 1,
19+
WarmupPasses = 5,
20+
MeasuredPasses = 3,
21+
ElasticsearchEndpoint = new Uri("http://192.168.100.87:9200"),
22+
ElasticsearchUsername = "elastic",
23+
ElasticsearchPassword = "julCIvcZ"
24+
};
2825

29-
var person = new Person
26+
var cases = new VectorIngestOptions[]
3027
{
31-
FirstName = "Steve",
32-
LastName = "Jobs",
33-
Age = 35,
34-
IsDeleted = false,
35-
Routing = "1234567890",
36-
Id = 1,
37-
Enum = DateTimeKind.Utc,
28+
baseOptions with { UseBase64VectorEncoding = false, ChunkSize = 100 },
29+
baseOptions with { UseBase64VectorEncoding = false, ChunkSize = 250 },
30+
baseOptions with { UseBase64VectorEncoding = false, ChunkSize = 500 },
31+
baseOptions with { UseBase64VectorEncoding = false, ChunkSize = 1000 },
32+
baseOptions with { UseBase64VectorEncoding = true , ChunkSize = 100 },
33+
baseOptions with { UseBase64VectorEncoding = true , ChunkSize = 250 },
34+
baseOptions with { UseBase64VectorEncoding = true , ChunkSize = 500 },
35+
baseOptions with { UseBase64VectorEncoding = true , ChunkSize = 1000 }
3836
};
3937

40-
var id = client.Infer.Id(person);
41-
var idByType = IdByType(person.GetType(), person);
42-
Console.WriteLine(id);
43-
Console.WriteLine(idByType);
44-
// This still errors on AOT compilation
45-
Console.WriteLine(client.SourceSerializer.SerializeToString(person));
38+
foreach (var testcase in cases)
39+
{
40+
Console.Write($"Base64: {(testcase.UseBase64VectorEncoding ? '1' : '0')}, Chunk size: {testcase.ChunkSize,4} == ");
41+
await VectorIngest.Ingest(testcase);
42+
}
43+
44+
public sealed record VectorIngestOptions
45+
{
46+
/// <summary>
47+
/// The path to the dataset file.
48+
/// </summary>
49+
public required string DatasetFile { get; init; }
50+
51+
/// <summary>
52+
/// The number of times the dataset is repeated.
53+
/// </summary>
54+
public int Repetitions { get; init; } = 1;
55+
56+
/// <summary>
57+
/// The chunk size for the individual bulk requests.
58+
/// </summary>
59+
public int ChunkSize { get; init; } = 100;
60+
61+
/// <summary>
62+
/// Configures whether vector data is encoded in <c>base64</c> format.
63+
/// </summary>
64+
public bool UseBase64VectorEncoding { get; init; } = true;
65+
66+
/// <summary>
67+
/// The maximum number of concurrent bulk operations allowed when processing tasks in parallel.
68+
/// </summary>
69+
public int MaxDegreeOfParallelism { get; init; } = 1;
70+
71+
/// <summary>
72+
/// The number of warmup passes to perform before measurements begin.
73+
/// </summary>
74+
public int WarmupPasses { get; init; } = 5;
75+
76+
/// <summary>
77+
/// The number of measurement passes to perform.
78+
/// </summary>
79+
public int MeasuredPasses { get; init; } = 3;
80+
81+
public required Uri ElasticsearchEndpoint { get; init; }
82+
public required string ElasticsearchUsername { get; init; }
83+
public required string ElasticsearchPassword { get; init; }
84+
}
85+
86+
public static class VectorIngest
87+
{
88+
public static async Task Ingest(VectorIngestOptions options)
89+
{
90+
var docs = LoadDataset(options.DatasetFile, options.Repetitions);
91+
92+
var sw = new Stopwatch();
93+
var elapsedSeconds = 0.0d;
94+
95+
var client = CreateClient(options);
96+
var indexName = $"bench-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
97+
98+
for (var i = 0; i < options.WarmupPasses + options.MeasuredPasses; i++)
99+
{
100+
await InitializeIndex(client, indexName, docs.First().Embedding.Length);
101+
102+
sw.Restart();
103+
client
104+
.BulkAll(docs, x => x
105+
.Index(indexName)
106+
.MaxDegreeOfParallelism(options.MaxDegreeOfParallelism)
107+
.Size(options.ChunkSize)
108+
)
109+
.Wait(TimeSpan.FromHours(1), _ => {});
110+
sw.Stop();
111+
112+
if (i >= options.WarmupPasses)
113+
{
114+
elapsedSeconds += sw.Elapsed.TotalSeconds;
115+
}
116+
117+
await client.Indices.RefreshAsync(x => x.Indices(indexName)).ConfigureAwait(false);
118+
119+
var count = await client.CountAsync(x => x.Indices(indexName)).ConfigureAwait(false);
120+
if (count.Count != docs.Length)
121+
{
122+
throw new Exception($"Document count mismatch: expected {docs.Length}, got {count.Count}");
123+
}
124+
}
125+
126+
await client.Indices.DeleteAsync(indexName).ConfigureAwait(false);
127+
128+
elapsedSeconds /= options.MeasuredPasses;
129+
var docsPerSec = docs.Length / elapsedSeconds;
130+
131+
Console.WriteLine($"{((int)(elapsedSeconds * 1000)),4}ms / {((int)docsPerSec),4} docs/s");
132+
}
133+
134+
private static OpenAiBenchmarkDocument[] LoadDataset(string filename, int repetitions)
135+
{
136+
return Enumerable.Repeat(0, repetitions).Select((_, i) => Load(filename, i)).Aggregate((a, b) => a.Union(b)).ToArray();
137+
138+
static IEnumerable<OpenAiBenchmarkDocument> Load(string filename, int i)
139+
{
140+
using var fs = File.OpenRead(filename);
141+
return JsonSerializer
142+
.DeserializeAsyncEnumerable<OpenAiBenchmarkDocument>(fs, BenchmarkJsonSerializerContext.Default.OpenAiBenchmarkDocument, true)
143+
.ToBlockingEnumerable()
144+
.Select(x => x! with { Id = $"{i}_{x.Id}"})
145+
.ToArray();
146+
}
147+
}
148+
149+
private static ElasticsearchClient CreateClient(VectorIngestOptions options)
150+
{
151+
var pool = new SingleNodePool(options.ElasticsearchEndpoint);
152+
153+
var settings = new ElasticsearchClientSettings(pool,
154+
sourceSerializer: (_, settings) =>
155+
new DefaultSourceSerializer(settings, BenchmarkJsonSerializerContext.Default/*, x => x.Converters.Remove(x.Converters.OfType<JsonConverter<float>>().Single())*/)
156+
)
157+
.Authentication(new BasicAuthentication(options.ElasticsearchUsername, options.ElasticsearchPassword))
158+
.MemoryStreamFactory(new RecyclableMemoryStreamFactory())
159+
.EnableHttpCompression(false)
160+
.FloatVectorDataEncoding(options.UseBase64VectorEncoding ? FloatVectorDataEncoding.Base64 : FloatVectorDataEncoding.Legacy);
161+
162+
return new ElasticsearchClient(settings);
163+
}
164+
165+
private static async Task InitializeIndex(ElasticsearchClient client, string indexName, int vectorDimensions)
166+
{
167+
if (await client.Indices.ExistsAsync(indexName).ConfigureAwait(false) is { Exists: true })
168+
{
169+
await client.Indices.DeleteAsync(indexName).ConfigureAwait(false);
170+
}
171+
172+
await client.Indices
173+
.CreateAsync<OpenAiBenchmarkDocument>(x => x
174+
.Index(indexName)
175+
.Mappings(x => x
176+
.Properties(x => x
177+
.Keyword(x => x.Id)
178+
.DenseVector(x => x.Embedding, x => x
179+
.Dims(vectorDimensions)
180+
.ElementType(DenseVectorElementType.Float)
181+
.IndexOptions(x => x.
182+
Type(DenseVectorIndexOptionsType.Flat)
183+
)
184+
)
185+
.Text(x => x.Title)
186+
.Text(x => x.Text)
187+
)
188+
)
189+
.WaitForActiveShards(1)
190+
)
191+
.ConfigureAwait(false);
192+
193+
await client.Indices.RefreshAsync(x => x.Indices(indexName)).ConfigureAwait(false);
194+
}
195+
}
196+
197+
[JsonSerializable(typeof(OpenAiBenchmarkDocument))]
198+
[JsonSerializable(typeof(OpenAiBenchmarkDocument[]))]
199+
[JsonSerializable(typeof(object))]
200+
public sealed partial class BenchmarkJsonSerializerContext :
201+
JsonSerializerContext
202+
{
203+
204+
}
205+
206+
public sealed record OpenAiBenchmarkDocument
207+
{
208+
[JsonPropertyName("docid")]
209+
public required string Id { get; init; }
210+
211+
[JsonPropertyName("title")]
212+
public required string Title { get; init; }
213+
214+
[JsonPropertyName("text")]
215+
public required string Text { get; init; }
46216

47-
[UnconditionalSuppressMessage("Trimming", "IL2072", Justification = "Can only annotate our implementation")]
48-
[UnconditionalSuppressMessage("Trimming", "IL2067", Justification = "Can only annotate our implementation")]
49-
string? IdByType(Type type, object instance) => client.Infer.Id(type, instance);
217+
[JsonConverter(typeof(FloatVectorDataConverter))]
218+
[JsonPropertyName("emb")]
219+
public ReadOnlyMemory<float> Embedding { get; init; }
220+
}

0 commit comments

Comments
 (0)