English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Analisi dettagliata della libreria di esecuzione multi-threading Nexus.js in JavaScript

Prima di tutto, se non sei familiare con questo progetto, ti consiglio di leggere una serie di articoli che ho scritto prima. Se non vuoi leggerli, non preoccuparti. Questi contenuti saranno anche trattati qui.

Ora, iniziamo.

L'anno scorso, ho iniziato a implementare Nexus.js, una libreria JavaScript per server multi-threading basata su Webkit/JavaScript kernel. Per un po' ho abbandonato questo progetto per alcune ragioni che non posso controllare, principalmente: non posso farmi lavorare a lungo.

Quindi, iniziamo a discutere dell'architettura di Nexus e di come funziona.

Ciclo degli eventi

Non c'è un ciclo degli eventi

C'è un pool di thread con un oggetto task (senza lock)

Ogni volta che si chiama setTimeout o setImmediate o si crea una Promise, il compito viene messo in coda alla coda dei compiti.

Ogni volta che si pianifica un compito, la thread disponibile per prima viene scelta e eseguita.

Elabora Promise sui core CPU. La chiamata a Promise.all() risolve in parallelo le Promise.

ES6

Supporta async/await e si raccomanda di utilizzarlo

Supporta for await(...)

Supporta la destrutturazione

Supporta async try/catch/finally

Modulo

Non supporta CommonJS. (require(...) e module.exports)

Tutti i moduli utilizzano la sintassi ES6 di import/export

Supporta l'importazione dinamica tramite import('file-or-packge').then(...)

Supporta import.meta, ad esempio: import.meta.filename e import.meta.dirname, ecc.

Funzionalità aggiuntive: supporta l'importazione diretta da URL, ad esempio:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';

EventEmitter

Nexus implementa la classe EventEmitter basata su Promise

Il gestore degli eventi viene ordinato su tutte le thread e viene eseguito in parallelo.

Il valore di ritorno di EventEmitter.emit(...) è una Promise, che può essere risolta in un array di valori restituiti dal gestore degli eventi.

ad esempio:

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on('test', value => { console.log(`fired test ${i}!`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on('returns-a-value', v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit('test', { payload: 'test 1' });
 console.log('testo primo completato!');
 await test.emit('test', { payload: 'test 2' });
 console.log('testo secondo completato!');
 const values = await test.emit('returns-a-value', 10);
 console.log('testo terzo completato, i valori restituiti sono:'); console.inspect(values);
}
start().catch(console.error);

I/O

Tutti gli input/output sono completati attraverso tre atomi: Device, Filter e Stream.

Tutti gli atomi di input/output implementano la classe EventEmitter.

Per utilizzare il Device, è necessario creare un ReadableStream o WritableStream sopra il Device.

Per operare sui dati, aggiungere i Filters a ReadableStream o WritableStream.

Infine, utilizzare source.pipe(...destinationStreams) e attendere source.resume() per elaborare i dati.

Tutte le operazioni di input/output sono eseguite utilizzando l'oggetto ArrayBuffer.

Filter ha tentato di utilizzare il metodo process(buffer) per elaborare i dati.

Ad esempio: utilizzare due file di output indipendenti per convertire UTF-8 in UTF6.

const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice('enwik8');
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));
  console.log('piping...');
  stream.pipe(...wstreams);
  console.log('streaming...');
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`finito in ${(Date.now() * startTime) / 1000} secondi!`);
 } catch (e) {
  console.error('Si è verificato un errore: ', e);
 }
}
start().catch(console.error);

TCP/UDP

Nexus.js fornisce una classe Acceptor, responsabile di associare l'indirizzo IP/porta e di ascoltare le connessioni

Ogni volta che si riceve una richiesta di connessione, viene attivato l'evento connection e viene fornito un dispositivo Socket.

Ogni istanza di Socket è un dispositivo I/O bidirezionale.

Puoi utilizzare ReadableStream e WritableStream per operare su Socket.

