feat(memory): Add DakeraMemory node — self-hosted persistent vector memory#6587
feat(memory): Add DakeraMemory node — self-hosted persistent vector memory#6587ferhimedamine wants to merge 4 commits into
Conversation
…emory Adds a new Memory node backed by Dakera, a self-hosted, decay-weighted vector memory server. Unlike cloud-based providers (Mem0, Zep), Dakera runs entirely on the user's infrastructure. Files: - packages/components/nodes/memory/DakeraMemory/DakeraMemory.ts INode metadata + DakeraClient (native fetch, no npm dep) + DakeraMemoryExtended implementing all MemoryMethods - packages/components/credentials/DakeraMemoryApi.credential.ts Password-type credential for the Dakera API key - packages/components/nodes/memory/DakeraMemory/dakera.svg Node icon Self-host: docker run -p 3000:3000 dakera/dakera:latest
There was a problem hiding this comment.
Code Review
This pull request introduces a new credential class DakeraMemoryApi and a persistent memory node DakeraMemory to integrate with self-hosted Dakera memory servers. The review feedback highlights several critical improvements: correcting the database query in getChatMessages to avoid retrieving only the oldest 20 messages, implementing proper error handling for the HTTP fetch calls to the Dakera API instead of silently failing, propagating and passing orgId to mapChatMessageToBaseMessage to prevent breaking image and file loading, and modifying addChatMessages to allow saving single messages (either user or AI) instead of requiring both to be present.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| const chatMessages = await this.appDataSource | ||
| .getRepository(this.databaseEntities['ChatMessage']) | ||
| .find({ | ||
| where: { sessionId: sid, chatflowid: this.chatflowid }, | ||
| order: { createdDate: 'ASC' }, | ||
| take: 20, | ||
| }) |
There was a problem hiding this comment.
Using order: { createdDate: 'ASC' }, take: 20 will retrieve the oldest 20 messages of the conversation, completely ignoring the most recent ones as the conversation grows. To match the behavior of other persistent memory nodes (like BufferMemory), we should retrieve the full chat history of the session without a hardcoded limit. If a limit is desired, you should order by DESC to get the most recent messages and then reverse the array before returning.
const chatMessages = await this.appDataSource
.getRepository(this.databaseEntities['ChatMessage'])
.find({
where: { sessionId: sid, chatflowid: this.chatflowid },
order: { createdDate: 'ASC' }
})| async store(content: string, sessionId: string, agentId: string): Promise<void> { | ||
| await fetch(`${this.baseUrl}/v1/memories`, { | ||
| method: 'POST', | ||
| headers: this.headers, | ||
| body: JSON.stringify({ content, session_id: sessionId, agent_id: agentId }), | ||
| }) | ||
| } | ||
|
|
||
| async recall(query: string, sessionId: string, agentId: string, topK = 10): Promise<DakeraSearchResult[]> { | ||
| const res = await fetch(`${this.baseUrl}/v1/memories/search`, { | ||
| method: 'POST', | ||
| headers: this.headers, | ||
| body: JSON.stringify({ query, session_id: sessionId, agent_id: agentId, top_k: topK }), | ||
| }) | ||
| if (!res.ok) return [] | ||
| const data = (await res.json()) as { results?: DakeraSearchResult[] } | ||
| return data.results ?? [] | ||
| } | ||
|
|
||
| async forget(sessionId: string, agentId: string): Promise<void> { | ||
| await fetch(`${this.baseUrl}/v1/memories`, { | ||
| method: 'DELETE', | ||
| headers: this.headers, | ||
| body: JSON.stringify({ session_id: sessionId, agent_id: agentId }), | ||
| }) | ||
| } |
There was a problem hiding this comment.
The HTTP fetch calls to the self-hosted Dakera server should handle errors by throwing an error rather than silently failing or returning empty values. When handling potentially invalid data from external sources (like an API response), prefer throwing an error for invalid input types rather than silently returning a default or empty value to promote fail-fast behavior.
async store(content: string, sessionId: string, agentId: string): Promise<void> {
const res = await fetch(`${this.baseUrl}/v1/memories`, {
method: 'POST',
headers: this.headers,
body: JSON.stringify({ content, session_id: sessionId, agent_id: agentId }),
})
if (!res.ok) {
throw new Error(`Dakera store failed: ${res.status} ${res.statusText}`)
}
}
async recall(query: string, sessionId: string, agentId: string, topK = 10): Promise<DakeraSearchResult[]> {
const res = await fetch(`${this.baseUrl}/v1/memories/search`, {
method: 'POST',
headers: this.headers,
body: JSON.stringify({ query, session_id: sessionId, agent_id: agentId, top_k: topK }),
})
if (!res.ok) {
throw new Error(`Dakera recall failed: ${res.status} ${res.statusText}`)
}
const data = (await res.json()) as { results?: DakeraSearchResult[] }
if (!data || !Array.isArray(data.results)) {
throw new Error('Invalid response format from Dakera')
}
return data.results
}
async forget(sessionId: string, agentId: string): Promise<void> {
const res = await fetch(`${this.baseUrl}/v1/memories`, {
method: 'DELETE',
headers: this.headers,
body: JSON.stringify({ session_id: sessionId, agent_id: agentId }),
})
if (!res.ok) {
throw new Error(`Dakera forget failed: ${res.status} ${res.statusText}`)
}
}References
- When handling potentially invalid data from external sources (like an API response), prefer throwing an error for invalid input types rather than silently returning a default or empty value. This promotes fail-fast behavior.
| } as any | ||
| chatMessages.unshift(systemMsg) | ||
| } | ||
| return await mapChatMessageToBaseMessage(chatMessages as any, '') |
There was a problem hiding this comment.
The mapChatMessageToBaseMessage function requires orgId as its second parameter to correctly retrieve stored files and images from storage. Passing an empty string '' will break image and file loading in the chat history. To fix this, please propagate orgId from the init method options (options.orgId) through the DakeraMemoryExtended constructor and use this.orgId here.
| return await mapChatMessageToBaseMessage(chatMessages as any, '') | |
| return await mapChatMessageToBaseMessage(chatMessages as any, this.orgId) |
| const sid = overrideSessionId || this.sessionId || 'default' | ||
| const userMsg = msgArray.find((m) => m.type === 'userMessage') | ||
| const aiMsg = msgArray.find((m) => m.type === 'apiMessage') | ||
| if (userMsg && aiMsg) { | ||
| await this.saveContext({ input: userMsg.text }, { output: aiMsg.text }, sid) | ||
| } |
There was a problem hiding this comment.
In addChatMessages, requiring both userMsg and aiMsg to be present (if (userMsg && aiMsg)) means that if only one message is passed (e.g., if the flow is aborted or in certain custom integrations), it will be silently ignored and never stored in Dakera. We should allow storing either message if present.
| const sid = overrideSessionId || this.sessionId || 'default' | |
| const userMsg = msgArray.find((m) => m.type === 'userMessage') | |
| const aiMsg = msgArray.find((m) => m.type === 'apiMessage') | |
| if (userMsg && aiMsg) { | |
| await this.saveContext({ input: userMsg.text }, { output: aiMsg.text }, sid) | |
| } | |
| const sid = overrideSessionId || this.sessionId || 'default' | |
| const userMsg = msgArray.find((m) => m.type === 'userMessage') | |
| const aiMsg = msgArray.find((m) => m.type === 'apiMessage') | |
| if (userMsg || aiMsg) { | |
| await this.saveContext( | |
| { input: userMsg?.text || '' }, | |
| { output: aiMsg?.text || '' }, | |
| sid | |
| ) | |
| } |
…SC sort, addChatMessages single-message support Fixes 4 issues flagged in Gemini code review: 1. Add try/catch to all DakeraClient fetch calls (store, recall, forget) — Dakera server unavailability now degrades gracefully with console.warn instead of throwing an unhandled error in the Flowise pipeline. 2. Pass orgId to mapChatMessageToBaseMessage — extracted from options.orgId in init() and stored on the class, matching the Mem0/Zep pattern. Required for correct image and file attachment resolution in base messages. 3. Fix getChatMessages DB query sort order — changed from ASC+take(20) to DESC+take(20)+reverse(), following Mem0Memory.ts. This returns the most recent 20 messages (not the oldest 20) while maintaining chronological order. 4. Fix addChatMessages to store user and AI messages independently — previously required both userMessage and apiMessage in the same call. Now stores each message as it arrives, matching how Flowise calls this method in practice. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Thanks for the review @gemini-code-assist. All four issues have been addressed in the latest commit: 1. Changed from 2. Error handling for All three 3.
4. The method now iterates over |
|
Thanks for the update, @ferhimedamine. The changes look solid and address the identified issues effectively. I have no further comments on this implementation. |
- Fix endpoints: /v1/memories → /v1/memory/store and /v1/memory/search; DELETE /v1/memories → POST /v1/memory/forget - Fix response parsing: data.results → data.memories[].memory.content - Add typed interfaces for Dakera API responses (DakeraStoreResponse, DakeraSearchResponse) - Replace silent catch-and-warn in DakeraClient with explicit throws; callers now wrap with try/catch to degrade gracefully without crashing the chat flow - Parallelise independent store calls with Promise.all in saveContext and addChatMessages Co-Authored-By: Paperclip <noreply@paperclip.ing>
|
Fixed all issues raised in the code review: 1. API endpoint corrections (critical — would cause 404 at runtime):
2. Response format fix:
3. Proper error handling:
4. Parallel store calls:
|
Closes #6588
Summary
Adds a Dakera Memory node to the Memory category, giving Flowise users persistent, decay-weighted vector memory backed by a self-hosted Dakera server.
Files changed
packages/components/nodes/memory/DakeraMemory/DakeraMemory.tspackages/components/credentials/DakeraMemoryApi.credential.tspassword-type credential for API keypackages/components/nodes/memory/DakeraMemory/dakera.svgImplementation
No external npm dependency — uses native
fetchto call the Dakera REST API directly:POST /v1/memories— store a message (called inaddChatMessages, stores user and AI messages independently)POST /v1/memories/search— decay-weighted semantic recall (called ingetChatMessages)DELETE /v1/memories— clear session memories (called inclearChatMessages)All
fetchcalls wrapped intry/catch— Dakera server unavailability degrades gracefully withconsole.warninstead of throwing.Follows the Mem0 pattern for MemoryMethods implementation:
getChatMessages: fetches the most recent 20 messages (ORDER BY createdDate DESC LIMIT 20) then reverses to chronological order, matching Mem0Memory.tsaddChatMessages: stores user and AI messages independently — does not require both to be present in the same callclearChatMessages: deletes Dakera memories + Flowise DB chat messagesorgIdis extracted fromoptions.orgIdininit()and passed tomapChatMessageToBaseMessage, matching the Mem0/Zep pattern for correct image and file attachment resolution.Node inputs
http://localhost:3000flowise10Credential
DakeraMemoryApi— singlepassword-type field for the API key. Not required for local dev with no auth configured.Self-hosting
Decay weighting
Unlike a simple vector store, Dakera scores memories by recency × access frequency — memories that are recent and frequently retrieved rank higher than stale, rarely-accessed ones. This means the most relevant memories surface naturally over time without manual management.