Esempio di base: (inviare “Hello World” al client)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on('connection', (socket, endpoint) => {
 const connId = count++;
 console.log(`connessione #${connId} da ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = 'Hello World!\n';
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on('data', buffer => console.log(`ricevuto messaggio: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} al ${endpoint.address}:${endpoint.port} disconnesso!`));
 console.log(`inviando saluto a #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind('127.0.0.1', 10000);
acceptor.listen();
console.log('server ready');

Http

Nexus fornisce la classe Nexus.Net.HTTP.Server, che基本上erita da TCPAcceptor

Interfacce di base

Quando il server ha completato l'analisi/verifica dei headers HTTP in entrata di base, utilizzerà la connessione e le stesse informazioni per scatenare l'evento connection

Ogni istanza di connessione ha un oggetto request e response. Questi sono dispositivi di input/output.

Puoi costruire ReadableStream e WritableStream per manipolare request/response.

Se connetti tramite pipeline a un oggetto Response, lo stream di input utilizzerà il modello di codifica a blocchi. Altrimenti, puoi utilizzare response.write() per scrivere una stringa regolare.

Esempio complesso: (server HTTP di base con codifica a blocchi, dettagli omessi)

....
/**
 * Crea uno stream di input da un percorso.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
funzione async createInputStream(path) {
 se (path.startsWith('/')) // Se inizia con '/', omettilo.
  path = path.substr(1);
 se (path.startsWith('.')) // Se inizia con '.', rifiutalo.
  lancia una nuova NotFoundError(path);
 se (path === '/' || !path) // Se è vuoto, imposta a index.html.
  path = 'index.html';
 /**
  * `import.meta.dirname` e `import.meta.filename` sostituiscono l'antico CommonJS `__dirname` e `__filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path);
 try {
  // Stat il percorso di destinazione.
  const {type} = await Nexus.FileSystem.stat(filePath);
  se (type === Nexus.FileSystem.FileType.Directory) // Se è una directory, restituisci il suo 'index.html'
   restituisci createInputStream(Nexus.FileSystem.join(filePath, 'index.html'));
  altrimenti se (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // Se non trovato, lancia NotFound.
   lancia una nuova NotFoundError(path);
 } catch(e) {
  se (e.code)
   lancia e;
  lancia una nuova NotFoundError(path);
 }
 try {
  // Prima di tutto, creiamo un dispositivo.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Poi restituiamo un nuovo ReadableStream creato utilizzando il nostro dispositivo di origine.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Contatore delle connessioni.
 */
let connections = 0;
/**
 * Crea un nuovo server HTTP.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// Un errore del server significa che è accaduto un errore mentre il server era in ascolto sulle connessioni.
// Possiamo ignorare tali errori, li mostriamo comunque.
server.on('error', e => {
 console.error(FgRed + Bright + 'Errore del server: ' + e.message + '\n' + e.stack, Reset);
});
/**
 * Ascolta le connessioni.
 */
server.on('connection', async (connection, peer) => {
 // Inizia con un ID di connessione di 0, incrementa con ogni nuova connessione.
 const connId = connections++;
 // Registra l'ora di inizio per questa connessione.
 const startTime = Date.now();
 // La destrutturazione è supportata, perché non usarla?
 const { request, response } = connection;
 // Parse the URL parts.
 const { path } = parseURL(request.url);
 // Here we'll store any errors that occur during the connection.
 const errors = [];
 // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
 let inStream, outStream;
 try {
  // Log the request.
  console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Set the 'Server' header.
  response.set('Server', `nexus.js/0.1.1`);
  // Create our input stream.
  inStream = await createInputStream(path);
  // Create our output stream.
  outStream = new Nexus.IO.WritableStream(response);
  // Hook all `error` events, add any errors to our `errors` array.
  inStream.on('error', e => { errors.push(e); });
  request.on('error', e => { errors.push(e); });
  response.on('error', e => { errors.push(e); });
  outStream.on('error', e => { errors.push(e); });
  // Imposta il tipo di contenuto e lo stato della richiesta.
  response
   .set('Content-Type', mimeType(path))
   .status(200);
  // Collega l'input all'output(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Riprendi il nostro file stream, questo fa sì che lo stream passi a una codifica chunked HTTP.
   // Questo restituirà una promessa che si risolverà solo dopo che l'ultimo byte (chunk HTTP) è stato scritto.
   await inStream.resume();
  } catch (e) {
   // Captura eventuali errori che si verificano durante lo streaming.
   errors.push(e);
  }
  // Disconnetti tutti i callback creati da `.pipe()`.
  return disconnect();
 } catch(e) {
  // Se si è verificato un errore, pusha nell'array.
  errors.push(e);
  // Imposta il tipo di contenuto, lo stato e scrivi un messaggio di base.
  response
   .set('Content-Type', 'text/plain')
   .status(e.code || 500)
   .send(e.message || 'Si è verificato un errore.');
 finally {
  // Chiudi gli stream manualmente. Questo è importante perché potremmo esaurire i gestori di file altrimenti.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Close the connection, has no real effect with keep-alive connections.
  await connection.close();
  // Grab the response's status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   '200': Bright + FgGreen, // Green for 200 (OK),
   '404': Bright + FgYellow, // Yellow for 404 (Not Found)
   '500': Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = '0.0.0.0', port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);

Benchmark

I think I've covered everything that has been implemented so far. So now let's talk about performance.

This is the current benchmark of the aforementioned Http server, with 100 concurrent connections and a total of 10,000 requests:

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient).....done
Software del server:    nexus.js/0.1.1
Nome host del server:    localhost
Porta del server:      3000
Percorso del documento:     /
Lunghezza del documento:    8673 byte
Livello di conconrenza:   100
Tempo impiegato per i test:  9.991 secondi
Richieste completate:   10000
Richieste fallite:    0
Totale trasferito:   87880000 byte
HTML trasferito:    86730000 byte
Richieste al secondo:  1000.94 [#/sec] (media)
Tempo per richiesta:    99.906 [ms] (media)
Tempo per richiesta:    0.999 [ms] (media, su tutte le richieste contemporanee)
Velocità di trasferimento:     8590.14 [Kbytes/sec] ricevuti
Tempi di connessione (ms)
       min media[+/-sd] mediana  max
Connessione:    0  0  0.1   0    1
Elaborazione:   6  99 36.6   84   464
In attesa:    5  99 36.4   84   463
Totale:     6 100 36.6   84   464
Percentuale delle richieste servite entro un certo tempo (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (richiesta più lunga)

1000 richieste al secondo. Su un vecchio i7, su cui girano questo software di benchmark, un IDE che occupa 5GB di memoria e il server stesso.

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processore: 0
id fornitore: GenuineIntel
famiglia cpu: 6
modello: 60
nome modello: Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
step: 3
microcode: 0x22
MHz cpu: 3392.093
dimensione cache: 8192 KB
id fisico: 0
fratelli: 8
id del core: 0
core del cpu: 4
apicid: 0
apicid iniziale: 0
fpu: sì
fpu_exception: sì
livello cpuid: 13
wp: sì
flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bug: }}
bogomips: 6784.18
Dimensione clflush: 64
Allineamento cache: 64
Dimensioni indirizzo: 39 bit fisici, 48 bit virtuali
Gestione energia:

Ho provato 1000 richieste concorrenti, ma ApacheBench è andato in timeout a causa di molti socket aperti. Ho provato httperf, ecco i risultati:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: avviso: limite di file aperti > FD_SETSIZE; limitando il numero massimo di file aperti a FD_SETSIZE
Lunghezza massima impulso connessione: 262
Totale: connessioni 9779 richieste 9779 risposte 9779 durata test 10.029 s
Tasso connessione: 975.1 conn/s (1.0 ms/conn, <=1022 connessioni contemporanee)
Tempo connessione [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Tempo connessione [ms]: connessione 207.3
Lunghezza connessione [risposte/connessione]: 1.000
Tasso richiesta: 975.1 req/s (1.0 ms/req)
Dimensione richiesta [B]: 62.0
回复速率[回复/秒]:最小 903.5 平均 974.6 最大 1045.7 标准差 100.5(2个样本)
回复时间[ms]:响应 129.5 传输 1.1
回复大小[B]:头部 89.0 内容 8660.0 尾部 2.0(总计 8751.0)
回复状态:1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
CPU时间[s]:用户 0.35 系统 9.67(用户 3.5% 系统 96.4% 总计 99.9%)
网络I/O:8389.9 KB/s (68.7*10^6 bps)
错误:总计 221 客户端超时 0 套接字超时 0 连接被拒绝 0 连接重置 0
错误:fd-unavail 221 addrunavail 0 ftab-full 0 other 0

正如你看到的,它仍然能工作。尽管由于压力,有些连接会超时。我仍在研究导致这个问题的原因。

以上就是这篇关于Nexus.js学习知识的全部内容,大家有问题可以在下方留言讨论,感谢对呐喊教程的支持。

声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:notice#oldtoolbag.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

Ti potrebbe interessare